Правила агрегации

В данном разделе представлен обзор структуры, принципов работы, процесса создания и расширения конфигурации правил агрегации в R-Vision SIEM. Управление правилами агрегации в системе осуществляется в разделе Экспертиза.

О правилах агрегации

Правило агрегации — это структурированный объект данных в формате RObject (.ro), который описывает логику объединения и анализа групп событий безопасности.

Структура правила агрегации включает определение условий фильтрации событий, параметров группировки, условий начала и завершения агрегации, а также стратегий агрегации для каждого поля. Фильтрация событий осуществляется на основе заранее заданных критериев, определяющих, какие события будут включены в процесс агрегации. Условия начала и завершения агрегации задают логику окон агрегации, определяя когда и как агрегационное окно будет открыто или закрыто.

Важной частью правила является определение группировки событий, что позволяет собирать события в логически согласованные группы для последующей агрегации. Группировка может осуществляться по различным полям события, например, содержащим данные об IP-адресах, идентификаторах устройств или пользователях. Если указано несколько полей для группировки, то события будут группироваться по уникальным комбинациям значений этих полей.

Стратегии агрегации для каждого поля определяют, как будут объединяться значения соответствующих полей событий в процессе агрегации. Доступны различные стратегии, такие как суммирование, выбор максимального и минимального значений, различные формы конкатенации строк, такие как обычное соединение, соединение с новой строки или без разделителей, а также методы для работы с массивами, включая добавление всех значений, формирование массива уникальных значений и выбор самого длинного или короткого массива. Кроме того, существуют стратегии для сохранения значений из первого или последнего события в агрегации.

Работа с правилом агрегации

Доступные операции над правилом агрегации:

Создание правила агрегации

Чтобы добавить правило агрегации:

  1. Перейдите в раздел Экспертиза. Система отобразит сведения об имеющихся элементах экспертизы, в том числе их текущий статус (включен/выключен).

  2. Нажмите на кнопку Создать и выберите из выпадающего списка пункт Правило агрегации. Система отобразит окно создания правила агрегации.

    При создании элемента экспертизы поля его структуры заполняются значениями по умолчанию. Для быстрой настройки структуры в системе доступны предустановленные примеры. Чтобы заполнить поля элемента экспертизы с помощью примера, нажмите на кнопку Примеры и выберите требуемый пример из выпадающего списка.
  3. Заполните поля правила агрегации, чтобы определить логику его работы.

    Строки, начинающиеся с символа $ ($foo), интерпретируются в VRL-блоках элементов экспертизы как переменные окружения. Чтобы избежать этого, экранируйте такие строки символом $ ($$foo).
  4. Нажмите на кнопку Опубликовать версию, чтобы сохранить изменения и опубликовать созданное правило агрегации. Система отобразит уведомление об успешном создании правила агрегации. Новое правило агрегации отобразится в таблице раздела Экспертиза.

    Чтобы правило агрегации стало доступно в конвейерах событий, его необходимо включить.

    Вы можете создать правило агрегации без его публикации (в виде черновика) с помощью кнопки Сохранить черновик. Система отобразит уведомление об успешном создании черновика правила. Черновик отобразится в таблице раздела Экспертиза.

Изменение правила агрегации

Чтобы изменить правило агрегации:

  1. Перейдите в раздел Экспертиза. Система отобразит сведения об имеющихся элементах экспертизы, в том числе их текущий статус (включен/выключен).

  2. Нажмите на строку правила агрегации в списке. Система отобразит в правой части экрана карточку этого правила с подробной информацией о нем.

  3. Выберите опцию Изменить в выпадающем меню Действия (more vertical) в верхней части карточки правила агрегации. Отобразится окно настроек правила.

    Вы также можете открыть окно настроек с помощью кнопки в нижней части карточки:

    • Для опубликованной версии: нажмите на кнопку Просмотр в нижней части карточки. Система отобразит окно просмотра настроек элемента экспертизы. Нажмите на кнопку Изменить в правом нижнем углу окна.

    • Для черновика: нажмите на кнопку Изменить в нижней части карточки.

  4. Внесите требуемые изменения в конфигурацию правила агрегации.

  5. Нажмите на кнопку Обновить версию. Новая версия правила агрегации с измененной конфигурацией будет опубликована.

    При публикации новой версии элемента экспертизы требуется увеличить его текущую версию в поле version.

    Вы можете сохранить измененную конфигурацию правила агрегации без публикации (в виде черновика) с помощью кнопки Сохранить черновик. Конфигурация будет сохранена для дальнейшего редактирования. В этом случае в верхней части карточки правила будет отображаться предупреждение о наличии неопубликованных изменений.

Удаление правила агрегации

Удаление доступно только для элементов экспертизы с типом создания Пользовательский.

Удаление недоступно для элементов экспертизы, используемых в других сущностях системы.

