Декларативные правила
В данном разделе представлен обзор структуры, принципа работы, процесса создания и расширения конфигурации декларативного правила корреляции R-Vision SIEM. Управление корреляционными правилами в системе осуществляется в разделе Экспертиза.
О декларативных правилах корреляции
Декларативное правило корреляции в системе R-Vision SIEM представляет собой структурированное описание желаемых корреляций между событиями безопасности, представленное в виде объекта данных в формате RObject (.ro). Это описание определяет условия корреляции на высоком уровне, фокусируясь на желаемом результате корреляции, а не на точной последовательности операций для его достижения.
Структура декларативного правила корреляции подразумевает определение псевдонимов (aliases
) для событий и их взаимосвязей. Псевдонимы позволяют группировать события в отдельные потоки и связывать их в логические цепочки (join
). Условия корреляции и агрегации задаются отдельно для каждой группы событий, что позволяет системе объединять их в единые корреляционные события.
Процесс корреляции состоит из последовательности операций сопоставления, где каждая операция связывает события на основе определенных критериев. Корреляция завершается, когда событие проходит через всю цепочку операций и соответствует всем условиям. Если задана группировка, событие может быть агрегировано с другими по установленным критериям. Дополнительно, корреляционное событие, можно обогатить используя правила, написанные на языке описания трансформаций и корреляций VRL
Структура правила корреляции
Правила корреляции содержат набор обязательных и опциональных полей.
Строки, начинающиеся с символа $ ($foo ), интерпретируются в VRL-блоках элементов экспертизы как переменные окружения. Чтобы избежать этого, экранируйте такие строки символом $ ($$foo ).
|
Поле | Описание | Тип данных | Обязательное поле |
---|---|---|---|
|
Уникальный идентификатор правила корреляции. |
строка |
да |
|
Название правила. |
строка |
да |
|
Тип элемента. Всегда имеет значение |
строка |
да |
|
Версия правила корреляции, представленная в формате Semantic Versioning. |
строка |
да |
|
Имя и контактные данные автора правила корреляции. |
строка |
нет |
|
Описание правила корреляции, содержащее дополнительную информацию о его назначении и функциональности. |
строка |
нет |
|
Список источников данных, с которыми совместимо правило корреляции. Используется для классификации и быстрого поиска элементов экспертизы в системе. |
массив строк массив объектов |
нет |
|
Статус разработки текущей версии правила. Может принимать любые текстовые значения. Примеры значений:
|
строка |
нет |
|
Дата создания или изменения правила. Задается в формате Поле не связано с полями Дата создания и Дата изменения в карточке элемента экспертизы. |
строка |
нет |
|
Список ссылок на дополнительную информацию об атаках, которые пытается обнаружить правило. |
массив строк |
нет |
|
Список возможных ложноположительных срабатываний правила. Позволяет определить, было ли правило отработано ложно или нет. |
массив строк |
нет |
|
Уровень угрозы корреляционного события, испускаемого правилом. Допустимые значения:
|
строка |
да |
|
Список тегов, присвоенных правилу корреляции для классификации и быстрого поиска. |
массив строк |
нет |
|
Параметры для ограничения генерации корреляционных событий. При превышении ограничений система приостанавливает работу правила корреляции. |
объект |
нет |
|
Максимально допустимое количество корреляционных окон. При превышении ограничения система останавливает работу правила корреляции. |
число |
нет |
|
Блок правил на языке VRL, определяющий условия (фильтр), при которых правило корреляции будет применяться к событиям. |
строка |
нет |
|
Псевдонимы потоков событий, которые используются для их идентификации в рамках правила. |
объект |
да |
|
Определяет структуру корреляции, включая псевдонимы и условия в |
объект |
да |
|
Параметры для группировки результатов корреляции. |
массив |
нет |
|
Максимальное общее время хранения промежуточных корреляционных данных в секундах для всей цепочки событий. При отсутствии индивидуального значения |
число |
нет |
|
Содержит логику обогащения корреляционного события на языке VRL, которая выполняется после успешного соответствия корреляционным условиям. |
строка |
нет |
|
Определяет, должно ли создаваться оповещение по результатам корреляции событий. По умолчанию, если не указано, считается |
логический |
нет |
|
Настройка тестового окружения для правил, включая предварительную конфигурацию всех используемых в правиле внешних зависимостей, таких как активные списки. Это поле позволяет определить и инициализировать все необходимые ресурсы для выполнения тестов правила, включая MOCK данные для активных списков. |
объект |
нет (если нет внешних зависимостей) |
|
Указывает интервал в секундах, который определяет, как часто могут генерироваться корреляционные события для каждой группы в поле |
число |
нет |
|
Блок тестирования правила корреляции, содержащий массив тестовых событий и условия для проверки срабатывания правила. |
объект |
нет |
Структура data_source
Поле data_source
, представленное массивом строк, имеет структуру:
data_source:
- <platform> # Платформа или информационная система
- <source> # Источник событий.
# События (EventId) или путь к журналу событий:
- <EventID_1>
- <EventID_2>
Поле data_source
, представленное массивом объектов, имеет следующую структуру:
Поле | Описание | Тип данных | Обязательное поле |
---|---|---|---|
|
Платформа или информационная система. |
строка |
нет |
|
Источник событий. |
строка |
нет |
|
События (EventId) или путь к журналу событий. |
массив строк |
нет |
Структура rate_limit
Поле | Описание | Тип данных | Обязательное поле |
---|---|---|---|
|
Максимально допустимое количество корреляционных событий, генерируемых за При превышении правило корреляции приостанавливает работу на |
число |
да |
|
Промежуток времени в секундах, в течение которого допустима генерация не более |
число |
да |
|
Промежуток времени в секундах, в течение которого ожидается восстановление нормального режима работы правила. При нормальном режиме за |
число |
да |
Структура aliases
Поле | Описание | Тип данных | Обязательное поле |
---|---|---|---|
Имя_псевдонима |
Уникальное имя, идентифицирующее поток событий. |
строка (ключ в объекте |
да |
|
Условие на языке VRL, которое определяет критерии фильтрации событий. |
строка |
да |
Структура select
Поле | Описание | Тип данных | Обязательное поле |
---|---|---|---|
|
Определяет начальный псевдоним потока событий для корреляции. |
строка |
да |
|
Описывает последовательность операций соединения других потоков событий с исходным, указанным в |
объект |
да |
|
Условия, по которым происходит соединение потоков событий. |
массив |
да |
|
Указывает, должно ли отсутствие события из потока |
булево |
нет |
|
Время жизни для промежуточных результатов корреляции. Если не задано, используется значение из |
число |
нет (но обязательно, если |
Структура join
Поле | Описание | Тип данных | Обязательное поле |
---|---|---|---|
|
Определяет последующие псевдонимы, которые присоединяются к корреляции. |
объект |
да |
|
Условия, по которым происходит соединение между псевдонимами потоков событий. |
массив |
да |
|
Указывает, должно ли отсутствие события из потока |
логический |
нет |
|
Время жизни для промежуточных результатов корреляции. Если не задано, используется значение из |
число |
нет (но обязательно, если указано |
Структура group
Поле | Описание | Тип данных | Обязательное поле |
---|---|---|---|
|
Идентифицирует псевдоним потока событий, по атрибутам которого будет производиться группировка. |
строка |
да |
|
Поля, по которым будет производиться группировка. |
массив строк |
да |
|
Количество элементов в группе. |
число |
да |
|
Указывает, должна ли группировка выполняться по уникальным значениям полей, указанных в |
логический |
нет |
Структура tests
Поле | Описание | Тип данных | Обязательное поле |
---|---|---|---|
|
Название теста, позволяющее идентифицировать его в тестовой выдаче. |
строка |
да |
|
Поток тестовых событий, обрабатываемых правилом. |
массив |
да |
|
Блок правил на языке VRL, который будет вызываться на каждое событие, испускаемое полем |
строка |
да |
Операторы
В декларативных правилах R-Vision SIEM операторы используются для определения логики соединения и анализа данных между различными потоками событий. Они позволяют формулировать условия для сопоставления и группировки событий, основываясь на определенных атрибутах или характеристиках этих событий.
На данный момент в корреляционных правилах поддерживается только оператор eq .
|
Оператор eq
Оператор eq
используется для определения условия равенства между полями в различных потоках событий. Этот оператор является частью механизма join
, который позволяет связывать события на основе общих характеристик или атрибутов.
Синтаксис
Оператор eq
используется внутри блока join
и принимает следующий синтаксис:
join: alias: <имя_псевдонима> on: - eq: { <псевдоним1>: <поле1>, <псевдоним2>: <поле2> }
Здесь <имя_псевдонима>
— это псевдоним потока событий, к которому применяется оператор join
, <псевдоним1>
и <псевдоним2>
— это псевдонимы потоков событий, между которыми осуществляется сравнение, а <поле1>
и <поле2>
— это поля в этих потоках событий, которые сравниваются.
Пример:
В следующем примере демонстрируется использование оператора eq
для соединения двух потоков событий на основе равенства IP-адресов источника:
aliases: Traffic1: filter: !vrl | .event_type == "HTTP_REQUEST" Traffic2: filter: !vrl | .event_type == "FTP_REQUEST" select: alias: Traffic1 join: alias: Traffic2 on: - eq: { Traffic1: .source_ip, Traffic2: .source_ip }
В этом примере, события из Traffic1
и Traffic2
соединяются, когда поле .source_ip
совпадает между этими двумя потоками.
Составление правила
Управление корреляционными правилами в системе осуществляется в разделе Экспертиза. |
Для составления правила корреляции в системе R-Vision SIEM, необходимо заполнить обязательные поля правила. Это включает поля метаданных и поля, отвечающие за логику работы правила.
-
Определите метаданные правила:
-
id
: задайте уникальный идентификатор правила. -
name
: задайте название правила. -
version
: укажите версию правила, следуя стандартам Semantic Versioning. -
type
: определите тип правила какcorrelation_rule
. -
window_mode
: оставьте поле пустым, чтобы система ожидала декларативное правило. -
author
(опциональное): укажите имя и контактные данные автора. -
description
(опциональное): предоставьте краткое описание назначения и функциональности правила. -
data_source
(опциональное): укажите список совместимых с правилом источников данных для классификации правила в системе. -
status
(опционально): опишите статус разработки текущей версии правила. -
date
(опционально): укажите дату создания или изменения правила. -
reference
(опционально): предоставьте список ссылок на дополнительный материал об атаках, которые пытается обнаружить правило. -
known_false_positives
(опционально): укажите список возможных ложноположительных срабатываний правила. -
severity
: укажите уровень угрозы корреляционного события, испускаемого правилом (critical
,high
,medium
илиlow
). -
tags
(опциональное): добавьте теги для классификации правила.
-
-
Разработайте логику фильтрации и корреляции событий:
-
filter
: создайте условие для фильтрации входящих событий. События, которые не удовлетворяют условиям фильтра, исключаются из дальнейшего анализа и не учитываются в цепочке корреляционных операций. -
aliases
: создайте псевдонимы для потоков событий с соответствующими фильтрами. -
select
иjoin
: настройте логику соединения потоков событий, начиная с указанного псевдонима (alias
) и определяя условия соединения (on
). -
ttl
: определите общее максимальное время (ttl
) для всей цепочки корреляционных операций, чтобы ограничить продолжительность хранения промежуточных результатов в памяти.
-
-
Задайте логику группировки событий (опционально):
-
group
: определите параметры для группировки результатов корреляции, включая псевдоним (alias
), поля для группировки (by
) и количество элементов в группе (count
). -
throttle_time_sec
: определите интервал в секундах, который определяет, как часто могут генерироваться корреляционные события для каждой группы в полеgroup
.
-
-
Опишите пост-корреляционные действия (опционально):
-
on_correlate
: определите логику обогащения корреляционного события, используя язык VRL. VRL-программа будет применяться к корреляционному событию после успешной корреляции.
-
-
Разработайте тесты для правила (опционально):
-
tests
: создайте тесты, включающие различные типы событий и условия, для проверки эффективности и точности правила корреляции.
-
Поля метаданных
Метаданные являются основой для идентификации и описания правила корреляции в системе. Они должны быть заполнены следующим образом:
-
id
: установите уникальный идентификатор правила. Этот идентификатор должен быть уникальным в рамках системы. -
name
: укажите название правила. -
version
: задайте версию правила, следуя принципам Semantic Versioning (например, 1.0.0). -
type
: установите тип правилаcorrelation_rule
. -
author
: укажите имя и контактные данные создателя правила. -
description
: предоставьте краткое описание назначения и функциональности правила. -
data_source
: укажите список совместимых с правилом источников данных для классификации правила в системе. -
status
: опишите статус разработки текущей версии правила. -
date
: укажите дату создания или изменения правила. -
reference
: предоставьте список ссылок на дополнительный материал об атаках, которые пытается обнаружить правило. -
known_false_positives
: укажите список возможных ложноположительных срабатываний правила. -
severity
: укажите уровень угрозы корреляционного события, испускаемого правилом (critical
,high
,medium
илиlow
). -
tags
: укажите теги для классификации правила в системе.
Поле rate_limit
Поле rate_limit
позволяет задать ограничения на генерацию корреляционных событий.
-
Укажите в поле
max_rate
максимально допустимое количество корреляционных событий, которое должно генерировать правило. -
Укажите в поле
duration_secs
период времени в секундах, в течение которого допускается генерация не болееmax_rate
корреляционных событий. -
Укажите в поле
grace_period_secs
период времени в секундах, в течение которого система будет ожидать восстановления нормальной работы правила.
Если за duration_secs
секунд будет сгенерировано более max_rate
событий, то система приостановит работу правила корреляции на grace_period_secs
секунд.
Ограничение работает только при отключенном режиме распределенной корреляции. |
Пример:
rate_limit: max_rate: 100 duration_secs: 60 grace_period_secs: 300
В этом примере устанавливается ограничение на генерацию не более 100 корреляционных событий за 60 секунд (1 минуту). При превышении заданного ограничения правило корреляции приостанавливает работу на 300 секунд (5 минут).
Поле max_correlation_windows
Поле max_correlation_windows
определяет максимально допустимое количество корреляционных окон. При превышении система останавливает работу правила корреляции. Для возобновления работы правила необходимо заново установить конфигурацию конвейера.
Ограничение работает только при отключенном режиме распределенной корреляции. |
Пример:
max_correlation_windows: 100
В этом примере устанавливается ограничение на открытие не более 100 корреляционных окон.
Поле filter
Поле filter
определяет подмножества событий, которые будут обработаны правилом. Это поле использует язык VRL для описания логики фильтрации, позволяя точно настроить, какие события будут участвовать в корреляции.
Предназначение поля filter
— оценить каждое входящее событие и определить, соответствует ли оно заданным критериям. Если событие не соответствует условиям фильтра, оно исключается из дальнейшего анализа и не учитывается в корреляционном анализе. Это позволяет снизить объем обрабатываемых данных и увеличить точность и эффективность корреляционных правил.
В VRL для написания условий фильтра используются логические операторы (&&
, ||
, !
), сравнения (==
, !=
, <
, >
, ⇐
, >=
) и другие функции языка для работы с данными (например, includes
, starts_with
). Это позволяет создавать сложные и гибкие условия фильтрации.
Пример:
filter: !vrl | .source_ip != null && # Событие должно иметь ненулевой IP-адрес источника .request_path != null && # Путь запроса также должен быть указан .response_code == 404 # Код ответа должен быть 404
В этом примере фильтр выбирает события, у которых есть IP-адрес источника и путь запроса, а также код ответа равен 404. Это может быть полезно для выявления потенциальных попыток несанкционированного доступа или сканирования уязвимостей на веб-сервере.
Поле aliases
Поле aliases
в правиле корреляции используется для разделения и фильтрации потока событий.
-
Определите псевдонимы. Назначьте каждому потоку событий уникальный псевдоним. Это имя будет использоваться для ссылки на этот поток в других частях правила.
Пример:
Для потока неудачных попыток входа можно использовать псевдоним
FailedLogins
. -
Сформулируйте фильтры для каждого псевдонима. Укажите, какие события будут относиться к данному потоку, основываясь на их атрибутах, таких как тип события, источник или содержание.
Пример фильтра: .event_type == "login_failure" && .source_ip != null
— этот фильтр отбирает все события неудачных попыток входа, где указан IP-адрес источника.
Пример:
aliases: HighRateTraffic: filter: !vrl | .event_type == "login_failure" && .source_ip != null SuspiciousPatterns: filter: !vrl | .event_type == "HTTP_REQUEST"
Поле select
Поле select
в правилах корреляции определяет логику соединения потоков событий. Это поле обеспечивает структурирование и организацию процесса корреляции, позволяя связывать события из разных источников на основе заданных критериев. Левый псевдоним в join
является основным источником событий, к которому добавляются события из правого псевдонима, удовлетворяющие условиям соединения. Если в правом псевдониме нет событий, соответствующих условиям соединения, то события из левого псевдонима все равно будут обработаны, но без добавления данных из правого псевдонима.
-
Выберите исходный псевдоним. Укажите в поле
alias
псевдоним, который будет использоваться как отправная точка для процесса корреляции. Этот псевдоним должен быть одним из определенных в разделеaliases
. -
Разработайте логику соединения (
join
). Используйте разделjoin
, чтобы определить, какие дополнительные потоки событий будут привлекаться для корреляции. В каждомjoin
указывается псевдоним другого потока событий, который будет соединяться с исходным. -
Определите условия соединения (
on
). В каждом блокеjoin
укажите условия соединения в разделеon
, которые определяют критерии соотношения событий из разных потоков. -
Используйте
absent
(опционально) для определения логики корреляции, основанной на отсутствии события. -
Укажите
wait_for
(опционально, но обязательно приabsent: true
) для установки времени ожидания для каждой операции соединения. Система будет накапливать события в течение указанного времени для каждогоjoin
и затем анализировать их на предмет соответствия заданным критериям соединения. Еслиabsent: true
, наряду с поступлением событий система будет регистрировать и их отсутствие.
Пример:
select: alias: TrafficA join: alias: TrafficB on: - eq: { TrafficA: .source_ip, TrafficB: .source_ip } wait_for: 80 alias: TrafficC absent: true on: - eq: { TrafficA: .source_ip, TrafficC: .source_ip } wait_for: 60
В этом примере, начиная с потока TrafficA
, сначала происходит соединение с TrafficB
по совпадению источников IP. Затем проверяется отсутствие событий из TrafficC
, соответствующих тому же условию, в течение 60 секунд.
Можно создать цепочку join , соединяя более двух потоков событий. Это позволяет формировать сложные условия корреляции для выявления более сложных угроз или аномалий.
|
Поле join
Поле join
в правиле корреляции отвечает за создание корреляционных отношений между потоками событий. Оно используется для объединения двух или более потоков событий на основе заданных критериев, включая временной интервал хранения промежуточных результатов корреляции (ttl
).
-
Структура
join
:-
Каждый
join
включает поляalias
,on
,absent
(опционально), иwait_for
(опционально). -
alias
вjoin
указывает псевдоним потока событий, который будет соединяться с начальным потоком, определенным вselect
. -
on
описывает условие соединения, обычно основываясь на совпадении значений определенных полей между событиями. -
absent
(опционально) определяет, должно ли отсутствие события из потокаalias
рассматриваться как условие для перехода к следующемуjoin
. Если указаноabsent: true
, то для этогоjoin
полеwait_for
является обязательным. -
wait_for
(опционально, еслиabsent: false
) устанавливает максимальное время хранения промежуточных результатов корреляции для конкретной операции соединения. Если для конкретногоjoin
не указано значениеwait_for
, применяется общее ограничение времени, заданное на уровне правила в полеttl
.wait_for
является обязательным полем, если указаноabsent: true
.
-
-
Условия соединения
on
:-
Условие
on
с операциейeq
(равенство) используется для указания полей, по которым будет происходить сопоставление между потоками событий.
-
-
Создание цепочек
join
:-
В
join
можно формировать цепочки, где каждый последующийjoin
добавляет новый поток событий к уже существующей группе. -
Это позволяет строить многоуровневые корреляции, где каждый новый уровень добавляет уточнения или расширяет условия корреляции.
-
Пример:
Для соединения двух потоков событий TrafficA
и TrafficB
, когда source_ip
в TrafficA
равен destination_ip
в TrafficB
, и одновременно проверяется отсутствие события TrafficC
в заданный временной интервал, используется следующая конструкция:
select: alias: TrafficA join: alias: TrafficB on: - eq: { TrafficA: .source_ip, TrafficB: .destination_ip } wait_for: 300 // Время жизни промежуточных результатов в секундах alias: TrafficC absent: true on: - eq: { TrafficA: .source_ip, TrafficC: .destination_ip } wait_for: 60 // Время ожидания отсутствия события в секундах
Поле group
Поле group
в правиле корреляции отвечает за группировку событий, что позволяет проводить более сложный и точный корреляционный анализ.
-
Определите псевдоним для группировки. В
alias
указывается псевдоним потока событий, для которого может быть произведена группировка. Этот псевдоним должен соответствовать одному из определенных в разделеaliases
. -
Выберите поля для группировки. Поле
by
определяет, по каким критериям (полям) будут группироваться события. Можно указать одно или несколько полей, на основе которых будут сформированы группы событий. -
Установите лимит количества событий в группе. Поле
count
определяет минимальное количество событий в каждой группе. Этот параметр позволяет контролировать, когда группа достигает достаточного размера для корреляции или инициирования действия. -
Определите признак уникальности группировки событий (опционально). В
unique
указывается, должна ли группировка выполняться по уникальным значениям полей, выбранных вby
:-
Если
unique: true
, то в группу будут собираться события с различными значениями полей. -
Если
unique: false
, то в группу могут собираться события с одинаковыми значениями полей.
-
Пример:
group: - alias: TrafficA by: - source_ip count: 10 - alias: TrafficB by: - suser count: 2 unique: true
В данном примере производится группировка событий из потоков TrafficA
и TrafficB
:
-
Для событий из потока
TrafficA
группировка производится по одинаковым значениям поляsource_ip
. -
Для событий из потока
TrafficB
группировка производится по уникальным значениям поляsuser
.
Корреляционное событие будет создано, когда в группе соберется 10 событий с одинаковым source_ip
из потока TrafficA
и 2 события с различными suser
из потока TrafficB
.
Можно задать несколько критериев в by , чтобы группировать события по более чем одному полю, что позволяет создавать более сложные условия корреляции.
|
Поле throttle_time_sec
Поле throttle_time_sec
указывает интервал в секундах, который определяет, как часто могут генерироваться корреляционные события для каждой группы в поле group
.
Пример:
throttle_time_sec: 60
В этом примере интервал между корреляционными событиями составляет 60 секунд для каждой группы.
Поле ttl
Поле ttl
(time-to-live) определяет общее максимальное время в секундах, в течение которого промежуточные результаты всех корреляционных операций в правиле сохраняются в памяти, прежде чем будут удалены. Это значение применяется ко всем операциям корреляции в правиле. В отдельных join
блоках может применяться поле wait_for
для определения времени хранения промежуточных результатов корреляции конкретного join
блока. При его отсутствии применяется общее значение ttl
на уровне правила.
Если в правиле не используются операции соединения (join ) и группировки (group ), поле ttl является необязательным.
|
Пример:
select: alias: TrafficA join: alias: TrafficB on: - eq: { TrafficA: .source_ip, TrafficB: .destination_ip } ttl: 3600 // Промежуточные результаты для всей цепочки событий этого `join` будут храниться в течение 1 часа
Поле on_correlate
Поле on_correlate
в декларативных правилах корреляции используется для определения действий обогащения, которые выполняются после успешной корреляции событий. Эти действия могут включать преобразование данных, добавление новой информации к событиям или выполнение вычислений на основе данных событий. Для определения этих действий используется язык VRL. Помимо этого, это может включать извлечение информации из активных списков и использоваться для взаимодействия с таблицами обогащения.
Пример заполнения поля on_correlate
:
on_correlate: !vrl | .message = "Обнаружена аномалия доступа с источника: " + .source_ip .threat_level = match .event_count { >= 1000 => "Критический", >= 500 => "Высокий", _ => "Средний" } .analysis_summary = "Источник " + .source_ip + " сгенерировал " + tostring(.event_count) + " событий, что указывает на потенциальную угрозу."
В этом примере:
-
Присваивается новое сообщение событию с детальным описанием обнаруженной аномалии.
-
Устанавливается уровень угрозы на основе количества событий, связанных с источником.
-
Формируется сводка анализа, включающая IP-адрес источника и общее количество событий.
Поле trigger_alert
Поле trigger_alert
в корреляционном правиле отвечает за решение о создании оповещений, основанных на результатах корреляции событий. Это поле определяет, должны ли по результатам корреляции создаваться оповещения в системе.
Поле trigger_alert
может принимать значения true
или false
. Когда trigger_alert
установлено в true
, оповещение будет создаваться при каждом успешном случае корреляции, соответствующем условиям правила. Если trigger_alert
установлено в false
, оповещения создаваться не будут, даже если корреляция происходит. В случае, если поле trigger_alert
не указано в правиле, его значение по умолчанию считается true
, что подразумевает создание оповещения по умолчанию. Изменение значения trigger_alert
в существующем правиле влияет на обработку будущих событий, но не затрагивает уже созданные оповещения.
Пример:
# Логика обогащения корреляционного события на языке VRL. on_correlate: !vrl | .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip) .additional_info = "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload" # Управляет созданием оповещений по результатам корреляции. trigger_alert: true # Тесты для проверки работы правила. tests: ...
Тестирование правила
Тесты в правиле корреляции позволяют проверить корректность работы правила и его способность реагировать на заданные типы событий.
Тестирование правила корреляции гарантирует, что правило корректно обрабатывает данные, точно идентифицирует события и правильно реагирует на них, обеспечивая надежную работу системы.
В случае, если правило корреляции использует данные из активных списков и/или таблиц обогащения, необходимо инициализировать MOCK данные через поле setup_tests
. Это позволит тестировать правило в условиях, максимально приближенных к реальной эксплуатации, без необходимости доступа к продуктивным данным.
Поле setup_tests
Для тестирования правил, которые используют внешние зависимости (например, взаимодействуют с активными списками и/или таблицами обогащения), использование поля setup_tests является обязательным.
|
Поле setup_tests
в конфигурации правила корреляции предназначено для определения тестового окружения, включая предварительную настройку MOCK данных для активных списков и таблиц обогащения, используемых в правиле. Это обеспечивает проверку правила на корректность выполнения и взаимодействие с внешними данными до его применения в операционной среде.
setup_tests
используется для:
-
Имитации взаимодействия правила с активными списками и таблицами обогащения.
-
Проверки логики правила с учетом ожидаемых данных обогащения.
Процедура настройки setup_tests
:
-
В корне конфигурации правила указывается секция
setup_tests
. -
Внутри
setup_tests
через ключenrichment_tables
илиactive_lists
задаются MOCK таблицы обогащения и активные списки, используемые правилом. -
Для каждого MOCK элемента указывается его структура и набор MOCK данных, которые будут использоваться при тестировании.
-
MOCK данные должны соответствовать ожидаемым данным из реальных таблиц обогащения/активных списков и позволять полноценно проверить работу правила.
setup_tests
для активного списка:setup_tests: active_lists: <имя_списка>: <поле>: - [<значение_поля1>] - [<значение_поля2>]
setup_tests
для таблицы обогащения:setup_tests: enrichment_tables: <имя_таблицы>: fields: - <поле1> - <поле2> data: - [<значение1_поля1>, <значение1_поля2>] - [<значение2_поля1>, <значение2_поля2>]
Поле tests
Поле tests
включает набор тестов для проверки и верификации правила корреляции. Эти тесты помогают убедиться, что правило работает корректно и соответствует ожидаемым результатам.
Чтобы создать тестовый сценарий, в поле tests
определите следующие параметры:
-
В
name
укажите название теста. -
В
events
создайте массив событий, которые будут использоваться для тестирования. Эти события должны имитировать поток реальных исходных событий, которые поступают на коррелятор. -
В
assertion
установите условие или набор условий, которые должны быть выполнены в результате обработки тестовых событий правилом. Это может включать проверку конкретных полей в итоговых агрегированных событиях или подсчет количества таких событий.
Тестовые события
Тестовые события должны имитировать данные, которые в реальности поступают на коррелятор. Они должны соответствовать формату и структуре событий, обрабатываемых правилом корреляции и содержать поля, которые используются в правиле.
Если в правиле настроена корреляция по полю с типом В противном случае при запуске тестов правила система обработает поле в виде строки и корреляция по дате-времени будет работать некорректно. |
Условия проверки
Условия должны точно отражать ожидаемый результат применения правила к тестовым событиям. Это может быть проверка наличия определенных полей в событии, соответствие их значений ожидаемым или сравнение количества событий с предполагаемым исходом. Для более сложных правил могут потребоваться более сложные утверждения, которые могут включать логические операции и сравнения.
Пример:
tests: - name: "Тестирование обнаружения DDoS атаки" events: - { source_ip: "192.168.1.1", event_type: "traffic_anomaly", traffic_volume: "10000" } assertion: !vrl | assert_eq!(.alert_type, "DDoS", "Ожидается обнаружение DDoS атаки")
Обогащение через внешние зависимости
Обогащение корреляционных событий представляет собой процесс дополнения событий, сгенерированных в ходе корреляции, дополнительными данными. В декларативных правилах корреляции можно настроить логику обогащения в поле on_correlate.
В рамках языка VRL обогащение корреляционных событий включает в себя применение специфических функций и выражений для динамического добавления или модификации данных в событиях. Помимо этого, это может включать извлечение информации из активных списков, которые могут предоставлять как статическую, так и динамическую информацию в зависимости от настроек схемы активного списка. А также взаимодействие с таблицами обогащения, которые предоставляют статическую информацию, например, сведения о геолокации, данные организаций, соответствующие спискам разрешенных и заблокированных ресурсов.
Активные списки
Активные списки представляют собой структуры данных, которые используются для хранения и управления как динамически изменяющимися наборами информации так и статическими списками и таблицами, такими как IP-адреса, доменные имена, идентификаторы устройств или пользователей. В контексте декларативных правил корреляции, поле on_correlate может включать вызовы функций для взаимодействия с активными списками, используя язык VRL. Это позволяет добавлять, извлекать и удалять записи из активных списков на основе логики обработки корреляционных событий.
Функции для взаимодействия с активными списками:
Функция | Описание | Параметры и Результат |
---|---|---|
|
Добавляет запись в активный список. Если запись с таким ключом уже существует, значение обновляется. В противном случае создается новая запись. |
|
|
Извлекает значение из активного списка по указанному ключу. Возвращает значение, ассоциированное с ключом, или |
|
|
Удаляет запись из активного списка. Если запись с таким ключом отсутствует, ничего не происходит. |
|
on_correlation
правила корреляции:on_correlation: !vrl | if .threat_level == "High" { add_active_record("suspicious_ips", .source_ip, true) } if .event_type == "authentication_failure" { let failure_count = get_active_record("auth_failure_counts", .user_id) || 0 add_active_record("auth_failure_counts", .user_id, failure_count + 1) } if .event_type == "authentication_success" { remove_active_record("auth_failure_counts", .user_id) }
В этом примере:
-
IP-адреса с высоким уровнем угрозы добавляются в активный список подозрительных IP-адресов.
-
Считается количество неудачных попыток аутентификации для пользователей и обновляется соответствующая запись в активном списке.
-
При успешной аутентификации запись пользователя удаляется из списка неудачных попыток аутентификации, что может свидетельствовать об окончании атаки или восстановлении доступа пользователя.
Таблицы обогащения
Таблицы обогащения представляют собой структурированные наборы данных, используемые для добавления контекстной информации к событиям в процессе их корреляции. Это может включать данные о геолокации, информацию об устройствах, базы данных угроз и другие внешние источники данных. В поле on_correlate
декларативных правил корреляции можно использовать специализированные функции VRL для запроса и извлечения данных из таблиц обогащения, тем самым обогащая события дополнительными деталями для более глубокого анализа.
Функции для взаимодействия с таблицами обогащения:
-
find_enrichment_table_records(table, condition, [select, case_sensitive])
: позволяет выполнить поиск записей в таблице обогащения, которые соответствуют заданным условиям. Возвращает массив записей, удовлетворяющих условиям поиска. -
get_enrichment_table_record(table, condition, [select, case_sensitive])
: используется для извлечения одной записи из таблицы обогащения, которая точно соответствует заданному условию. В случае, если запись не найдена или найдено более одной записи, функция возвращает ошибку.
Пример использования в поле on_correlate
:
on_correlate: !vrl | let geo_info = find_enrichment_table_records("geoip", {"ip": .source_ip}, select: ["country_name", "city_name"]) if !geo_info.is_empty() { .source_country = geo_info[0].country_name .source_city = geo_info[0].city_name } let threat_info = get_enrichment_table_record("threat_intelligence", {"ip": .source_ip}, select: ["category", "score"]) if !is_null(threat_info) { .threat_category = threat_info.category .threat_score = threat_info.score }
В этом примере:
-
Выполняется поиск геолокационной информации по IP-адресу источника события в таблице
geoip
, и результаты добавляются к событию как.source_country
и.source_city
. -
Затем производится запрос к таблице
threat_intelligence
для извлечения информации о категории угрозы и оценке уровня угрозы, ассоциированных с IP-адресом источника. Эти данные добавляются к событию как.threat_category
и.threat_score
.
Тестирование правила с внешними зависимостями
Тестирование правил корреляции, которые обращаются к таблицам обогащения и/или активным спискам, требует предварительной настройки тестового окружения с помощью поля setup_tests
, которое позволяет определить MOCK данные для внешнего элемента.
Поле setup_tests
позволяет инициализировать необходимые данные для таблиц обогащения и активных списков перед запуском тестов правила корреляции. Это обеспечивает возможность проверить не только синтаксическую корректность правила, но и его логику обработки данных при взаимодействии с внешними элементами.
Процесс тестирования
-
Определение MOCK данных: для начала тестирования необходимо определить MOCK данные для всех активных списков и таблиц обогащения, с которым взаимодействует тестируемое правило. Эти данные должны быть репрезентативными и охватывать различные сценарии использования списка в правиле.
-
Конфигурация
setup_tests
: в корне конфигурации правила корреляции необходимо добавить секциюsetup_tests
, где указываются все используемые таблицы обогащения и активные списки с их MOCK данными. Имя каждого внешнего элемента в секции должно точно совпадать с именем, используемым в теле правила. -
Заполнение MOCK данных: внутри внешнего элемента в секции
setup_tests
указывается список полей и соответствующих им MOCK данных. Данные задаются в формате, соответствующем ожидаемым запросам к активному списку и/или таблице обогащения в рамках теста.
Примеры:
setup_tests: active_lists: suspicious_ips: - source_ip: "192.168.1.1" is_suspicious: true - source_ip: "10.10.10.10" is_suspicious: false auth_failure_counts: - user_id: "user1" failure_count: 1 - user_id: "user2" failure_count: 2
enrichment_tables: geoip: fields: - ip - country - city data: - ["8.8.8.8", "US", "Mountain View"] - ["9.9.9.9", "GB", "London"]
Создание и выполнение тестов
После настройки MOCK данных следующим шагом является создание набора тестов, которые будут проверять взаимодействие правила корреляции с активными списками. Каждый тест должен включать в себя:
-
Описание теста (
name
): краткое описание цели теста. -
События (
events
): список событий, которые будут подаваться на вход правила в процессе тестирования. -
Утверждения (
assertions
): логика проверки результата работы правила на основе предоставленных событий и MOCK данных активных списков.
Пример теста:
tests: - name: "Test for Suspicious IP Detection" events: - { source_ip: "192.168.1.1", event_type: "login_attempt" } assertion: !vrl | let is_suspicious = get_active_record("suspicious_ips", .source_ip); assert_eq!(is_suspicious, true, "IP should be marked as suspicious") - name: "Authentication Failure Count Test" events: - { user_id: "user2", event_type: "login_failure" } assertion: !vrl | let failure_count = get_active_record("auth_failure_counts", .user_id); assert_eq!(failure_count, 2, "User2 should have 2 authentication failures")
В разделе событий (events
) указываются события, которые подаются на вход правила корреляции для проверки его срабатывания. В первом тесте это попытка входа с IP-адреса, который должен быть помечен как подозрительный. Во втором тесте — неудачная попытка входа от пользователя, для которого уже зафиксированы подобные инциденты.
Утверждения (assertions
) содержат логику проверки, выполняемую после обработки событий правилом. В первом тесте проверяется, что IP-адрес из события корректно идентифицирован как подозрительный на основе данных активного списка suspicious_ips
. Во втором — что количество неудачных попыток входа для пользователя соответствует ожидаемому значению, основанному на данных активного списка auth_failure_counts
.
Если утверждение (assertion
) выполнено успешно, это означает, что правило корреляции корректно обрабатывает входные данные и правильно взаимодействует с активными списками. В противном случае, если результат не соответствует ожидаемому, это указывает на проблемы в логике правила или в его взаимодействии с активными списками.
|
Пример:
Пример правила корреляции с активным списком, схема RObject
# Уникальный идентификатор правила. id: example/ddos_detection_rule # Название правила. name: DDoSDetectionRule # Тип элемента экспертизы. type: correlation_rule # Версия правила. version: 1.0.0 # Уровень угрозы, присваиваемый корреляционному правилу. severity: high # Описание назначения правила. description: Обнаружение атаки DDoS на основе трафика с учетом репутации IP-адресов из активного списка DDoSAttackIPs. # Имя и контактные данные автора правила. author: John Doe <johndoe@example.com> # Список совместимых с правилом источников данных. data_source: - platform: Windows - source: Sysmon - events: - EventID_13 - EventID_1 # Теги, связанные с правилом. tags: - ddos - security # Фильтр применяется к событиям перед входом в правило. filter: !vrl | .source_ip != null && .request_path != null # Разделение потоков событий на псевдонимы. aliases: HighRateTraffic: filter: !vrl | .event_type == "HTTP_REQUEST" && parse_duration!(.window, "s") >= 100 SuspiciousPatterns: filter: !vrl | .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/api/highload") NormalTraffic: filter: !vrl | .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/home") # Соединение потоков событий. select: alias: HighRateTraffic join: alias: SuspiciousPatterns absent: true on: - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip } wait_for: 30 # Группировка событий. group: - alias: HighRateTraffic by: - source_ip count: 3 # Частота генерации корреляционных событий в рамках каждой группы (событий в секунду). throttle_time_sec: 60 # Время хранения промежуточных корреляционных данных (секунды). ttl: 60 # Логика обогащения корреляционного события на языке VRL. on_correlate: !vrl | let attack_info = get_active_record("DDoSAttackIPs", .source_ip); if attack_info != null { .attack_type = attack_info.attack_type .last_detected = attack_info.last_detected .attack_volume = attack_info.attack_volume .reputation_score = attack_info.reputation_score .involved_attacks = attack_info.involved_attacks .notes = attack_info.notes } .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip) + ", тип атаки: " + to_string!(.attack_type) + ", объем атаки: " + to_string!(.attack_volume) + ", репутация IP: " + to_string!(.reputation_score) + ", количество атак: " + to_string!(.involved_attacks) + ", заметки: " + to_string!(.notes) if .reputation_score != null && .reputation_score < 50 { .ddos_alert = "Высокий уровень запросов и низкая репутация IP, потенциальная DDoS атака" # Управляет созданием оповещений по результатам корреляции. trigger_alert: true # MOCK данные для активного списка в тестовом окружении. setup_tests: active_lists: DDoSAttackIPs: - ip: "192.0.2.1" last_detected: "2023-01-01T12:00:00Z" attack_type: "SYN Flood" reputation_score: 10 attack_volume: "1.5 Gbps" involved_attacks: 5 notes: "Repeated attacks on multiple targets" - ip: "192.0.2.2" last_detected: "2023-01-02T15:30:00Z" attack_type: "UDP Flood" reputation_score: 20 attack_volume: "500 Mbps" involved_attacks: 2 notes: "High packet rate observed" - ip: "192.0.2.3" last_detected: "2023-01-03T18:45:00Z" attack_type: "Amplification" reputation_score: 15 attack_volume: "2 Gbps" involved_attacks: 3 notes: "DNS amplification attack" - ip: "192.0.2.4" last_detected: "2023-01-04T20:00:00Z" attack_type: "HTTP Flood" reputation_score: 5 attack_volume: "750 Mbps" involved_attacks: 4 notes: "Targeted at web services" # Тесты для проверки работы правила. tests: - name: HighRateTraffic from Malware IP events: - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload" } assertion: !vrl | assert_eq!(.ddos_alert, "Высокий уровень запросов и низкая репутация IP, потенциальная DDoS атака", "Alert generation failed for malware IP") - name: HighRateTraffic from Clean IP events: - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "100s", "request_path": "/api/highload" } assertion: !vrl | assert!(.ddos_alert == null || .ddos_alert == "", "Alert should not be generated for clean IP")
Пример правила корреляции
# Уникальный идентификатор правила. id: example/ddos_detection_rule # Название правила. name: DDoSDetectionRule # Тип элемента экспертизы. type: correlation_rule # Версия правила. version: 1.0.0 # Уровень угрозы, присваиваемый корреляционному правилу. severity: high # Описание назначения правила. description: Обнаружение атаки DDoS на основе трафика # Имя и контактные данные автора правила. author: John Doe <johndoe@example.com> # Список совместимых с правилом источников данных. data_source: - platform: Windows - source: Sysmon - events: - EventID_13 - EventID_1 # Теги, связанные с правилом. tags: - ddos - security # Фильтр применяется к событиям перед входом в правило. filter: !vrl | .source_ip != null && .request_path != null # Разделение потоков событий на псевдонимы. aliases: HighRateTraffic: filter: !vrl | .event_type == "HTTP_REQUEST" && parse_duration!(.window, "s") >= 100 SuspiciousPatterns: filter: !vrl | .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/api/highload") NormalTraffic: filter: !vrl | .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/home") # Соединение потоков событий. select: alias: HighRateTraffic join: alias: SuspiciousPatterns absent: true on: - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip } wait_for: 30 # Группировка событий. group: - alias: HighRateTraffic by: - source_ip count: 3 # Частота генерации корреляционных событий в рамках каждой группы (событий в секунду). throttle_time_sec: 60 # Время хранения промежуточных корреляционных данных (секунды). ttl: 60 # Логика обогащения корреляционного события на языке VRL. on_correlate: !vrl | .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip) .additional_info = "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload" # Управляет созданием оповещений по результатам корреляции. trigger_alert: true # Тесты для проверки работы правила. tests: - name: DDoS Attack Detected events: - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "110s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "120s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.100", "window": "130s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "140s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "150s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "180s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "50s", "request_path": "/home" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "180s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "50s", "request_path": "/home" } = - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.224", "window": "60s", "request_path": "/contact" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "70s", "request_path": "/about" } assertion: !vrl | assert_eq!(.message, "Обнаружена потенциальная DDoS атака с источника: 192.0.2.1")
Интерпретация
Правило корреляции "DDoSDetectionRule" предназначено для выявления DDoS-атак на основе анализа потока поступающих событий.
Метаданные правила:
-
id
: уникальный идентификатор правила в системе. -
name
: название правила. -
type
: типcorrelation_rule
указывает на то, что это правило корреляции. -
version
: версия правила. -
description
: описывает цель правила — "Обнаружение атаки DDoS на основе трафика". -
author
: создатель правила. -
tags
: тегиddos
иsecurity
классифицируют правило по его функциональности и области применения.
Логика фильтрации (filter):
-
Условия фильтрации
.source_ip != null
и.request_path != null
выбирают события, у которых есть IP-адрес источника и путь запроса.
Логика фильтрации (aliases):
-
Псевдоним
HighRateTraffic
: фильтр для событий с высокой частотой запросов, идентифицируемых поevent_type == "HTTP_REQUEST"
иparse_duration!(.window) >= 100
. Отбирает события с частотой запросов, превышающей или равной 100 запросов в определенный временной интервал. -
Псевдоним
SuspiciousPatterns
: фильтр для событий с подозрительными путями запроса, определяемый какevent_type == "HTTP_REQUEST"
иends_with(to_string!(.request_path), "/api/highload")
. Отбирает запросы, пути которых оканчиваются на "/api/highload".
Соединение потоков событий (select и join):
-
Выбор и соединение (
select
иjoin
) потоков событийHighRateTraffic
иSuspiciousPatterns
осуществляется на основе совпаденияsource_ip
. Это соединение позволяет коррелировать высокочастотные запросы с подозрительными паттернами, исходящими от одного источника. -
В поле
wait_for
определяется, что система должна ожидать в течение 30 секунд перед тем, как принять решение о соединении событий из различных псевдонимов. Это значит, что система будет накапливать события в течение указанного времени с момента получения первого события и затем анализировать их на предмет соответствия заданным критериям соединения. -
В операции соединения с
SuspiciousPatterns
используется полеabsent: true
, что указывает на то, что отсутствие события изSuspiciousPatterns
в течение 30 секунд будет добавлено в цепочку корреляционных событий.
Группировка событий (group):
-
group
: группировка событий применяется к псевдонимуHighRateTraffic
по полюsource_ip
, что позволяет агрегировать и анализировать события, исходящие от одного источника IP. Количество событий для формирования группы в полеcount
установлено в 10, что означает, что группа будет сформирована при накоплении 3 событий от одного источника IP. -
throttle_time_sec
: задает интервал времени в секундах между генерацией корреляционных событий в рамках каждой группы событий. Например, 60 секунд.
Срок хранения промежуточных данных корреляции (ttl):
-
ttl
на уровне правила задает общее максимальное время жизни промежуточных корреляционных данных. Например, 60 секунд. Это время определяет, как долго промежуточные результаты для всей цепочки событий будут храниться в памяти до их автоматического удаления, если корреляция не завершается.
Пост-корреляционная логика (on_correlate):
-
on_correlate
: обогащает корреляционное событие полями.message
и.additional_info
. -
trigger_alert
: управляет созданием оповещений по результатам корреляции. В данном случае, сtrigger_alert: true
, система будет генерировать оповещения при успешной корреляции событий.
По накоплении 3 событий правило генерирует корреляционное событие со следующими атрибутами:
-
type
: "correlation_rule", -
event_type
: "DDoS Detection", -
source_ip
: "192.168.1.1", -
datetime
: "2023-09-28T12:45:00Z", -
message
: "Обнаружена потенциальная DDoS атака с источника: 192.168.1.1", -
additional_info
: "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload"
Тестирование правила (tests):
-
Набор тестовых сценариев "DDoS Attack Detected" включает набор событий, часть из которых имитируют характеристики DDoS-атаки, а остальные являются контрольными примерами обычного трафика. Цель тестирования — подтвердить, что правило корректно идентифицирует только события, связанные с DDoS-атаками и игнорирует неподходящие события. Тесты включают события с HTTP-запросами, различающимися по
source_ip
иrequest_path
, что позволяет проверить способность правила различать атакующий трафик от легитимного. -
События, моделирующие DDoS-атаку, имеют одинаковый
source_ip
и конечный путь запроса "/api/highload", что соответствует фильтрамHighRateTraffic
иSuspiciousPatterns
. Это сценарий, в котором ожидается активация правила и генерация корреляционного события. -
Контрольные события с разными
source_ip
и неподозрительными путями запроса ("/home", "/contact", "/about") предназначены для подтверждения, что правило не реагирует на обычные запросы, не связанные с DDoS-активностью.
assertion
в тестовом блоке предназначен для проверки того, что только события, соответствующие характеристикам DDoS-атаки, приводят к формированию корреляционного события с ожидаемым сообщением об обнаружении атаки.
Обработка событий правилом корреляции
Проанализируем логику работы правила "DDoSDetectionRule", фокусируясь на процессе обработки событий.
Предположим, что за короткий промежуток времени на коррелятор, на котором установлено правило "DDoSDetectionRule", поступает поток JSON-событий от различных источников IP, включая множество запросов с IP-адреса "192.168.1.1". Эти события соответствуют критериям, заданным в фильтрах HighRateTraffic
и SuspiciousPatterns
.
{ "event_type": "HTTP_REQUEST", "source_ip": "192.168.1.x", // где x - уникальный номер для каждого события "datetime": "2023-09-28T12:34:56Z", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/93.0.4472.124 Safari/537.36" }
По мере поступления этих событий, они фильтруются правилом "DDoSDetectionRule". Фильтр HighRateTraffic
отбирает события, соответствующие запросам с высокой частотой (более 100 запросов за заданный временной интервал), а SuspiciousPatterns
фокусируется на запросах с определенным путем запроса /api/highload
.
Когда события от одного источника IP, например, "192.168.1.1", соответствуют обоим фильтрам и проходят через механизм соединения join
, правило "DDoSDetectionRule" приступает к группировке. Правило использует поле group
для агрегирования событий от одного источника IP. Если количество событий от одного источника соответствует заданному порогу в 3 события, правило генерирует корреляционное событие.
{ "type": "correlation_rule", "severity": "high", "event_type": "DDoS Detection", "source_ip": "192.168.1.1", "datetime": "2023-09-28T12:45:00Z", "message": "Обнаружена потенциальная DDoS атака с источника: 192.168.1.1", "additional_info": "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload" }
Данное корреляционное событие указывает на то, что система обнаружила аномальную активность, соответствующую критериям DDoS-атаки, исходящей с одного IP-адреса. Правило "DDoSDetectionRule" коррелирует события, основываясь на двух ключевых факторах: частоте запросов и специфических путях запросов, что является индикатором потенциальной атаки.
Расширение конфигурации существующего правила
Рассмотрим сценарий, в котором необходимо добавить дополнительные условия фильтрации и корреляции для правила "DDoSDetectionRule", чтобы повысить его эффективность и надежность в обнаружении более сложных DDoS-атак.
Исходное правило корреляции "DDoSDetectionRule", схема RObject
# Уникальный идентификатор правила. id: example/ddos_detection_rule # Название правила. name: DDoSDetectionRule # Тип элемента экспертизы. type: correlation_rule # Версия правила. version: 1.0.0 # Уровень угрозы, присваиваемый корреляционному правилу. severity: high # Описание назначения правила. description: Обнаружение атаки DDoS на основе трафика # Имя и контактные данные автора правила. author: John Doe <johndoe@example.com> # Список совместимых с правилом источников данных. data_source: - platform: Windows - source: Sysmon - events: - EventID_13 - EventID_1 # Теги, связанные с правилом. tags: - ddos - security # Фильтр применяется к событиям перед входом в правило. filter: !vrl | .source_ip != null && .request_path != null # Разделение потоков событий на псевдонимы. aliases: HighRateTraffic: filter: !vrl | .event_type == "HTTP_REQUEST" && parse_duration!(.window, "s") >= 100 SuspiciousPatterns: filter: !vrl | .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/api/highload") NormalTraffic: filter: !vrl | .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/home") # Соединение потоков событий. select: alias: HighRateTraffic join: alias: SuspiciousPatterns absent: true on: - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip } wait_for: 30 # Группировка событий. group: - alias: HighRateTraffic by: - source_ip count: 3 # Частота генерации корреляционных событий в рамках каждой группы (событий в секунду). throttle_time_sec: 60 # Время хранения промежуточных корреляционных данных (секунды). ttl: 60 # Логика обогащения корреляционного события на языке VRL. on_correlate: !vrl | .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip) .additional_info = "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload" # Управляет созданием оповещений по результатам корреляции. trigger_alert: true # Тесты для проверки работы правила. tests: - name: DDoS Attack Detected events: - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "110s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "120s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.100", "window": "130s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "140s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "150s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "180s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "50s", "request_path": "/home" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "180s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "50s", "request_path": "/home" } = - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.224", "window": "60s", "request_path": "/contact" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "70s", "request_path": "/about" } assertion: !vrl | assert_eq!(.message, "Обнаружена потенциальная DDoS атака с источника: 192.0.2.1")
Для этого добавим дополнительный псевдоним с фильтрацией для отбора событий по значению поля user_agent
исходного события. Также добавим обращение к активному списку для динамической оценки уровня угрозы, основанной на текущей информации о подозрительных IP-адресах.
# Уникальный идентификатор правила. id: example/ddos_detection_rule # Название правила. name: EnhancedDDoSDetectionRule # Тип элемента экспертизы. type: correlation_rule # Версия правила. version: 1.0.0 # Уровень угрозы, присваиваемый корреляционному правилу. severity: high # Описание назначения правила. description: Расширенное обнаружение DDoS-атак на основе трафика # Имя и контактные данные автора правила. author: John Doe <johndoe@example.com> # Список совместимых с правилом источников данных. data_source: - platform: Windows - source: Sysmon - events: - EventID_13 - EventID_1 # Теги, связанные с правилом. tags: - ddos - security - enhanced_detection # Фильтр применяется к событиям перед входом в правило. filter: !vrl | .source_ip != null && .request_path != null # Разделение потоков событий на псевдонимы. aliases: HighRateTraffic: filter: !vrl | .event_type == "HTTP_REQUEST" && parse_duration!(.window, "s") >= 100 SuspiciousPatterns: filter: !vrl | .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/api/highload") # Дополнительный псевдоним. UnusualUserAgents: filter: !vrl | .event_type == "HTTP_REQUEST" && regex_match("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", .user_agent) # Соединение потоков событий. select: alias: HighRateTraffic join: alias: SuspiciousPatterns absent: true on: - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip } join: alias: UnusualUserAgents on: - eq: { HighRateTraffic: .user_agent, UnusualUserAgents: .user_agent } wait_for: 40 # Группировка событий. group: - alias: HighRateTraffic by: - source_ip - user_agent # Дополнительный критерий фильтрации. count: 3 # Частота генерации корреляционных событий в рамках каждой группы (событий в секунду). throttle_time_sec: 60 # Время жизни промежуточных корреляционных данных (секунды). ttl: 80 # Логика обогащения корреляционного события на языке VRL. on_correlate: !vrl | .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip) .additional_info = "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload и необычные user agents" # Обращение к активному списку "suspicious_ips" if get_active_record("suspicious_ips", .source_ip) != null { .threat_level = "high" } else { .threat_level = "medium" } # Управляет созданием оповещений по результатам корреляции. trigger_alert: true # MOCK данные активного списка setup_tests: active_lists: suspicious_ips: - "192.0.2.1" - "198.51.100.10" # Тесты для проверки работы правила. tests: - name: Enhanced DDoS Attack Detection events: - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "110s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "120s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "130s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "140s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "150s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "180s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "50s", "request_path": "/home" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "180s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "50s", "request_path": "/home" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "60s", "request_path": "/contact", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "70s", "request_path": "/about", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "110s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "120s", "request_path": "/api/highload" , "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "130s", "request_path": "/api/highload" , "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "140s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "150s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" , "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "180s", "request_path": "/api/highload" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" , "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "198.51.100.10", "window": "50s", "request_path": "/home", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } - { "event_type": "HTTP_REQUEST", "source_ip": "203.0.113.2", "window": "60s", "request_path": "/contact" } - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "70s", "request_path": "/about", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } assertion: !vrl | assert_eq!(.message, "Обнаружена потенциальная DDoS атака с источника: 192.0.2.1") assert_eq!(.threat_level, "high")
Таким образом, в улучшенное правило "EnhancedDDoSDetectionRule" внесены следующие изменения:
-
Дополнительный псевдоним и условие фильтрации:
-
Добавлен псевдоним
UnusualUserAgents
, в рамках которого происходит фильтрация событий поuser_agent
для идентификации необычных или подозрительных запросов. -
Фильтр
.event_type == "HTTP_REQUEST" && regex_match("^Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", .user_agent)
отбирает события со значением "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" в полеuser_agent
. -
Добавлен
UnusualUserAgents
вselect
с последующей корреляцией поuser_agent
.
-
-
Настройка
ttl
:-
Значение
ttl
установлено в 80 секунд, увеличивая период хранения промежуточных корреляционных данных.
-
-
Дополнительный критерий группировки:
-
Добавлена группировка по
user_agent
, для более точного определения распределенных атак.
-
-
Интеграция с активным списком:
-
Использована функция
get_active_record("suspicious_ips", .event.source_ip)
для обогащения результатов проверки данными из активного списка "suspicious_ips". Это реализовано через условие:if get_active_record("suspicious_ips", .source_ip) != null { .threat_level = "high" } else { .threat_level = "medium" }
Таким образом, при генерации корреляционного события, помимо основного сообщения и дополнительной информации, теперь также устанавливается уровень угрозы в зависимости от статуса IP-адреса в активных списках. Уровень угрозы устанавливается в "high", если IP-адрес источника обнаружен в списке подозрительных, и "medium" в остальных случаях.
-
-
Тестовая проверка:
-
Добавлен блок
setup_tests
для предварительной настройки тестового окружения, включающий активный списокsuspicious_ips
с указанными IP-адресами "192.168.1.1" и "10.10.10.10". Это позволяет эмулировать обращение к активному списку во время тестирования правила и проверять логику обогащения событий на основе наличия IP-адреса в списке подозрительных. -
Добавлены тестовые события со значением "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" в поле
user_agent
, чтобы проверить правило на способность корректно идентифицировать и обрабатывать события с конкретным значением вuser_agent
. -
Добавлена проверка динамической оценки уровня угрозы на основе данных из активного списка. Проверяется, каким образом правило реагирует на обнаружение подозрительных IP-адресов и присваивает соответствующий уровень угрозы (
high
илиmedium
).
-
Интерпретация
Основной поток (select):
-
alias: HighRateTraffic
определяет начальный поток событий для корреляции, который в данном случае представляет потокHighRateTraffic
. ПотокHighRateTraffic
охватывает события, соответствующие определенным критериям, таким как высокая частота HTTP-запросов от одного источника IP.
Первый уровень соединения (join):
-
alias: SuspiciousPatterns
указывает на то, что потокHighRateTraffic
будет соединяться с потокомSuspiciousPatterns
. ПотокSuspiciousPatterns
фильтрует события на основе определенных паттернов в пути запроса (например, оканчивающихся на "/api/highload"). -
on: - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip }
говорит о том, что соединение между потокамиHighRateTraffic
иSuspiciousPatterns
будет осуществляться на основе совпадения значения поля.source_ip
. То есть корреляция произойдет, если источник IP в обоих потоках событий совпадает. -
В операции соединения
HighRateTraffic
иSuspiciousPatterns
используется полеabsent: true
, что указывает на то, что отсутствие события изSuspiciousPatterns
в течение 30 секунд будет добавлено в цепочку корреляционных событий.
Второй уровень соединения (join):
-
alias: UnusualUserAgents
добавляет третий поток событийUnusualUserAgents
в цепочку корреляции. Этот поток фокусируется на событиях с "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" в полеuser_agent
. -
on: - eq: { HighRateTraffic: .user_agent, UnusualUserAgents: .user_agent }
устанавливает, что соединение междуHighRateTraffic
иUnusualUserAgents
осуществляется на основе совпадения по полю.user_agent
. Это означает, что для корреляции в рамках данного правила событие должно соответствовать критериям как высокой частоты запросов, так и "подозрительного"user_agent
.
Таким образом, создается многоуровневая логика корреляции, где события должны соответствовать критериям всех трех потоков — HighRateTraffic
, SuspiciousPatterns
и UnusualUserAgents
. Это позволяет правилу "EnhancedDDoSDetectionRule" выявлять потенциальные DDoS-атаки, основываясь на более сложных и детализированных критериях, чем в исходной версии правила.