LogRhythm SIEM can normalize log messages sent from JSON log sources, which simplifies collection and standardizes formatting so that the Mediator can reliably ingest metadata into a consistent schema. The JSON Policy Builder provides an easy-to-use, GUI-based way to build JSON normalization rules. Use JSON parsing policies to extract, transform, and map JSON-formatted data into that schema.
Policy File Structure
A policy file consists of the following components:
{
  "name": "source_name",
  "filter": "condition_expression",
  "schemaRule": {
    "fanout": {
      "inputField": ["$.path.to.array[*]"]
    },
    "convertoJson": ["$.path.to.convert"]
  },
  "transforms": [
    { /* field mappings */ }
  ],
  "subtransforms": [
    { /* conditional transformations */ }
  ]
}
Core Elements
| Element | Description |
|---|---|
| Name | This serves as the unique identifier for the data source or policy. |
| Filter | A JPath condition that determines how this policy is applied. |
| Schema Rule (optional) | Fanout: Manages the processing of elements within an array. Convert to JSON: Transforms string fields into JSON objects. |
| Transforms | Defines the field mappings from the source schema to the target schema. |
| Subtransforms (optional) | These are conditional transformation rules that apply under specific circumstances. |
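As a rough illustration, the sketch below checks in Python that a parsed policy contains the elements listed above. The helper name `check_policy` is hypothetical, and treating Name, Filter, and Transforms as required is an assumption inferred from the table marking only Schema Rule and Subtransforms as optional; it is not part of the LogRhythm tooling.

```python
import json

# Assumption: elements not marked "(optional)" in the table are required.
REQUIRED_KEYS = {"name", "filter", "transforms"}
OPTIONAL_KEYS = {"schemaRule", "subtransforms"}


def check_policy(policy):
    """Return a list of problems found; an empty list means the shape looks fine."""
    problems = []
    present = set(policy)
    for key in sorted(REQUIRED_KEYS - present):
        problems.append(f"missing required key: {key}")
    for key in sorted(present - REQUIRED_KEYS - OPTIONAL_KEYS):
        problems.append(f"unexpected key: {key}")
    if not isinstance(policy.get("transforms", []), list):
        problems.append("transforms must be a list")
    return problems


policy = json.loads("""
{
  "name": "source_name",
  "filter": "condition_expression",
  "transforms": []
}
""")
print(check_policy(policy))  # []
```

A check like this only validates the outer shape; it says nothing about whether the filter or field paths are correct.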
Field Transformation Structure
Each transform entry follows this format:
{
  "inputRule": "$.source.field.path",
  "LRSchemaField": "normalized_field_name",
  "type": "String",
  "default": null,
  "alternativeFields": ["$.backup.field.path"],
  "format": null,
  "FanoutParentElement": "$.path.to.array[*]",
  "condition": null
}
Transform Properties
| Property | Description |
|---|---|
| inputRule | The JSON path leading to the source field (or a literal value). |
| LRSchemaField | The name of the target field. |
| type | The data type (e.g., string, dateTime, number, decimal). |
| default | The value assigned if the field is absent. |
| alternativeFields | Fields that serve as backup options. |
| format | The pattern used for formatting (e.g., applicable for dates). |
| FanoutParentElement | The path to the parent array. |
| condition | The rule for conditional transformation. |
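To make the interplay of inputRule, alternativeFields, and default concrete, here is a minimal Python sketch. The fallback order (primary path, then each alternative in list order, then the default) is an assumption based on the property descriptions, and `get_path` and `resolve` are hypothetical helpers that handle only plain dotted paths, not the full path syntax.

```python
def get_path(event, path):
    """Resolve a plain dotted path such as "$.source.ip"; return None if a step is missing.

    Array selectors such as [*] are out of scope for this sketch.
    """
    node = event
    for key in path.removeprefix("$.").split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def resolve(event, transform):
    """Try inputRule, then each alternative field in order, then the default."""
    candidates = [transform["inputRule"], *(transform.get("alternativeFields") or [])]
    for path in candidates:
        value = get_path(event, path)
        if value is not None:
            return value
    return transform.get("default")


transform = {
    "inputRule": "$.sourceIPAddress",
    "LRSchemaField": "sip",
    "alternativeFields": ["$.src_ip", "$.source.ip"],
    "default": None,
}
print(resolve({"source": {"ip": "10.0.0.5"}}, transform))  # 10.0.0.5
```

Listing alternatives from most to least specific keeps the mapping predictable when a source emits more than one of the candidate fields.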
Creating a New Policy
Step 1: Understand Your Data Source
Examine JSON data to identify:
- Important fields
- Nested structures and arrays
- Data types
Step 2: Define Basic Policy Structure
{
  "name": "your_source_name",
  "filter": "@.@metadata.beat == 'yourbeat' || @.device_type =~ /(?i)your_pattern/",
  "schemaRule": {
    "fanout": {
      "inputField": null
    }
  },
  "transforms": [],
  "subtransforms": []
}
Step 3: Define Field Transforms
{
  "inputRule": "$.path.to.your.field",
  "LRSchemaField": "appropriate_schema_field",
  "type": "String",
  "default": null,
  "alternativeFields": [],
  "format": null
}
Step 4: Handle Array Data
- Configure the fanout rule:
  "schemaRule": {
    "fanout": {
      "inputField": ["$.path.to.array[*]"]
    }
  }
- Use FanoutParentElement in transforms:
  {
    "inputRule": "$.field.in.array",
    "LRSchemaField": "schema_field",
    "FanoutParentElement": "$.path.to.array[*]",
    "type": "String"
  }
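One plausible reading of fanout is that a single event containing an array is expanded into one record per array element. The Python sketch below illustrates that idea only; `fan_out` is a hypothetical helper, it works on a top-level key rather than a full path, and the actual Mediator behavior may differ.

```python
def fan_out(event, array_key):
    """Yield one copy of the event per element of event[array_key].

    Each copy carries a single element in place of the array, so field
    mappings applied afterwards see exactly one item at a time.
    """
    for element in event.get(array_key, []):
        record = dict(event)         # shallow copy of the top-level fields
        record[array_key] = element  # swap the array for one element
        yield record


event = {
    "user": "alice",
    "recipients": [{"address": "a@example.com"}, {"address": "b@example.com"}],
}
records = list(fan_out(event, "recipients"))
for record in records:
    print(record["user"], record["recipients"]["address"])
```

Fields outside the array (here `user`) are repeated on every record, which is why a two-element array produces two complete records.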
Step 5: Add Conditional Logic
"subtransforms": [
  {
    "condition": "@.field.exists || @.another.condition",
    "exitonmatch": true,
    "transforms": [
      {
        "inputRule": "$.conditional.field",
        "LRSchemaField": "schema_field",
        "type": "String"
      }
    ]
  }
]
Step 6: Test and Refine
- Use sample data to verify mappings; and
- Adjust field paths and conditions as needed.
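A quick offline check against a sample event can catch mistyped paths before a policy is deployed. The Python sketch below is one way to do that; `path_exists` and `unresolved_fields` are hypothetical helpers, and rules that use operations, literals, or array selectors are skipped rather than evaluated.

```python
def path_exists(event, path):
    """Return True if a plain dotted path such as "$.a.b" resolves in the event."""
    node = event
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return False
        node = node[key]
    return True


def unresolved_fields(policy, sample):
    """List LRSchemaField names whose inputRule finds nothing in the sample event."""
    missing = []
    for transform in policy["transforms"]:
        rule = transform["inputRule"]
        # Operations, literals, and array selectors are skipped in this sketch.
        if not rule.startswith("$.") or "[" in rule:
            continue
        if not path_exists(sample, rule):
            missing.append(transform["LRSchemaField"])
    return missing


policy = {
    "transforms": [
        {"inputRule": "$.sourceIPAddress", "LRSchemaField": "sip"},
        {"inputRule": "$.user.name", "LRSchemaField": "login"},
    ]
}
sample = {"sourceIPAddress": "192.0.2.10", "user": {"id": 7}}
print(unresolved_fields(policy, sample))  # ['login']
```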
Best Practices
- Use JsonPath syntax for field extraction;
- Include alternative fields for resilience;
- Use appropriate data types and formatting;
- Use regex for case-insensitive matching;
- Apply filtering conditions to avoid irrelevant data;
- Leverage subtransforms for conditional processing; and
- Comment complex sections for maintainability.
Available Schema Fields
Standard schema fields include:
beatname, device_type, fullyqualifiedbeatname, time, object, objectname, objecttype, hash, policy, result, url, useragent, responsecode, subject, version, command, reason, action, status, sessiontype, process, processid, parentprocessid, parentprocessname, parentprocesspath, quantity, amount, size, rate, minutes, seconds, milliseconds, session, kilobytesin, kilobytesout, kilobytes, packetsin, packetsout, severity, vmid, vendorinfo, threatname, threatid, cve, smac, dmac, sinterface, dinterface, sip, dip, snatip, dnatip, sname, dname, serialnumber, login, account, sender, recipient, group, domainimpacted, domainorigin, protnum, protname, sport, dport, snatport, dnatport, augmented, tag1, tag2, tag3, tag4, tag5, tag6, tag7, tag8, tag9, tag10, original_message
JSON Parsing Policy Reference
Formatters
Formatters allow you to transform values from JSON fields before mapping them to the schema. They are specified in the format field of a transform.
| Formatter | Data Type | Description | Syntax Example |
|---|---|---|---|
| lowercase | String | Converts a string to lowercase. | "format": "lowercase" |
| uppercase | String | Converts a string to uppercase. | |
| prefix | String | Adds a prefix to a string. | |
| suffix | String | Adds a suffix to a string. | |
| replace | String | Replaces one substring with another. | |
| replaceall | String | Performs multiple replacements, replacing all instances of an old string. | |
| kilobyte | Number | Converts bytes to kilobytes. | |
| DateTime Formats | Date/Time | To convert the date/time format, provide the specific format to use, as illustrated in the next column. For guidance on creating your own date and time format, refer to Custom Date and Time Format Strings. | "format": "yyyy-MM-ddTHH:mm:ss.fffK" |
Operations
Operations allow you to transform or extract values from JSON fields. They are specified in the inputRule field in a transform.
| Operation | Data Type | Description | Syntax Example |
|---|---|---|---|
| IsIP | String | Validates whether a value is an IP address (IPv4/IPv6). | IsIP($.sourceIPAddress,true) |
| Regex | String | Extracts a value using a regex pattern. Requires three arguments. | |
| SPLIT | String | Splits a string by a delimiter and returns the element at a specific index. | SPLIT($.userIdentity.principalId,':',1) |
| Concat | String | Concatenates two or more string values. | |
| ConcatArray | Misc | Joins array elements with a delimiter. | ConcatArray($.response.messagesDelivered.recipient[*],',') |
| ToString | Misc | Converts a value to a String. | |
| EpochSectoDateTime | Date/Time | Converts a Unix timestamp (seconds) to DateTime. | |
| EpochMilliSectoDateTime | Date/Time | Converts a Unix timestamp (milliseconds) to DateTime. | |
| EpochMicroSectoDateTime | Date/Time | Converts a Unix timestamp (microseconds) to DateTime. | |
| LocalDateTime | Date/Time | Gets the local date-time. | |
| Add | Number/Decimal | Adds a number to a JSON value. | |
| Subtract | Number/Decimal | Subtracts a number from a JSON value. | |
| Multiply | Number/Decimal | Multiplies a JSON value by a number. | |
| Divide | Number/Decimal | Divides a JSON value by a number. | |
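The three epoch operations differ only in the unit of the incoming timestamp. The Python sketch below shows the underlying arithmetic; `epoch_to_datetime` and its unit names are hypothetical, and returning UTC is an assumption, since the table does not state which time zone the operations produce.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Timestamp unit -> keyword argument accepted by timedelta.
UNITS = {"sec": "seconds", "millisec": "milliseconds", "microsec": "microseconds"}


def epoch_to_datetime(value, unit):
    """Convert an integer Unix timestamp in the given unit to a UTC datetime."""
    return EPOCH + timedelta(**{UNITS[unit]: value})


print(epoch_to_datetime(1700000000, "sec").isoformat())
# 2023-11-14T22:13:20+00:00
print(epoch_to_datetime(1700000000123, "millisec").isoformat())
# 2023-11-14T22:13:20.123000+00:00
```

Picking the wrong unit shifts the result by a factor of a thousand or a million, so a date far in the past or future is a common sign that the wrong operation was chosen.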
Example Usage
Example 1: Basic Field Extraction
{
  "inputRule": "$.sourceIPAddress",
  "LRSchemaField": "sip",
  "type": "string",
  "default": null,
  "alternativeFields": ["$.src_ip", "$.source.ip"],
  "format": null
}
Example 2: Using an Operation
{
  "inputRule": "IsIP($.sourceIPAddress,true)",
  "LRSchemaField": "sip",
  "type": "string",
  "default": null,
  "alternativeFields": null,
  "format": null
}
Example 3: Using a Format
{
  "inputRule": "$.username",
  "LRSchemaField": "login",
  "type": "string",
  "default": null,
  "alternativeFields": null,
  "format": "lowercase"
}
Example 4: Using Both Format and Operation
{
  "inputRule": "SPLIT($.userIdentity.principalId,':',1)",
  "LRSchemaField": "session",
  "type": "string",
  "default": null,
  "alternativeFields": ["$.userIdentity.arn"],
  "format": "lowercase"
}
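To show what this transform might yield, the Python sketch below applies a split followed by lowercasing. Two assumptions are made here and not confirmed by the reference tables: that the SPLIT index is zero-based, and that the operation runs before the formatter. The function name and the sample principalId value are hypothetical.

```python
def split_then_lowercase(value, delimiter, index):
    """Split on the delimiter, pick one part (zero-based), and lowercase it.

    Returns None when the index is out of range.
    """
    parts = value.split(delimiter)
    if index >= len(parts):
        return None
    return parts[index].lower()


# Hypothetical principalId of the form "<id>:<session name>".
print(split_then_lowercase("AROAEXAMPLEID:Alice-Session", ":", 1))  # alice-session
```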
Example 5: DateTime Formatting
{
  "inputRule": "$.eventTime",
  "LRSchemaField": "normal_msg_date",
  "type": "datetime",
  "default": null,
  "alternativeFields": null,
  "format": "yyyy-MM-ddTHH:mm:ss.fffK"
}
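In the format string above, `fff` stands for milliseconds and `K` for time zone information, following the custom date and time format string conventions. The Python sketch below parses a timestamp of that shape so a sample value can be checked offline; the `strptime` pattern is an approximate equivalent chosen for illustration, and `parse_event_time` and the sample value are hypothetical.

```python
from datetime import datetime, timezone

# Python pattern assumed equivalent to "yyyy-MM-ddTHH:mm:ss.fffK":
# %f takes the fractional seconds, %z takes "Z" or a numeric offset.
PATTERN = "%Y-%m-%dT%H:%M:%S.%f%z"


def parse_event_time(text):
    """Parse a timestamp such as 2024-01-15T10:30:00.123Z into an aware datetime."""
    return datetime.strptime(text, PATTERN)


parsed = parse_event_time("2024-01-15T10:30:00.123Z")
print(parsed.isoformat())  # 2024-01-15T10:30:00.123000+00:00
```

If a sample timestamp fails to parse with the matching pattern, the format string in the policy is likely to need adjusting as well.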
Example 6: Using Array Concatenation
{
  "inputRule": "ConcatArray($.response.messagesDelivered.recipient[*],',')",
  "LRSchemaField": "recipient",
  "type": "String",
  "default": null,
  "alternativeFields": null,
  "format": null
}