Чтобы удалить правило агрегации:

  1. Перейдите в раздел Экспертиза. Система отобразит сведения об имеющихся элементах экспертизы, в том числе их текущий статус (включен/выключен).

  2. Нажмите на строку правила агрегации в списке. Система отобразит в правой части экрана карточку этого правила с подробной информацией о нем.

  3. Выберите опцию Удалить в выпадающем меню Действия (more vertical) в верхней части карточки правила агрегации. Отобразится окно подтверждения удаления правила.

  4. Нажмите на кнопку Удалить. Система отобразит уведомление об удалении правила агрегации, и правило будет удалено из списка элементов экспертизы.

Вы также можете удалить группу элементов экспертизы. Для этого:

  1. Перейдите в раздел Экспертиза. Система отобразит сведения об имеющихся элементах экспертизы, в том числе их текущий статус (включен/выключен).

  2. Установите флажки напротив тех элементов, которые вы хотите удалить, в левом столбце таблицы.

  3. Нажмите на кнопку trash на панели инструментов. Отобразится окно удаления.

  4. Нажмите на кнопку Удалить. Система отобразит уведомление об удалении выбранного элемента экспертизы, и элемент экспертизы будет удален из списка элементов экспертизы.

Структура правила агрегации

Правило агрегации в системе R-Vision SIEM представляет собой структурированный объект данных в формате RObject (.ro), который включает в себя ряд обязательных и опциональных полей для определения процесса агрегации.

Строки, начинающиеся с символа $ ($foo), интерпретируются в VRL-блоках элементов экспертизы как переменные окружения. Чтобы избежать этого, экранируйте такие строки символом $ ($$foo).
Поле Описание Тип данных Обязательное поле

id

Уникальный идентификатор правила агрегации.

строка

да

name

Название правила агрегации.

строка

да

type

Тип элемента. Всегда имеет значение aggregation_rule для элементов экспертизы этого типа.

строка

да

version

Версия правила агрегации, представленная в формате Semantic Versioning.

строка

да

author

Имя и контактные данные автора правила агрегации.

строка

нет

description

Описание правила агрегации, содержащее дополнительную информацию о его назначении и функциональности.

строка

нет

data_source

Список источников данных, с которыми совместимо правило агрегации. Используется для классификации и быстрого поиска правил в системе.

массив строк

массив объектов

нет

tags

Список тегов, присвоенных правилу агрегации для классификации и быстрого поиска.

массив строк

нет

filter

Фильтр на языке VRL, определяющий условия, при которых события будут агрегироваться.

строка

да

group_by

Поля, по которым будут группироваться агрегированные события. Если ничего не указано — все поступающие события будут объединяться в одну группу.

массив строк

нет

starts_when

Условие на языке VRL, при выполнении которого открывается новое агрегационное окно. Нельзя использовать одновременно с параметром ends_when.

строка

нет

max_events

Максимальное количество событий для агрегации.

число

нет

expire_after_ms

Время в миллисекундах, после которого агрегационное окно закрывается, если не поступают новые события.

число

нет

flush_period_ms

Интервал в миллисекундах для проверки и закрытия агрегационных окон.

число

нет

merge_strategies

Стратегии, определяющие, как будут объединяться значения полей из агрегированных событий.

объект

нет

ends_when

Условие на языке VRL, при выполнении которого агрегационное окно закрывается. Нельзя использовать одновременно с параметром starts_when.

строка

нет

tests

Блок тестирования правила агрегации, содержащий массив тестовых событий и условия для проверки срабатывания правила.

объект

нет

Структура data_source

Поле data_source, представленное массивом строк, имеет структуру:

data_source:
- <platform> # Платформа или информационная система
- <source> # Источник событий.
# События (EventId) или путь к журналу событий:
- <EventID_1>
- <EventID_2>

Поле data_source, представленное массивом объектов, имеет следующую структуру:

Поле Описание Тип данных Обязательное поле

platform

Платформа или информационная система.

строка

нет

source

Источник событий.

строка

нет

events

События (EventId) или путь к журналу событий.

массив строк

нет

Структура tests:

Поле Описание Тип данных Обязательное поле

name

Название теста, позволяющее идентифицировать его в тестовой выдаче.

строка

да

events

Поток тестовых событий, обрабатываемых правилом.

массив

да

assertion

Блок правил на языке VRL, который будет вызываться на каждое событие, испускаемое полем events. Используя встроенные функции assert и assert_eq, можно строить проверки на соответствие испускаемого события тестовым условиям.

строка

да

Опции merge_strategies

Значение, указанное в merge_strategies, определяет метод, с помощью которого значения из полей разных событий будут объединяться в ходе агрегации.

Опция Описание

array

Создаёт массив, включающий все встречающиеся значения поля из агрегированных событий. Используется, когда необходимо сохранить все значения без исключения.

