Using Fluent Bit to send CommonSecurityLog data to Sentinel
One of the limitations of Microsoft Sentinel and Azure Log Analytics was that if you wanted to use a custom log forwarder or log source, you couldn't write to the built-in tables. You had to create custom tables, which have a `_CL` suffix.
This was a problem, because each custom table you created had to be manually added to your Analytic rules and Workbooks, creating needless inefficiencies.
But this is no more! With the new Logs Ingestion API, Microsoft supports custom data being sent to four built-in tables: `CommonSecurityLog`, `SecurityEvents`, `Syslog`, and `WindowsEvents`.
In this example, we're going to be sending firewall log data from OpenWrt to the standard `CommonSecurityLog` table, so that we get the benefit of having normalised data, and the built-in Analytic templates.
To set this up, we're going to:
- Identify the data we want to collect and map it to CommonSecurityLog
- Create an App Registration
- Create a Data Collection Endpoint (DCE)
- Create a Data Collection Rule (DCR)
- Assign IAM Roles to the DCR
- Install Fluent Bit
- Configure a Fluent Bit parser for OpenWrt firewall logs
- Create a Fluent Bit config
Mapping iptables to CommonSecurityLog
The first step in ingesting any data into any SIEM is to know what the data is, what you're interested in, and how to normalise it so that you can make use of it. Otherwise, you're just sending garbage into a very expensive garbage pile.
Example OpenWrt iptables logs look like:
[000000.000000] drop wan invalid ct state: IN= OUT=pppoe-wan SRC=203.0.113.42 DST=198.51.100.9 LEN=40 TOS=0x00 PREC=0x00 TTL=64 ID=0 DF PROTO=TCP SPT=52182 DPT=443 WINDOW=0 RES=0x00 RST URGP=0
[000000.000000] drop wan invalid ct state: IN=eth1 OUT=pppoe-wan MAC=aa:aa:aa:aa:aa:aa:bb:bb:bb:bb:bb:bb:08:00 SRC=198.51.100.134 DST=198.51.100.29 LEN=40 TOS=0x00 PREC=0x00 TTL=127 ID=45489 DF PROTO=TCP SPT=56913 DPT=443 WINDOW=0 RES=0x00 ACK RST URGP=0
[000000.000000] reject wan forward: IN=pppoe-wan OUT=eth1 MAC= SRC=2001:0db8:0000:0000:0000:0000:ffff:0002 DST=2001:0db8:aaaa:aaaa:aaaa:aaaa:aaaa:af3c LEN=60 TC=0 HOPLIMIT=57 FLOWLBL=864951 PROTO=TCP SPT=443 DPT=53714 WINDOW=0 RES=0x00 RST URGP=0
[000000.000000] reject wan in: IN=pppoe-wan OUT= MAC= SRC=198.51.100.130 DST=203.0.113.42 LEN=52 TOS=0x00 PREC=0x00 TTL=113 ID=19525 DF PROTO=TCP SPT=53501 DPT=1433 WINDOW=8192 RES=0x00 SYN URGP=0
[000000.000000] reject wan in: IN=pppoe-wan OUT= MAC= SRC=198.51.100.130 DST=203.0.113.42 LEN=52 TOS=0x00 PREC=0x00 TTL=12 ID=31743 PROTO=UDP SPT=12054 DPT=53 LEN=32
[000000.000000] reject wan out: IN=eth1 OUT=pppoe-wan MAC=aa:aa:aa:aa:aa:aa:bb:bb:bb:bb:bb:bb:08:00 SRC=198.51.100.233 DST=198.51.100.1 LEN=84 TOS=0x00 PREC=0x00 TTL=63 ID=15546 PROTO=ICMP TYPE=8 CODE=0 ID=2828 SEQ=0
[000000.000000] accept wan out: IN=eth1.30 OUT=pppoe-wan MAC= SRC=198.51.100.130 DST=203.0.113.42 LEN=52 TOS=0x00 PREC=0x00 TTL=12 ID=31893 PROTO=UDP SPT=24912 DPT=53 LEN=32
Based on these logs, and what I'm interested in, I'm going to normalise the data to the CommonSecurityLog schema using the following fields:
| CommonSecurityLog | datatype | iptables | fluent-bit |
|---|---|---|---|
| TimeGenerated | datetime | | time |
| CommunicationDirection | string | (in or out) | |
| Computer | string | | host |
| DestinationIP | string | DST= | |
| DestinationMACAddress | string | MAC= | |
| DestinationPort | int | DPT= | |
| DeviceAction | string | (accept, reject, or drop) | |
| DeviceCustomString1 | string | (rule interface) | |
| DeviceInboundInterface | string | IN= | |
| DeviceOutboundInterface | string | OUT= | |
| ProcessName | string | | ident |
| Protocol | string | PROTO= | |
| ReceiptTime | string | | time |
| ReceivedBytes | long | LEN= | |
| SourceIP | string | SRC= | |
| SourceMACAddress | string | MAC= | |
| SourcePort | int | SPT= | |
| Message | string | | message |
Note: sharp readers will notice I'm keeping the original message field, which effectively duplicates all the relevant data... this is just temporary, as I haven't fully finished my regex filter. At some point in the future, I'll remove the raw message, otherwise I'm just doubling my data costs.
Create an App Registration
Unlike the old Data Collector API, which used a static key for the entire workspace, the new Log Ingestion API uses an OAuth flow, so that you can be quite granular about what sources are allowed to send what data to what tables.
This means that we'll need to create an App Registration for Fluent Bit.
- Go to https://portal.azure.com/#view/Microsoft_AAD_RegisteredApps/CreateApplicationBlade/quickStartType~/null/isMSAApp~/false
- Enter a name like `fluent`, leave the Redirect URI blank, and click Register.
- Once created, on the Overview page, make a note of the Application (client) ID and Directory (tenant) ID.
- Go to Owners, and set yourself as an Owner. This is good operational practice (so that in large enterprises, there's some clue about who manages what), and it means that you'll always be able to create a new Secret, even if you lose your admin rights.
- Go to Certificates & secrets, and create a new client Secret. Don't forget to add a calendar reminder for the Expiry date, and make a note of the new secret.
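Before going any further, it can be worth sanity-checking that the App Registration's credentials actually work. The Logs Ingestion API authenticates via the standard OAuth2 client-credentials flow against the Microsoft identity platform token endpoint, using the `https://monitor.azure.com/.default` scope. Here's a minimal Python sketch that builds that token request; the IDs and secret are placeholders, and `build_token_request` is just a helper name I've made up for illustration:

```python
# Hedged sketch: build the client-credentials token request that
# Fluent Bit performs under the hood. All IDs/secrets are placeholders.
import urllib.parse


def build_token_request(tenant_id: str, client_id: str, client_secret: str):
    """Return the token endpoint URL and urlencoded form body for the
    OAuth2 client-credentials flow used by the Logs Ingestion API."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        # Azure Monitor ingestion uses this fixed scope
        "scope": "https://monitor.azure.com/.default",
    })
    return url, body


url, body = build_token_request(
    "00000000-0000-0000-0000-000000000000",  # Directory (tenant) ID
    "00000000-0000-0000-0000-000000000000",  # Application (client) ID
    "placeholder-secret",
)
# POST this body to the URL (Content-Type: application/x-www-form-urlencoded);
# a successful response is a JSON payload containing "access_token".
print(url)
```

If the POST comes back with an `access_token`, the App Registration side is working, and any later ingestion failures are down to the DCE/DCR/IAM setup instead.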
Create a Data Collection Endpoint (DCE)
We're also going to need a Data Collection Endpoint (DCE). The DCE is the public URL that will accept data. We can reuse an existing DCE, provided that:
- it's in the same Location (region) as the destination Log Analytics workspace.
- your log source can talk to it, either over the public internet, or a Private Link scope.
To create one:
- Go to https://portal.azure.com/#view/Microsoft_Azure_Monitoring/CreateDataCollectionEndpointViewModel/_provisioningContext~/
- Select the Resource Group of your Sentinel / Log Analytics workspace
- Make sure the Location is also the same!
- And hit Create
Create a DCR to send data to CommonSecurityLog
The Data Collection Rule (DCR) is used to glue together the ingest pipeline. A DCR:
- is linked to a specific DCE;
- contains one or more Streams, which define what input data columns and types to allow;
- contains one or more Destinations, the Log Analytics workspaces;
- contains one or more Flows, which map a Stream to a Destination via a Transform Rule; and
- contains Azure IAM Roles assigned to App Registrations / Service Principals / Managed Identities, so that ingest can be controlled.
Or, put visually:
You can't (currently) use the Azure Portal to create a DCR that sends custom data to the built-in tables. For this step, we need to create an ARM Template.
To make this bit easy for you, I've prepared the ARM template below in a nice tidy Deploy to Azure button:
What this ARM template does is:
- Create a `Custom-CommonSecurityLogStream` stream. This is important: the stream must contain the fields we've mapped above, because any extra fields will simply be discarded.
  - This includes fields generated/mapped by Fluent Bit, like `TimeGenerated` and `Computer`; and fields extracted through the parser, like `DestinationIP` and `SourcePort`.
- Map this stream to the output table `Microsoft-CommonSecurityLog`. Unlike custom tables, which have a `Custom-` prefix, the built-in tables use the `Microsoft-` prefix. This template uses a `transformKql` of `"source"`: in other words, it won't transform the source data, just pass it through as-is.

In this case, I'm using the `Custom-CommonSecurityLogStream` stream name; you'll need this later.
Once you create the DCR, you'll also need to get its immutable ID (it looks like `dcr-00000000000000000000000000000000`): view the DCR in the Azure Portal and click JSON View.
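If you save a copy of that JSON view, the immutable ID sits under `properties.immutableId` in the resource body. A trivial sketch of pulling it out (the DCR name and ID below are placeholders):

```python
# Sketch: extract the immutable ID from a saved copy of the DCR's JSON view.
# The DCR contents below are a placeholder stub, not a real resource.
import json

dcr_json = """
{
  "name": "my-dcr",
  "properties": {
    "immutableId": "dcr-00000000000000000000000000000000"
  }
}
"""

dcr = json.loads(dcr_json)
print(dcr["properties"]["immutableId"])
# → dcr-00000000000000000000000000000000
```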
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"dataCollectionRuleName": {
"type": "string",
"metadata": {
"description": "Specifies the name of the Data Collection Rule to create."
}
},
"location": {
"defaultValue": "[resourceGroup().location]",
"type": "string",
"metadata": {
"description": "Specifies the location in which to create the Data Collection Rule."
}
},
"workspaceResourceId": {
"type": "string",
"metadata": {
"description": "Specifies the Azure resource ID of the Log Analytics workspace to use."
}
},
"endpointResourceId": {
"type": "string",
"metadata": {
"description": "Specifies the Azure resource ID of the Data Collection Endpoint to use."
}
}
},
"resources": [
{
"type": "Microsoft.Insights/dataCollectionRules",
"apiVersion": "2021-09-01-preview",
"name": "[parameters('dataCollectionRuleName')]",
"location": "[parameters('location')]",
"properties": {
"dataCollectionEndpointId": "[parameters('endpointResourceId')]",
"streamDeclarations": {
"Custom-CommonSecurityLogStream": {
"columns": [
{
"name": "TimeGenerated",
"type": "datetime"
},
{
"name": "CommunicationDirection",
"type": "string"
},
{
"name": "Computer",
"type": "string"
},
{
"name": "DestinationIP",
"type": "string"
},
{
"name": "DestinationMACAddress",
"type": "string"
},
{
"name": "DestinationPort",
"type": "int"
},
{
"name": "DeviceAction",
"type": "string"
},
{
"name": "DeviceCustomString1",
"type": "string"
},
{
"name": "DeviceInboundInterface",
"type": "string"
},
{
"name": "DeviceOutboundInterface",
"type": "string"
},
{
"name": "ProcessName",
"type": "string"
},
{
"name": "Protocol",
"type": "string"
},
{
"name": "ReceiptTime",
"type": "string"
},
{
"name": "ReceivedBytes",
"type": "long"
},
{
"name": "SourceIP",
"type": "string"
},
{
"name": "SourceMACAddress",
"type": "string"
},
{
"name": "SourcePort",
"type": "int"
},
{
"name": "Message",
"type": "string"
}
]
}
},
"destinations": {
"logAnalytics": [
{
"workspaceResourceId": "[parameters('workspaceResourceId')]",
"name": "clv2ws1"
}
]
},
"dataFlows": [
{
"streams": [
"Custom-CommonSecurityLogStream"
],
"destinations": [
"clv2ws1"
],
"transformKql": "source",
"outputStream": "Microsoft-CommonSecurityLog"
}
]
}
}
],
"outputs": {
"dataCollectionRuleId": {
"type": "string",
"value": "[resourceId('Microsoft.Insights/dataCollectionRules', parameters('dataCollectionRuleName'))]"
}
}
}
Note: You may get a warning along the lines of `Value is not accepted. Valid values: "Microsoft-Event", "Microsoft-InsightsMetrics", "Microsoft-Perf", "Microsoft-Syslog", "Microsoft-WindowsEvent".` You can safely ignore this - it's just a validation warning because Microsoft haven't updated the JSON schema yet.
Assign IAM Roles to the DCR
Once you've created the DCR, the final step is to assign an IAM Role to the App Registration we created before.
In the Azure Portal, go to the DCR, click Access control (IAM), and Add role assignment. You want to assign Monitoring Metrics Publisher to the App Registration.
Install Fluent Bit
There are lots of ways to install Fluent Bit... just go read the docs at https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit 😉
Configure a Fluent Bit parser for OpenWrt firewall logs
Based on our mapping, we're going to need to use a regular expression to parse the OpenWrt iptables logs, and tag them with the appropriate column names.
Instead of modifying the existing Fluent Bit parsers configuration, I simply create a new `parsers.conf` file:
# Based on https://github.com/fluent/fluent-bit/blob/master/conf/parsers_extra.conf
[PARSER]
Name iptables-openwrt
Format regex
Regex (?<DeviceAction>reject|accept|drop) (?<DeviceCustomString1>.*?) (?<CommunicationDirection>[\w\s]+): IN=(?<DeviceInboundInterface>[\w\-\.]+)? OUT=(?<DeviceOutboundInterface>[\w\-\.]+)?( MAC=((?<DestinationMACAddress>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w):(?<SourceMACAddress>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w):\w\w:\w\w)?)? SRC=(?<SourceIP>[\w\.\:]+) DST=(?<DestinationIP>[\w\.\:]+) LEN=(?<ReceivedBytes>\d+) .* PROTO=(?<Protocol>[\w\d]+)( SPT=(?<SourcePort>\d+) DPT=(?<DestinationPort>\d+))?
Types SourcePort:integer,DestinationPort:integer,ReceivedBytes:integer
#
# Built-in, from https://github.com/fluent/fluent-bit/blob/master/conf/parsers.conf
#
[PARSER]
Name syslog-rfc3164
Format regex
Regex /^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/
Time_Key time
Time_Format %b %d %H:%M:%S
Time_Keep On
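Before wiring this parser in, it's worth checking the regex against one of the sample lines from earlier. Here's a quick Python sketch using the same pattern translated to Python's `(?P<name>)` named-group syntax, assuming the syslog parser has already stripped the kernel timestamp prefix and left just the firewall message:

```python
# Sanity check of the iptables regex, translated to Python's (?P<name>)
# named-group syntax. Assumes the syslog parser has already stripped the
# kernel timestamp, leaving just the firewall message body.
import re

PATTERN = re.compile(
    r"(?P<DeviceAction>reject|accept|drop) (?P<DeviceCustomString1>.*?) "
    r"(?P<CommunicationDirection>[\w\s]+): "
    r"IN=(?P<DeviceInboundInterface>[\w\-\.]+)? "
    r"OUT=(?P<DeviceOutboundInterface>[\w\-\.]+)?"
    r"( MAC=((?P<DestinationMACAddress>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w):"
    r"(?P<SourceMACAddress>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w):\w\w:\w\w)?)?"
    r" SRC=(?P<SourceIP>[\w\.\:]+) DST=(?P<DestinationIP>[\w\.\:]+)"
    r" LEN=(?P<ReceivedBytes>\d+) .* PROTO=(?P<Protocol>[\w\d]+)"
    r"( SPT=(?P<SourcePort>\d+) DPT=(?P<DestinationPort>\d+))?"
)

line = ("drop wan invalid ct state: IN= OUT=pppoe-wan SRC=203.0.113.42 "
        "DST=198.51.100.9 LEN=40 TOS=0x00 PREC=0x00 TTL=64 ID=0 DF "
        "PROTO=TCP SPT=52182 DPT=443 WINDOW=0 RES=0x00 RST URGP=0")

m = PATTERN.search(line)
assert m is not None
assert m.group("DeviceAction") == "drop"          # maps to DeviceAction
assert m.group("SourceIP") == "203.0.113.42"      # maps to SourceIP
assert m.group("DestinationPort") == "443"        # maps to DestinationPort
assert m.group("DeviceInboundInterface") is None  # empty IN= on WAN drops
```

If a field comes back empty that you expected to be populated, it's much easier to debug here than by watching records fail to land in Log Analytics.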
Create a Fluent Bit config
Next, we'll need to create a syslog input (to receive data from OpenWrt), parse it with the RFC3164 parser, rename a bunch of the columns in line with the mapping we did, and then finally send them to Log Analytics via the `azure_logs_ingestion` output.
Note: `stream_name` is not currently part of the `azure_logs_ingestion` plugin, but is needed to map to our DCR... I've created a PR, which is currently pending approval.
Tying it all together, my `fluent-bit.conf` looks like:
[SERVICE]
Parsers_File /usr/local/etc/fluent-bit/parsers.conf
Flush 1
Log_Level info
[INPUT]
Name syslog
Mode udp
Port 1514
Parser syslog-rfc3164
Tag syslog.openwrt
[FILTER]
Name modify
Match syslog.*
Rename host Computer
Rename ident ProcessName
Rename message Message
Remove pid
Remove pri
Rename time ReceiptTime
[FILTER]
Name parser
Match syslog.openwrt
Key_Name Message
Parser iptables-openwrt
Preserve_Key On
Reserve_Data On
[OUTPUT]
Name azure_logs_ingestion
Match syslog.openwrt
client_id 00000000-0000-0000-0000-000000000000
client_secret 00000~0000000000000.0000000000000000-000
tenant_id 00000000-0000-0000-0000-000000000000
dce_url https://example-xxxx.westus2-1.ingest.monitor.azure.com
dcr_id dcr-00000000000000000000000000000000
table_name CommonSecurityLog
stream_name Custom-CommonSecurityLogStream
time_generated true
time_key TimeGenerated
Compress true
Run this with `fluent-bit -c fluent-bit.conf`, and you should see your logs flowing into Sentinel in near-real-time! 🎉
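If logs aren't appearing and you want to rule Fluent Bit out, you can hand-roll the same ingestion call the plugin makes. The documented Logs Ingestion API URL shape is the DCE, the DCR immutable ID, and the stream name; this sketch just builds that URL (`build_ingest_url` is a helper name of my own, and all IDs are placeholders):

```python
# Hedged sketch: construct the Logs Ingestion API URL that the
# azure_logs_ingestion output posts records to. All IDs are placeholders.

def build_ingest_url(dce_url: str, dcr_id: str, stream_name: str,
                     api_version: str = "2023-01-01") -> str:
    """URL shape: {DCE}/dataCollectionRules/{DCR immutable ID}/streams/{stream}"""
    return (f"{dce_url}/dataCollectionRules/{dcr_id}"
            f"/streams/{stream_name}?api-version={api_version}")


url = build_ingest_url(
    "https://example-xxxx.westus2-1.ingest.monitor.azure.com",
    "dcr-00000000000000000000000000000000",
    "Custom-CommonSecurityLogStream",
)
# POST a JSON array of records to this URL with an
# "Authorization: Bearer <token>" header; an HTTP 204 means the batch was
# accepted, and rows should show up in CommonSecurityLog shortly after.
print(url)
```

Posting a single hand-crafted record this way will quickly tell you whether the problem is the DCE/DCR/IAM plumbing or the Fluent Bit config.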