concat

Объединяет строковые значения в одну строку с разделением пробелами. Подходит для ситуаций, где необходимо сформировать единое текстовое представление из разных строковых значений.

concat_newline

Аналогично concat, но использует перенос строки в качестве разделителя. Это облегчает различение отдельных значений в одной длинной строке.

concat_raw

Соединяет строковые значения без какого-либо разделителя между ними. Эта стратегия полезна, когда необходимо сформировать непрерывную строку данных.

discard

Игнорирует все значения, кроме первого встретившегося. Полезно, когда важно сохранить только первоначальное значение поля из всей группы событий.

flat_unique

Формирует массив уникальных значений, исключая повторения. Используется для идентификации уникальных характеристик в группе событий.

longest_array

Выбирает массив максимальной длины из всех массивов, встречающихся в агрегированных событиях. Применяется для определения наиболее обширного набора данных.

max

Находит и сохраняет максимальное числовое значение среди всех агрегированных событий. Это может быть полезно для определения пиковых значений.

min

Аналогично max, но для определения минимального числового значения.

retain

Сохраняет значение из последнего события в агрегированной группе. Это полезно для отслеживания самых последних изменений.

shortest_array

Выбирает массив минимальной длины, что может быть полезно для определения наименее обширного набора данных в группе.

sum

Вычисляет сумму всех числовых значений поля. Эта стратегия широко используется для агрегации количественных данных, таких как счетчики или объёмы.

Составление правила

Управление правилами агрегации в системе R-Vision SIEM осуществляется в разделе Экспертиза.

Для составления правила агрегации в системе R-Vision SIEM, необходимо определить ключевые элементы структуры правила. Эти элементы включают поля метаданных и поля, управляющие логикой работы правила.

  1. Определите метаданные правила:

    • id: задайте уникальный идентификатор правила.

    • name: задайте название правила.

    • version: укажите версию правила, следуя стандартам Semantic Versioning.

    • type: определите тип правила как aggregation_rule.

    • author (опциональное): укажите имя и контактные данные автора.

    • description(опциональное): предоставьте краткое описание назначения и функциональности правила.

    • data_source (опциональное): укажите список совместимых с правилом источников данных для классификации и фильтрации.

    • tags (опциональное): добавьте теги для классификации и поиска правила.

  2. Разработайте логику работы правила:

    • filter (обязательное): создайте условие для фильтрации событий.

    • group_by (обязательное): определите поля для группировки событий.

    • starts_when и ends_when (опциональные): настройте условия открытия и закрытия окна агрегации. В рамках правила допускается одновременное использование только одного из параметров.

    • expire_after_ms и flush_period_ms (опциональные): установите временные параметры для управления агрегационными окнами.

    • max_events (опциональное): укажите максимальное количество событий для агрегации.

    • merge_strategies (опциональное): выберите стратегии агрегации для полей событий.

  3. Разработайте тесты для правила (опциональное):

    • tests: создайте тесты, включая сценарии и условия, для проверки работы правила.

Поля метаданных

Метаданные являются основой для идентификации и описания правила агрегации в системе. Они должны быть заполнены следующим образом:

  • id: установите уникальный идентификатор правила. Этот идентификатор должен быть уникальным в рамках системы.

  • name: укажите название правила.

  • version: задайте версию правила, следуя принципам Semantic Versioning (например, 1.0.0).

  • type: установите тип правила aggregation_rule.

  • author (опциональное): укажите имя и контактные данные создателя правила.

  • description (опциональное): предоставьте краткое описание назначения и функциональности правила.

  • data_source (опциональное): укажите список совместимых с правилом источников данных.

  • tags (опциональное): добавьте теги для классификации правила в системе.

Поле filter

Поле filter служит для определения критериев отбора событий, которые будут включены в процесс агрегации. Фильтрация основана на языке VRL, предоставляя гибкие возможности для анализа и обработки данных событий.

Пример:

filter: !vrl |
  .event_type == "login_failure" && .source_ip != "192.168.1.1"

В этом примере выбираются события с типом "login_failure", исключая события от конкретного IP-адреса.

Поле group_by

Поле group_by задает основу для группировки событий в рамках агрегации. Можно указать одно или несколько полей, по которым события будут группироваться, что позволяет более точно анализировать сходные или связанные данные. Если ничего не указано — все поступающие события будут объединяться в одну группу.

Пример:

group_by:
  - source_ip
  - user_agent

Здесь события группируются по IP-адресу источника и типу пользовательского агента.

Поле starts_when

starts_when определяет условие, при котором откроется новое окно агрегации. Это условие задает начальную точку для создания новой группы событий.

Параметр starts_when нельзя использовать одновременно с параметром ends_when в рамках одного правила агрегации.

Пример:

starts_when: !vrl |
  .event_type == "network_connection" && .port == 443

Агрегация начнется, когда обнаружится событие типа "network_connection" на порту 443.

Поле ends_when

ends_when определяет условие для закрытия окна агрегации. Это условие может быть основано на различных критериях, соответствующих потребностям безопасности и анализа.

Параметр ends_when нельзя использовать одновременно с параметром starts_when в рамках одного правила агрегации.

Пример:

ends_when: !vrl |
  .event_type == "network_disconnection"

Здесь агрегация завершается, когда появляется событие разъединения сети.

Поле expire_after_ms

expire_after_ms устанавливает максимальный период ожидания для агрегационного окна в миллисекундах. Это поле помогает контролировать продолжительность агрегации и гарантирует, что данные обрабатываются в разумные временные рамки.

Пример:

expire_after_ms: 30000

Агрегационное окно будет закрыто, если новые события не появятся в течение 30 секунд.

Поле flush_period_ms

flush_period_ms задает периодичность проверки и закрытия агрегационных окон. Этот параметр важен для обеспечения своевременной обработки данных и управления памятью.

Пример:

flush_period_ms: 10000

Система будет проверять и закрывать агрегационные окна каждые 10 секунд.

Поле max_events

max_events определяет максимальное количество событий, которое может быть включено в одно агрегационное окно. Этот параметр помогает контролировать объем данных в каждой агрегации.

Пример:

max_events: 20

Агрегационное окно будет закрыто после обработки 20 событий.

Поле merge_strategies

merge_strategies определяет стратегии обработки и объединения данных из различных событий в процессе агрегации. Эти стратегии позволяют гибко управлять данными, создавая информативные и ценные агрегированные записи.

Если для указанного поля не определена стратегия, будет использоваться поведение по умолчанию.

Поведение по умолчанию:

  • Первое значение строкового поля сохраняется, а последующие значения отбрасываются.

  • Для полей временных меток сохраняется первое значение, и добавляется новое поле [имя-поля]_end с временной меткой последнего полученного значения.

  • Числовые значения суммируются.

Пример:

merge_strategies:
  bytes_sent: sum
  messages: concat_newline

Здесь значения поля bytes_sent суммируются, а строки в поле messages объединяются с разделителем в виде новой строки.

Поле tests

tests включает набор тестов для проверки и верификации правила агрегации. Эти тесты помогают убедиться, что правило работает корректно и соответствует ожидаемым результатам.

Определите параметры теста:

  • В name укажите название теста.

  • В events создайте массив событий, которые будут использоваться для тестирования. Эти события должны имитировать поток реальных исходных событий, которые поступают на агрегатор.

  • В assertion устанавливается условие или набор условий, которые должны быть выполнены в результате обработки тестовых событий правилом. Это может включать проверку конкретных полей в итоговых агрегированных событиях или подсчет количества таких событий.

Тестовые события

Тестовые события должны имитировать данные, которые в реальности поступают на агрегатор. Они должны соответствовать формату и структуре событий, обрабатываемых правилом агрегации и содержать поля, которые используются в правиле.

Условия проверки

Условия должны точно отражать ожидаемый результат применения правила к тестовым событиям. Это может быть проверка наличия определенных полей в событии, соответствие их значений ожидаемым или сравнение количества событий с предполагаемым исходом. Для более сложных правил могут потребоваться более сложные утверждения, которые могут включать логические операции и сравнения.

Пример:

tests:
  - name: AggregationTest
    events:
      - { "event_type": "login_failure", "source_ip": "192.168.1.1" }
      - { "event_type": "login_failure", "source_ip": "192.168.1.2" }
    assertion: !vrl |
      assert_eq!(.aggregated_event_count, 2)

Здесь проверяется, что правило агрегирует два события типа "login_failure".

Структура агрегированного события

В результате выполнения агрегации создается агрегированное событие. Помимо полей, которые явно определяются правилом агрегации, структура агрегированного события содержит ряд служебных полей, заполняемых по умолчанию.

Ключ поля Тип данных Описание

id

UUID

ID агрегированного события. Генерируется автоматически при создании события.

sourceIp

IPv6

IP-адрес источника базового события. По умолчанию записывается значение для первого базового события в агрегации.

tenantId

LCString

ID тенанта, к которому относится агрегированное событие.

art

DateTime

Дата и время получения базового события. По умолчанию записывается значение для первого базового события в агрегации.

raw

String

Текст базового события. По умолчанию записывается значение для первого базового события в агрегации.

collectorId

LCString

ID коллектора, на котором создается агрегированное событие.

timestamp

DateTime

Дата и время создания агрегированного события.

aggregationRuleName

LCString

Название правила агрегации, обработавшего базовое событие.

aggregationRuleId

LCString

ID правила агрегации, обработавшего базовое событие.

type

Enum

Тип события. По умолчанию устанавливается значение "2", соответствующее типу "агрегированное событие".

cnt

UInt32

Количество базовых событий, на основе которых создается агрегированное событие.

baseEventIds

Массив UUID

Список ID базовых событий, на основе которых создается агрегированное событие.

baseEventTimestamps

Массив DateTime

Список дат и времен получения базовых событий, на основе которых создается агрегированное событие.

groupedBy

Массив LCString

Список полей модели событий, по которому группируются базовые события при агрегации.

Поля id, tenantId, collectorId, timestamp, type, cnt, baseEventIds, baseEventTimestamps заполняются системой автоматически. Их правила заполнения не могут быть изменены.

Поля groupedBy, aggregationRuleId и aggregationRuleName заполняются на основе настроек, заданных в правиле агрегации.

При необходимости вы можете изменить правила заполнения остальных полей. Для этого задайте соответствующие правила в блоке merge_strategies правила агрегации.

Прочие служебные и регулярные поля модели события заполняются в соответствии с настройками, заданными в блоке merge_strategies правила агрегации.

Пример правила агрегации

Example 1. Правило агрегации, схема RObject
# Уникальный идентификатор правила.
id: example/network_activity_rule

# Название правила.
name: NetworkActivityAggregationRule

# Тип элемента экспертизы.
type: aggregation_rule

# Версия правила.
version: 1.0.0

# Описание назначения правила.
description: Агрегация событий сетевой активности для анализа трафика

# Имя и контактные данные автора правила.
author: John Doe <johndoe@example.com>

# Список совместимых с правилом источников данных.
data_source:
  - platform: Windows
  - source: Sysmon
  - events:
    - EventID_1
    - EventID_2

# Теги, связанные с правилом.
tags:
  - network
  - traffic_analysis

# Отбрасывает события, которые не соответствуют критериям.
filter: !vrl |
  # Фильтрует события, не относящиеся к категории "Network" или имеющие низкий уровень серьезности.
  .category == "Network" && .severity > 3

# Группировка событий по источнику и назначению.
group_by:
  - src_ip
  - dest_ip

# Условие открытия агрегационного окна.
start_when: !vrl |
  # Начинаем агрегацию при обнаружении события начала соединения.
  .event_type == "connection_start"

# Максимальное количество событий.
max_events: 5

# Время ожидания последнего события (миллисекунды).
expire_after_ms: 60000

# Интервал проверки окон агрегации (миллисекунды).
flush_period_ms: 2000

# Стратегии агрегации значений для каждого указанного поля.
merge_strategies:
  bytes_sent: sum
  bytes_received: sum

# Тесты для проверки работы правила.
tests:
 - name: Network Activity Test
   events:
   - {"category":"Network", "severity":5, "event_type":"connection_start", "src_ip":"192.0.2.10", "dest_ip":"192.0.2.20", "bytes_sent":1500, "bytes_received":2000}
   - {"category":"Network", "severity":4, "event_type":"data_transfer", "src_ip":"192.0.2.10", "dest_ip":192.0.2.20", "bytes_sent":3000, "bytes_received":4000}
   - {"category":"Network", "severity":5, "event_type":"connection_end", "src_ip":"192.0.2.10", "dest_ip":"192.0.2.20", "bytes_sent":500, "bytes_received":600}
   assertion: !vrl |
     # Проверяем, что агрегированные значения соответствуют ожиданиям.
     assert_eq!(.[0], {
       "category": "Network",
       "event_type": "connection_start",
       "src_ip": "192.0.2.10",
       "dest_ip": "192.0.2.20",
       "bytes_sent": 5000,
       "bytes_received": 6600
     })

Интерпретация Метаданные правила:

  • id: уникальный идентификатор правила в системе.

  • name: название правила.

  • type: тип aggregation_rule указывает на то, что это правило агрегации.

  • version: версия правила.

  • description: описывает цель правила — "Агрегация событий сетевой активности для анализа трафика".

  • author: создатель правила.

  • tags: теги network и traffic_analysis классифицируют правило по его функциональности и области применения.

Фильтрация событий (filter):

  • filter: фильтрует события по категории "Network" и уровню серьезности более 3. Это сужает область применения правила только к событиям, которые удовлеворяют условиям фильтрации.

Группировка событий (group_by):

  • group_by: группирует события по полям src_ip и dest_ip. Это позволяет анализировать взаимодействия между конкретными источниками и назначениями в сети.

Условие начала агрегации (starts_when):

  • starts_when: определяет начало агрегации при появлении событий типа "connection_start", что является признаком начала нового сетевого соединения.

Параметры агрегационного окна (max_events, expire_after_ms, flush_period_ms):

  • max_events: ограничивает количество событий в агрегации до 5. После достижения этого количества, агрегационное окно закрывается и генерируется агрегированное событие.

  • expire_after_ms: устанавливает максимальный период ожидания последнего события в агрегационном окне в 60000 миллисекунд (1 минута).

  • flush_period_ms: задает частоту проверки и закрытия агрегационных окон в 2000 миллисекунд (2 секунды).

Стратегии агрегации (merge_strategies):

  • merge_strategies: определяет, как будут объединяться поля событий. Для "bytes_sent" и "bytes_received" применяется стратегия суммирования (sum).

Тестирование правила (tests):

  • name: "Network Activity Test" — название теста.

  • events: описывает ряд тестовых событий, имитирующих реальный сетевой трафик, для проверки эффективности правила.

  • assertion: проверяет, что правило корректно агрегирует события и вычисляет суммы переданных и полученных байтов. Это подтверждает, что правило работает как предполагалось и агрегирует данные согласно заданным критериям.

Обработка событий правилом агрегации "NetworkActivityAggregationRule" Проанализируем работу правила агрегации "NetworkActivityAggregationRule", фокусируясь на процессе обработки событий.

Представим, что на агрегатор поступает поток сетевых событий, которые соответствуют критериям, заданным в фильтре filter правила. Эти события содержат данные о типе сетевого соединения, IP-адресах источника и назначения, а также объёме переданных и полученных данных.

{
  "category": "Network",
  "severity": 5,
  "event_type": "connection_start",
  "src_ip": "192.168.1.10",
  "dest_ip": "192.168.1.20",
  "bytes_sent": 1500,
  "bytes_received": 2000
}

По мере поступления этих событий, они фильтруются правилом "NetworkActivityAggregationRule". Правило агрегирует события, группируя их по IP-адресам источника и назначения (group_by). Таким образом, события, исходящие от одной пары IP-адресов, объединяются в одно агрегированное событие.

Агрегация инициируется, когда обнаруживается событие типа connection_start (starts_when). Правило продолжает агрегировать события до тех пор, пока не будет достигнуто максимальное количество событий (max_events) или не истечёт время ожидания (expire_after_ms).

Агрегированное событие включает в себя сумму значений bytes_sent и bytes_received для всех событий в группе (merge_strategies).

{
  "event_type": "aggregated_network_activity",
  "src_ip": "192.168.1.10",
  "dest_ip": "192.168.1.20",
  "total_bytes_sent": 5000,
  "total_bytes_received": 6600
}

Данное агрегированное событие отражает общую активность между двумя конкретными IP-адресами, позволяя идентифицировать аномалии или необычную активность в сети.

Расширение конфигурации существующего правила

Рассмотрим сценарий, в котором необходимо расширить функциональность правила агрегации "NetworkActivityAggregationRule", чтобы улучшить анализ сетевого трафика и повысить точность агрегации событий.

Исходное правило агрегации "NetworkActivityAggregationRule", схема RObject
# Уникальный идентификатор правила.
id: example/network_activity_rule

# Название правила.
name: NetworkActivityAggregationRule

# Тип элемента экспертизы.
type: aggregation_rule

# Версия правила.
version: 1.0.0

# Описание назначения правила.
description: Агрегация событий сетевой активности для анализа трафика

# Имя и контактные данные автора правила.
author: John Doe <johndoe@example.com>

# Список совместимых с правилом источников данных.
data_source:
  - platform: Windows
  - source: Sysmon
  - events:
    - EventID_1
    - EventID_2

# Теги, связанные с правилом.
tags:
  - network
  - traffic_analysis

# Отбрасывает события, которые не соответствуют критериям.
filter: !vrl |
  # Фильтрует события, не относящиеся к категории "Network" или имеющие низкий уровень серьезности.
  .category == "Network" && .severity > 3

# Группировка событий по источнику и назначению.
group_by:
  - src_ip
  - dest_ip

# Условие открытия агрегационного окна.
start_when: !vrl |
  # Начинаем агрегацию при обнаружении события начала соединения.
  .event_type == "connection_start"

# Максимальное количество событий.
max_events: 5

# Время ожидания последнего события (миллисекунды).
expire_after_ms: 60000

# Интервал проверки окон агрегации (миллисекунды).
flush_period_ms: 2000

# Стратегии агрегации значений для каждого указанного поля.
merge_strategies:
  bytes_sent: sum
  bytes_received: sum

# Тесты для проверки работы правила.
tests:
 - name: Network Activity Test
   events:
   - {"category":"Network", "severity":5, "event_type":"connection_start", "src_ip":"192.0.2.10", "dest_ip":"192.0.2.20", "bytes_sent":1500, "bytes_received":2000}
   - {"category":"Network", "severity":4, "event_type":"data_transfer", "src_ip":"192.0.2.10", "dest_ip":192.0.2.20", "bytes_sent":3000, "bytes_received":4000}
   - {"category":"Network", "severity":5, "event_type":"connection_end", "src_ip":"192.0.2.10", "dest_ip":"192.0.2.20", "bytes_sent":500, "bytes_received":600}
   assertion: !vrl |
     # Проверяем, что агрегированные значения соответствуют ожиданиям.
     assert_eq!(.[0], {
       "category": "Network",
       "event_type": "connection_start",
       "src_ip": "192.0.2.10",
       "dest_ip": "192.0.2.20",
       "bytes_sent": 5000,
       "bytes_received": 6600
     })

Для этого добавим новые условия группировки и стратегии агрегации, а также дополнительные параметры для более тонкой настройки агрегации событий.

Example 2. Пример расширенного правила "EnhancedNetworkActivityAggregationRule", схема RObject
# Уникальный идентификатор правила.
id: example/network_activity_rule_enhanced

# Название правила.
name: EnhancedNetworkActivityAggregationRule

# Тип элемента экспертизы.
type: aggregation_rule

# Версия правила.
version: 1.0.0

# Описание назначения правила.
description: Расширенная агрегация событий сетевой активности для детального анализа трафика

# Имя и контактные данные автора правила.
author: John Doe <johndoe@example.com>

# Список совместимых с правилом источников данных.
data_source:
  - platform: Windows
  - source: Sysmon
  - events:
    - EventID_1
    - EventID_2

# Теги, связанные с правилом.
tags:
  - network
  - advanced_traffic_analysis

#  Условия для фильтрации событий.
filter: !vrl |
  # Фильтрация событий по категории "Network" и высокому уровню серьезности, а также специфическим характеристикам запросов.
  .category == "Network" && .severity > 3 && regex_match("^Mozilla", .user_agent)

# Расширенные параметры группировки событий.
group_by:
  - src_ip
  - dest_ip
  - user_agent
  - http_method

# Условие открытия агрегационного окна.
start_when: !vrl |
  # Агрегация начинается при обнаружении начала соединения или определенных типов HTTP-запросов.
  .event_type == "connection_start" || .http_method in ["POST", "GET"]

# Увеличенное максимальное количество событий.
max_events: 10

# Измененное время ожидания последнего события (миллисекунды).
expire_after_ms: 120000

# Интервал проверки окон агрегации (миллисекунды).
flush_period_ms: 3000

# Расширенные стратегии агрегации значений.
merge_strategies:
  bytes_sent: sum
  bytes_received: sum
  request_paths: longest_array

# Тесты для проверки работы расширенного правила.
tests:
 - name: Enhanced Network Activity Test
   events:
   - {"category":"Network", "severity":5, "event_type":"connection_start", "src_ip":"192.0.2.10", "dest_ip":"192.0.2.20", "user_agent": "Mozilla/5.0", "http_method": "POST", "bytes_sent":2000, "bytes_received":3000}
   - {"category":"Network", "severity":4, "event_type":"data_transfer", "src_ip":"192.0.2.10", "dest_ip":192.0.2.20", "user_agent": "Mozilla/5.0", "http_method": "GET", "bytes_sent":4000, "bytes_received":5000}
   assertion: !vrl |
     # Проверяем, что агрегированные значения соответствуют ожиданиям.
     assert_eq!(.[0], {
       "category": "Network",
       "event_type": "connection_start",
       "src_ip": "192.0.2.10",
       "dest_ip": "192.0.2.20",
       "user_agent": "Mozilla/5.0",
       "http_method": "POST",
       "bytes_sent": 6000,
       "bytes_received": 8000,
       "request_paths": "/api/load"
     })

Изменения, внесенные в улучшенное правило "EnhancedNetworkActivityAggregationRule":

  1. Дополнительные параметры группировки и стратегии агрегации:

    • Добавлена группировка по дополнительным полям, таким как user_agent или http_method, для более детального анализа сетевого трафика.

    • Введены новые стратегии агрегации, например, longest_array для поля request_paths, чтобы отслеживать наиболее часто используемые пути запросов.

  2. Изменение параметров max_events и expire_after_ms:

    • Увеличение значения max_events для обработки большего количества событий в одном агрегационном окне.

    • Изменение expire_after_ms для оптимизации времени ожидания агрегации событий, учитывая характер трафика.

  3. Тестовая проверка:

    • Добавлены тестовые сценарии с учётом новых параметров группировки и стратегий агрегации для проверки правила на способность адекватно агрегировать и анализировать более сложные сетевые события.

Интерпретация правила "EnhancedNetworkActivityAggregationRule"

Рассмотрим механику работы усовершенствованного правила агрегации "EnhancedNetworkActivityAggregationRule", фокусируясь на логике обработки и агрегации событий.

Правило "EnhancedNetworkActivityAggregationRule" предназначено для более глубокого анализа сетевых событий, используя расширенные критерии фильтрации и группировки.

Основные параметры правила:

  • filter: фильтрация событий основывается на категории "Network", уровне серьезности событий и определённых характеристиках, таких как соответствие user_agent шаблону регулярного выражения "Mozilla".

  • group_by: агрегация событий выполняется по полям src_ip, dest_ip, user_agent и http_method, что позволяет проводить многофакторный анализ сетевой активности.

  • starts_when: условие начала агрегации активируется при обнаружении события начала соединения или определённых типов HTTP-запросов.

  • merge_strategies: применяются стратегии агрегации sum для полей bytes_sent и bytes_received и longest_array для поля request_paths.

Логика агрегации:

  • При срабатывании условий filter и starts_when, события группируются по определённым IP-адресам источника и назначения, типам запросов и агентам пользователя.

  • Агрегированные события включают общее количество переданных и полученных байтов (bytes_sent и bytes_received) и наиболее часто встречающиеся пути запросов (request_paths).

Тестирование правила:

Тестовые сценарии "Enhanced Network Activity Test" включают события, соответствующие критериям фильтрации и группировки. Они направлены на проверку способности правила корректно агрегировать данные, учитывая новые параметры группировки и агрегации.

Предположим, что события, поступающие на агрегатор, на котором установлено правило агрегации, имеют следующий вид:

Example 3. Тестовое событие, JSON
{
  "category": "Network",
  "severity": 5,
  "event_type": "connection_start",
  "src_ip": "192.168.1.10",
  "dest_ip": "192.168.1.20",
  "user_agent": "Mozilla/5.0",
  "http_method": "POST",
  "bytes_sent": 2000,
  "bytes_received": 3000
}

Тогда, ожидаемый результат агрегации примет следующий вид:

Example 4. Агрегированное событие, JSON
{
  # служебные поля
  "id": "e1336694-b3d6-4e02-9f20-92f642bb64fa", # ID агрегированного события.
  "sourceIp": "192.168.1.10", # IP-адрес источника события.
  "tenantId": "99311ef1-32a1-4b2d-9d38-7613be5e4c2b", # ID тенанта.
  "art": "15.03.2024 15:25:05", # Дата и время получения первого события.
  "raw": "{\"category\":\"Network\",\"severity\":5,\"event_type\":\"connection_start\",\"src_ip\":\"192.168.1.10\",\"dest_ip\":\"192.168.1.20\",\"user_agent\":\"Mozilla/5.0\",\"http_method\":\"POST\",\"bytes_sent\":2000,\"bytes_received\":3000}", # Текст первого события.
  "collectorId": "891d9a49-8417-4ce9-abf4-184b0c093e46", # ID коллектора.
  "timestamp": "15.03.2024 15:45:10", # Дата и время создания агрегированного события.
  "aggregationRuleName": "example/network_activity_rule_enhanced", # Название правила агрегации.
  "aggregationRuleId": "EnhancedNetworkActivityAggregationRule", # ID правила агрегации.
  "type": "2", # Тип события - "агрегированное событие".
  "cnt": "5", # Количество событий в агрегации.
  "baseEventIds": "41627060-c610-494d-834c-0f6c3223f0eb, b5db4311-b978-45b5-8790-0b02834bf479, dd3d4ad4-9de9-478e-be65-24eb3e70abd8, 34a9661f-f460-4285-b11b-8d2388811819, 933a0aaa-85eb-477a-a1a4-285478a42f5a", # Список ID событий в агрегации.
  "baseEventTimestamps": "15.03.2024 15:25:05, 15.03.2024 15:28:35, 15.03.2024 15:33:59, 15.03.2024 15:37:02, 15.03.2024 15:41:17", # Список дат и времен получения событий в агрегации.
  "groupedBy": "src_ip, dest_ip, user_agent, http_method", # Поля группировки в агрегации.
  "correlationRuleName": "", # Название корреляционного правила.
  "correlationRuleId": "", # ID корреляционного правила.
  "correlationSeverity": "", # Уровень угрозы кореляционного события.

  # регулярные поля
  "category": "Network",
  "event_type": "connection_start",
  "src_ip": "192.168.1.10",
  "dest_ip": "192.168.1.20",
  "user_agent": "Mozilla/5.0",
  "http_method": "POST",
  "total_bytes_sent": 6000,
  "total_bytes_received": 8000,
  "request_paths": "/api/load"
}
Подробное описание служебных полей представлено в разделе Универсальная модель событий.

Таким образом создается многоуровневая логика агрегации для правила "EnhancedNetworkActivityAggregationRule", где события должны соответствовать заданным критериям фильтрации и группировки. Эти критерии включают не только IP-адреса источника и назначения, но также типы запросов и агенты пользователя, что позволяет проводить более детализированный анализ трафика.