Декларативные правила

В данном разделе представлен обзор структуры, принципа работы, процесса создания и расширения конфигурации декларативного правила корреляции R-Vision SIEM. Управление корреляционными правилами в системе осуществляется в разделе Экспертиза.

О декларативных правилах корреляции

Декларативное правило корреляции в системе R-Vision SIEM представляет собой структурированное описание желаемых корреляций между событиями безопасности, представленное в виде объекта данных в формате RObject (.ro). Это описание определяет условия корреляции на высоком уровне, фокусируясь на желаемом результате корреляции, а не на точной последовательности операций для его достижения.

Структура декларативного правила корреляции подразумевает определение псевдонимов (aliases) для событий и их взаимосвязей. Псевдонимы позволяют группировать события в отдельные потоки и связывать их в логические цепочки (join). Условия корреляции и агрегации задаются отдельно для каждой группы событий, что позволяет системе объединять их в единые корреляционные события.

Процесс корреляции состоит из последовательности операций сопоставления, где каждая операция связывает события на основе определенных критериев. Корреляция завершается, когда событие проходит через всю цепочку операций и соответствует всем условиям. Если задана группировка, событие может быть агрегировано с другими по установленным критериям. Дополнительно, корреляционное событие, можно обогатить используя правила, написанные на языке описания трансформаций и корреляций VRL

Структура правила корреляции

Правила корреляции содержат набор обязательных и опциональных полей.

Строки, начинающиеся с символа $ ($foo), интерпретируются в VRL-блоках элементов экспертизы как переменные окружения. Чтобы избежать этого, экранируйте такие строки символом $ ($$foo).
Поле Описание Тип данных Обязательное поле

id

Уникальный идентификатор правила корреляции.

строка

да

name

Название правила.

строка

да

type

Тип элемента. Всегда имеет значение correlation_rule для элементов экспертизы этого типа.

строка

да

version

Версия правила корреляции, представленная в формате Semantic Versioning.

строка

да

author

Имя и контактные данные автора правила корреляции.

строка

нет

description

Описание правила корреляции, содержащее дополнительную информацию о его назначении и функциональности.

строка

нет

data_source

Список источников данных, с которыми совместимо правило корреляции. Используется для классификации и быстрого поиска элементов экспертизы в системе.

массив строк

массив объектов

нет

status

Статус разработки текущей версии правила. Может принимать любые текстовые значения.

Примеры значений:

  • debug — версия в разработке;

  • experimental — экспериментальная версия;

  • test — версия для тестирования;

  • stable — стабильная версия.

строка

нет

date

Дата создания или изменения правила. Задается в формате YYYY-MM-DD, то есть "год-месяц-день".

Поле не связано с полями Дата создания и Дата изменения в карточке элемента экспертизы.

строка

нет

reference

Список ссылок на дополнительную информацию об атаках, которые пытается обнаружить правило.

массив строк

нет

known_false_positives

Список возможных ложноположительных срабатываний правила. Позволяет определить, было ли правило отработано ложно или нет.

массив строк

нет

severity

Уровень угрозы корреляционного события, испускаемого правилом. Допустимые значения:

  • critical — критический;

  • high — высокий;

  • medium — средний;

  • low — низкий.

строка

да

tags

Список тегов, присвоенных правилу корреляции для классификации и быстрого поиска.

массив строк

нет

rate_limit

Параметры для ограничения генерации корреляционных событий. При превышении ограничений система приостанавливает работу правила корреляции.

объект

нет

max_correlation_windows

Максимально допустимое количество корреляционных окон. При превышении ограничения система останавливает работу правила корреляции.

число

нет

filter

Блок правил на языке VRL, определяющий условия (фильтр), при которых правило корреляции будет применяться к событиям.

строка

нет

aliases

Псевдонимы потоков событий, которые используются для их идентификации в рамках правила.

объект

да

select

Определяет структуру корреляции, включая псевдонимы и условия в join. Указывает начальный псевдоним потока событий, с которого начинается процесс корреляции.

объект

да

group

Параметры для группировки результатов корреляции.

массив

нет

ttl

Максимальное общее время хранения промежуточных корреляционных данных в секундах для всей цепочки событий. При отсутствии индивидуального значения wait_for для конкретного join, используется значение из поля ttl. Является обязательным, если в правиле используются операции соединения (join) и группировки (group).

число

нет

on_correlate

Содержит логику обогащения корреляционного события на языке VRL, которая выполняется после успешного соответствия корреляционным условиям.

строка

нет

trigger_alert

Определяет, должно ли создаваться оповещение по результатам корреляции событий. По умолчанию, если не указано, считается true.

логический

нет

setup_tests

Настройка тестового окружения для правил, включая предварительную конфигурацию всех используемых в правиле внешних зависимостей, таких как активные списки. Это поле позволяет определить и инициализировать все необходимые ресурсы для выполнения тестов правила, включая MOCK данные для активных списков.

объект

нет (если нет внешних зависимостей)

throttle_time_sec

Указывает интервал в секундах, который определяет, как часто могут генерироваться корреляционные события для каждой группы в поле group.

число

нет

tests

Блок тестирования правила корреляции, содержащий массив тестовых событий и условия для проверки срабатывания правила.

объект

нет

Структура data_source

Поле data_source, представленное массивом строк, имеет структуру:

data_source:
- <platform> # Платформа или информационная система
- <source> # Источник событий.
# События (EventId) или путь к журналу событий:
- <EventID_1>
- <EventID_2>

Поле data_source, представленное массивом объектов, имеет следующую структуру:

Поле Описание Тип данных Обязательное поле

platform

Платформа или информационная система.

строка

нет

source

Источник событий.

строка

нет

events

События (EventId) или путь к журналу событий.

массив строк

нет

Структура rate_limit

Поле Описание Тип данных Обязательное поле

max_rate

Максимально допустимое количество корреляционных событий, генерируемых за duration_secs секунд.

При превышении правило корреляции приостанавливает работу на grace_period_secs секунд.

число

да

duration_secs

Промежуток времени в секундах, в течение которого допустима генерация не более max_rate корреляционных событий.

число

да

grace_period_secs

Промежуток времени в секундах, в течение которого ожидается восстановление нормального режима работы правила.

При нормальном режиме за duration_secs секунд правило генерирует не более max_rate корреляционных событий.

число

да

Структура aliases

Поле Описание Тип данных Обязательное поле

Имя_псевдонима

Уникальное имя, идентифицирующее поток событий.

строка (ключ в объекте aliases)

да

filter

Условие на языке VRL, которое определяет критерии фильтрации событий.

строка

да

Структура select

Поле Описание Тип данных Обязательное поле

alias

Определяет начальный псевдоним потока событий для корреляции.

строка

да

join

Описывает последовательность операций соединения других потоков событий с исходным, указанным в alias.

объект

да

on

Условия, по которым происходит соединение потоков событий.

массив

да

absent

Указывает, должно ли отсутствие события из потока alias рассматриваться как условие для перехода к следующему join. Если указано absent: true, то для этого join поле wait_for является обязательным. По умолчанию, если не указано, считается false.

булево

нет

wait_for

Время жизни для промежуточных результатов корреляции. Если не задано, используется значение из ttl верхнего уровня. Если указано absent: true, то wait_for определяет время ожидания для проверки условия absent.

число

нет (но обязательно, если absent: true)

Структура join

Поле Описание Тип данных Обязательное поле

join

Определяет последующие псевдонимы, которые присоединяются к корреляции.

объект

да

on

Условия, по которым происходит соединение между псевдонимами потоков событий.

массив

да

absent

Указывает, должно ли отсутствие события из потока alias рассматриваться как условие для перехода к следующему join.

логический

нет

wait_for

Время жизни для промежуточных результатов корреляции. Если не задано, используется значение из ttl верхнего уровня. Если указано absent: true, то wait_for определяет время ожидания для проверки условия absent.

число

нет (но обязательно, если указано absent: true)

Структура group

Поле Описание Тип данных Обязательное поле

alias

Идентифицирует псевдоним потока событий, по атрибутам которого будет производиться группировка.

строка

да

by

Поля, по которым будет производиться группировка.

массив строк

да

count

Количество элементов в группе.

число

да

unique

Указывает, должна ли группировка выполняться по уникальным значениям полей, указанных в by. По умолчанию, если не указано, считается false.

логический

нет

Структура tests

Поле Описание Тип данных Обязательное поле

name

Название теста, позволяющее идентифицировать его в тестовой выдаче.

строка

да

events

Поток тестовых событий, обрабатываемых правилом.

массив

да

assertion

Блок правил на языке VRL, который будет вызываться на каждое событие, испускаемое полем events. Используя встроенные функции assert и assert_eq, можно строить проверки на соответствие испускаемого события тестовым условиям.

строка

да

Операторы

В декларативных правилах R-Vision SIEM операторы используются для определения логики соединения и анализа данных между различными потоками событий. Они позволяют формулировать условия для сопоставления и группировки событий, основываясь на определенных атрибутах или характеристиках этих событий.

На данный момент в корреляционных правилах поддерживается только оператор eq.

Оператор eq

Оператор eq используется для определения условия равенства между полями в различных потоках событий. Этот оператор является частью механизма join, который позволяет связывать события на основе общих характеристик или атрибутов.

Синтаксис

Оператор eq используется внутри блока join и принимает следующий синтаксис:

join:
  alias: <имя_псевдонима>
  on:
    - eq: { <псевдоним1>: <поле1>, <псевдоним2>: <поле2> }

Здесь <имя_псевдонима> — это псевдоним потока событий, к которому применяется оператор join, <псевдоним1> и <псевдоним2> — это псевдонимы потоков событий, между которыми осуществляется сравнение, а <поле1> и <поле2> — это поля в этих потоках событий, которые сравниваются.

Пример:

В следующем примере демонстрируется использование оператора eq для соединения двух потоков событий на основе равенства IP-адресов источника:

aliases:
  Traffic1:
    filter: !vrl |
      .event_type == "HTTP_REQUEST"
  Traffic2:
    filter: !vrl |
      .event_type == "FTP_REQUEST"

select:
  alias: Traffic1
  join:
    alias: Traffic2
    on:
      - eq: { Traffic1: .source_ip, Traffic2: .source_ip }

В этом примере, события из Traffic1 и Traffic2 соединяются, когда поле .source_ip совпадает между этими двумя потоками.

Составление правила

Управление корреляционными правилами в системе осуществляется в разделе Экспертиза.

Для составления правила корреляции в системе R-Vision SIEM, необходимо заполнить обязательные поля правила. Это включает поля метаданных и поля, отвечающие за логику работы правила.

  1. Определите метаданные правила:

    • id: задайте уникальный идентификатор правила.

    • name: задайте название правила.

    • version: укажите версию правила, следуя стандартам Semantic Versioning.

    • type: определите тип правила как correlation_rule.

    • window_mode: оставьте поле пустым, чтобы система ожидала декларативное правило.

    • author (опциональное): укажите имя и контактные данные автора.

    • description (опциональное): предоставьте краткое описание назначения и функциональности правила.

    • data_source (опциональное): укажите список совместимых с правилом источников данных для классификации правила в системе.

    • status (опционально): опишите статус разработки текущей версии правила.

    • date (опционально): укажите дату создания или изменения правила.

    • reference (опционально): предоставьте список ссылок на дополнительный материал об атаках, которые пытается обнаружить правило.

    • known_false_positives (опционально): укажите список возможных ложноположительных срабатываний правила.

    • severity: укажите уровень угрозы корреляционного события, испускаемого правилом (critical, high, medium или low).

    • tags (опциональное): добавьте теги для классификации правила.

  2. Разработайте логику фильтрации и корреляции событий:

    • filter: создайте условие для фильтрации входящих событий. События, которые не удовлетворяют условиям фильтра, исключаются из дальнейшего анализа и не учитываются в цепочке корреляционных операций.

    • aliases: создайте псевдонимы для потоков событий с соответствующими фильтрами.

    • select и join: настройте логику соединения потоков событий, начиная с указанного псевдонима (alias) и определяя условия соединения (on).

    • ttl: определите общее максимальное время (ttl) для всей цепочки корреляционных операций, чтобы ограничить продолжительность хранения промежуточных результатов в памяти.

  3. Задайте логику группировки событий (опционально):

    • group: определите параметры для группировки результатов корреляции, включая псевдоним (alias), поля для группировки (by) и количество элементов в группе (count).

    • throttle_time_sec: определите интервал в секундах, который определяет, как часто могут генерироваться корреляционные события для каждой группы в поле group.

  4. Опишите пост-корреляционные действия (опционально):

    • on_correlate: определите логику обогащения корреляционного события, используя язык VRL. VRL-программа будет применяться к корреляционному событию после успешной корреляции.

  5. Разработайте тесты для правила (опционально):

    • tests: создайте тесты, включающие различные типы событий и условия, для проверки эффективности и точности правила корреляции.

Поля метаданных

Метаданные являются основой для идентификации и описания правила корреляции в системе. Они должны быть заполнены следующим образом:

  • id: установите уникальный идентификатор правила. Этот идентификатор должен быть уникальным в рамках системы.

  • name: укажите название правила.

  • version: задайте версию правила, следуя принципам Semantic Versioning (например, 1.0.0).

  • type: установите тип правила correlation_rule.

  • author: укажите имя и контактные данные создателя правила.

  • description: предоставьте краткое описание назначения и функциональности правила.

  • data_source: укажите список совместимых с правилом источников данных для классификации правила в системе.

  • status: опишите статус разработки текущей версии правила.

  • date: укажите дату создания или изменения правила.

  • reference: предоставьте список ссылок на дополнительный материал об атаках, которые пытается обнаружить правило.

  • known_false_positives: укажите список возможных ложноположительных срабатываний правила.

  • severity: укажите уровень угрозы корреляционного события, испускаемого правилом (critical, high, medium или low).

  • tags: укажите теги для классификации правила в системе.

Поле rate_limit

Поле rate_limit позволяет задать ограничения на генерацию корреляционных событий.

  1. Укажите в поле max_rate максимально допустимое количество корреляционных событий, которое должно генерировать правило.

  2. Укажите в поле duration_secs период времени в секундах, в течение которого допускается генерация не более max_rate корреляционных событий.

  3. Укажите в поле grace_period_secs период времени в секундах, в течение которого система будет ожидать восстановления нормальной работы правила.

Если за duration_secs секунд будет сгенерировано более max_rate событий, то система приостановит работу правила корреляции на grace_period_secs секунд.

Ограничение работает только при отключенном режиме распределенной корреляции.

Пример:

rate_limit:
    max_rate: 100
    duration_secs: 60
    grace_period_secs: 300

В этом примере устанавливается ограничение на генерацию не более 100 корреляционных событий за 60 секунд (1 минуту). При превышении заданного ограничения правило корреляции приостанавливает работу на 300 секунд (5 минут).

Поле max_correlation_windows

Поле max_correlation_windows определяет максимально допустимое количество корреляционных окон. При превышении система останавливает работу правила корреляции. Для возобновления работы правила необходимо заново установить конфигурацию конвейера.

Ограничение работает только при отключенном режиме распределенной корреляции.

Пример:

max_correlation_windows: 100

В этом примере устанавливается ограничение на открытие не более 100 корреляционных окон.

Поле filter

Поле filter определяет подмножества событий, которые будут обработаны правилом. Это поле использует язык VRL для описания логики фильтрации, позволяя точно настроить, какие события будут участвовать в корреляции.

Предназначение поля filter — оценить каждое входящее событие и определить, соответствует ли оно заданным критериям. Если событие не соответствует условиям фильтра, оно исключается из дальнейшего анализа и не учитывается в корреляционном анализе. Это позволяет снизить объем обрабатываемых данных и увеличить точность и эффективность корреляционных правил.

В VRL для написания условий фильтра используются логические операторы (&&, ||, !), сравнения (==, !=, <, >, , >=) и другие функции языка для работы с данными (например, includes, starts_with). Это позволяет создавать сложные и гибкие условия фильтрации.

Пример:

filter: !vrl |
    .source_ip != null &&          # Событие должно иметь ненулевой IP-адрес источника
    .request_path != null &&       # Путь запроса также должен быть указан
    .response_code == 404          # Код ответа должен быть 404

В этом примере фильтр выбирает события, у которых есть IP-адрес источника и путь запроса, а также код ответа равен 404. Это может быть полезно для выявления потенциальных попыток несанкционированного доступа или сканирования уязвимостей на веб-сервере.

Поле aliases

Поле aliases в правиле корреляции используется для разделения и фильтрации потока событий.

  1. Определите псевдонимы. Назначьте каждому потоку событий уникальный псевдоним. Это имя будет использоваться для ссылки на этот поток в других частях правила.

    Пример:

    Для потока неудачных попыток входа можно использовать псевдоним FailedLogins.

  2. Сформулируйте фильтры для каждого псевдонима. Укажите, какие события будут относиться к данному потоку, основываясь на их атрибутах, таких как тип события, источник или содержание.

Пример фильтра: .event_type == "login_failure" && .source_ip != null — этот фильтр отбирает все события неудачных попыток входа, где указан IP-адрес источника.

Пример:

aliases:
  HighRateTraffic:
    filter: !vrl |
      .event_type == "login_failure" && .source_ip != null
  SuspiciousPatterns:
    filter: !vrl |
      .event_type == "HTTP_REQUEST"

Поле select

Поле select в правилах корреляции определяет логику соединения потоков событий. Это поле обеспечивает структурирование и организацию процесса корреляции, позволяя связывать события из разных источников на основе заданных критериев. Левый псевдоним в join является основным источником событий, к которому добавляются события из правого псевдонима, удовлетворяющие условиям соединения. Если в правом псевдониме нет событий, соответствующих условиям соединения, то события из левого псевдонима все равно будут обработаны, но без добавления данных из правого псевдонима.

  1. Выберите исходный псевдоним. Укажите в поле alias псевдоним, который будет использоваться как отправная точка для процесса корреляции. Этот псевдоним должен быть одним из определенных в разделе aliases.

  2. Разработайте логику соединения (join). Используйте раздел join, чтобы определить, какие дополнительные потоки событий будут привлекаться для корреляции. В каждом join указывается псевдоним другого потока событий, который будет соединяться с исходным.

  3. Определите условия соединения (on). В каждом блоке join укажите условия соединения в разделе on, которые определяют критерии соотношения событий из разных потоков.

  4. Используйте absent (опционально) для определения логики корреляции, основанной на отсутствии события.

  5. Укажите wait_for (опционально, но обязательно при absent: true) для установки времени ожидания для каждой операции соединения. Система будет накапливать события в течение указанного времени для каждого join и затем анализировать их на предмет соответствия заданным критериям соединения. Если absent: true, наряду с поступлением событий система будет регистрировать и их отсутствие.

Пример:

select:
  alias: TrafficA
  join:
    alias: TrafficB
      on:
        - eq: { TrafficA: .source_ip, TrafficB: .source_ip }
      wait_for: 80
    alias: TrafficC
      absent: true
      on:
        - eq: { TrafficA: .source_ip, TrafficC: .source_ip }
      wait_for: 60

В этом примере, начиная с потока TrafficA, сначала происходит соединение с TrafficB по совпадению источников IP. Затем проверяется отсутствие событий из TrafficC, соответствующих тому же условию, в течение 60 секунд.

Можно создать цепочку join, соединяя более двух потоков событий. Это позволяет формировать сложные условия корреляции для выявления более сложных угроз или аномалий.

Поле join

Поле join в правиле корреляции отвечает за создание корреляционных отношений между потоками событий. Оно используется для объединения двух или более потоков событий на основе заданных критериев, включая временной интервал хранения промежуточных результатов корреляции (ttl).

  • Структура join:

    • Каждый join включает поля alias, on, absent (опционально), и wait_for (опционально).

    • alias в join указывает псевдоним потока событий, который будет соединяться с начальным потоком, определенным в select.

    • on описывает условие соединения, обычно основываясь на совпадении значений определенных полей между событиями.

    • absent (опционально) определяет, должно ли отсутствие события из потока alias рассматриваться как условие для перехода к следующему join. Если указано absent: true, то для этого join поле wait_for является обязательным.

    • wait_for (опционально, если absent: false) устанавливает максимальное время хранения промежуточных результатов корреляции для конкретной операции соединения. Если для конкретного join не указано значение wait_for, применяется общее ограничение времени, заданное на уровне правила в поле ttl. wait_for является обязательным полем, если указано absent: true.

  • Условия соединения on:

    • Условие on с операцией eq (равенство) используется для указания полей, по которым будет происходить сопоставление между потоками событий.

  • Создание цепочек join:

    • В join можно формировать цепочки, где каждый последующий join добавляет новый поток событий к уже существующей группе.

    • Это позволяет строить многоуровневые корреляции, где каждый новый уровень добавляет уточнения или расширяет условия корреляции.

Пример:

Для соединения двух потоков событий TrafficA и TrafficB, когда source_ip в TrafficA равен destination_ip в TrafficB, и одновременно проверяется отсутствие события TrafficC в заданный временной интервал, используется следующая конструкция:

select:
  alias: TrafficA
  join:
    alias: TrafficB
      on:
        - eq: { TrafficA: .source_ip, TrafficB: .destination_ip }
      wait_for: 300  // Время жизни промежуточных результатов в секундах
    alias: TrafficC
      absent: true
      on:
        - eq: { TrafficA: .source_ip, TrafficC: .destination_ip }
      wait_for: 60  // Время ожидания отсутствия события в секундах

Поле group

Поле group в правиле корреляции отвечает за группировку событий, что позволяет проводить более сложный и точный корреляционный анализ.

  1. Определите псевдоним для группировки. В alias указывается псевдоним потока событий, для которого может быть произведена группировка. Этот псевдоним должен соответствовать одному из определенных в разделе aliases.

  2. Выберите поля для группировки. Поле by определяет, по каким критериям (полям) будут группироваться события. Можно указать одно или несколько полей, на основе которых будут сформированы группы событий.

  3. Установите лимит количества событий в группе. Поле count определяет минимальное количество событий в каждой группе. Этот параметр позволяет контролировать, когда группа достигает достаточного размера для корреляции или инициирования действия.

  4. Определите признак уникальности группировки событий (опционально). В unique указывается, должна ли группировка выполняться по уникальным значениям полей, выбранных в by:

    • Если unique: true, то в группу будут собираться события с различными значениями полей.

    • Если unique: false, то в группу могут собираться события с одинаковыми значениями полей.

Пример:

group:
  - alias: TrafficA
    by:
      - source_ip
    count: 10
  - alias: TrafficB
    by:
      - suser
    count: 2
    unique: true

В данном примере производится группировка событий из потоков TrafficA и TrafficB:

  • Для событий из потока TrafficA группировка производится по одинаковым значениям поля source_ip.

  • Для событий из потока TrafficB группировка производится по уникальным значениям поля suser.

Корреляционное событие будет создано, когда в группе соберется 10 событий с одинаковым source_ip из потока TrafficA и 2 события с различными suser из потока TrafficB.

Можно задать несколько критериев в by, чтобы группировать события по более чем одному полю, что позволяет создавать более сложные условия корреляции.

Поле throttle_time_sec

Поле throttle_time_sec указывает интервал в секундах, который определяет, как часто могут генерироваться корреляционные события для каждой группы в поле group.

Пример:

throttle_time_sec: 60

В этом примере интервал между корреляционными событиями составляет 60 секунд для каждой группы.

Поле ttl

Поле ttl (time-to-live) определяет общее максимальное время в секундах, в течение которого промежуточные результаты всех корреляционных операций в правиле сохраняются в памяти, прежде чем будут удалены. Это значение применяется ко всем операциям корреляции в правиле. В отдельных join блоках может применяться поле wait_for для определения времени хранения промежуточных результатов корреляции конкретного join блока. При его отсутствии применяется общее значение ttl на уровне правила.

Если в правиле не используются операции соединения (join) и группировки (group), поле ttl является необязательным.

Пример:

select:
  alias: TrafficA
  join:
    alias: TrafficB
    on:
      - eq: { TrafficA: .source_ip, TrafficB: .destination_ip }
    ttl: 3600  // Промежуточные результаты для всей цепочки событий этого `join` будут храниться в течение 1 часа

Поле on_correlate

Поле on_correlate в декларативных правилах корреляции используется для определения действий обогащения, которые выполняются после успешной корреляции событий. Эти действия могут включать преобразование данных, добавление новой информации к событиям или выполнение вычислений на основе данных событий. Для определения этих действий используется язык VRL. Помимо этого, это может включать извлечение информации из активных списков и использоваться для взаимодействия с таблицами обогащения.

Пример заполнения поля on_correlate:

on_correlate: !vrl |
  .message = "Обнаружена аномалия доступа с источника: " + .source_ip
  .threat_level = match .event_count {
    >= 1000 => "Критический",
    >= 500 => "Высокий",
    _ => "Средний"
  }
  .analysis_summary = "Источник " + .source_ip + " сгенерировал " + tostring(.event_count) + " событий, что указывает на потенциальную угрозу."

В этом примере:

  • Присваивается новое сообщение событию с детальным описанием обнаруженной аномалии.

  • Устанавливается уровень угрозы на основе количества событий, связанных с источником.

  • Формируется сводка анализа, включающая IP-адрес источника и общее количество событий.

Поле trigger_alert

Поле trigger_alert в корреляционном правиле отвечает за решение о создании оповещений, основанных на результатах корреляции событий. Это поле определяет, должны ли по результатам корреляции создаваться оповещения в системе.

Поле trigger_alert может принимать значения true или false. Когда trigger_alert установлено в true, оповещение будет создаваться при каждом успешном случае корреляции, соответствующем условиям правила. Если trigger_alert установлено в false, оповещения создаваться не будут, даже если корреляция происходит. В случае, если поле trigger_alert не указано в правиле, его значение по умолчанию считается true, что подразумевает создание оповещения по умолчанию. Изменение значения trigger_alert в существующем правиле влияет на обработку будущих событий, но не затрагивает уже созданные оповещения.

Пример:

# Логика обогащения корреляционного события на языке VRL.
on_correlate: !vrl |
 .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip)
 .additional_info = "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload"

# Управляет созданием оповещений по результатам корреляции.
trigger_alert: true

# Тесты для проверки работы правила.
tests:
...

Тестирование правила

Тесты в правиле корреляции позволяют проверить корректность работы правила и его способность реагировать на заданные типы событий.

Тестирование правила корреляции гарантирует, что правило корректно обрабатывает данные, точно идентифицирует события и правильно реагирует на них, обеспечивая надежную работу системы.

В случае, если правило корреляции использует данные из активных списков и/или таблиц обогащения, необходимо инициализировать MOCK данные через поле setup_tests. Это позволит тестировать правило в условиях, максимально приближенных к реальной эксплуатации, без необходимости доступа к продуктивным данным.

Поле setup_tests

Для тестирования правил, которые используют внешние зависимости (например, взаимодействуют с активными списками и/или таблицами обогащения), использование поля setup_tests является обязательным.

Поле setup_tests в конфигурации правила корреляции предназначено для определения тестового окружения, включая предварительную настройку MOCK данных для активных списков и таблиц обогащения, используемых в правиле. Это обеспечивает проверку правила на корректность выполнения и взаимодействие с внешними данными до его применения в операционной среде.

setup_tests используется для:

  • Имитации взаимодействия правила с активными списками и таблицами обогащения.

  • Проверки логики правила с учетом ожидаемых данных обогащения.

Процедура настройки setup_tests:

  1. В корне конфигурации правила указывается секция setup_tests.

  2. Внутри setup_tests через ключ enrichment_tables или active_lists задаются MOCK таблицы обогащения и активные списки, используемые правилом.

  3. Для каждого MOCK элемента указывается его структура и набор MOCK данных, которые будут использоваться при тестировании.

  4. MOCK данные должны соответствовать ожидаемым данным из реальных таблиц обогащения/активных списков и позволять полноценно проверить работу правила.

Example 1. Структура setup_tests для активного списка:
setup_tests:
  active_lists:
    <имя_списка>:
      <поле>:
        - [<значение_поля1>]
        - [<значение_поля2>]
Example 2. Структура setup_tests для таблицы обогащения:
setup_tests:
  enrichment_tables:
    <имя_таблицы>:
      fields:
        - <поле1>
        - <поле2>
      data:
        - [<значение1_поля1>, <значение1_поля2>]
        - [<значение2_поля1>, <значение2_поля2>]

Поле tests

Поле tests включает набор тестов для проверки и верификации правила корреляции. Эти тесты помогают убедиться, что правило работает корректно и соответствует ожидаемым результатам.

Чтобы создать тестовый сценарий, в поле tests определите следующие параметры:

  • В name укажите название теста.

  • В events создайте массив событий, которые будут использоваться для тестирования. Эти события должны имитировать поток реальных исходных событий, которые поступают на коррелятор.

  • В assertion установите условие или набор условий, которые должны быть выполнены в результате обработки тестовых событий правилом. Это может включать проверку конкретных полей в итоговых агрегированных событиях или подсчет количества таких событий.

Тестовые события

Тестовые события должны имитировать данные, которые в реальности поступают на коррелятор. Они должны соответствовать формату и структуре событий, обрабатываемых правилом корреляции и содержать поля, которые используются в правиле.

Если в правиле настроена корреляция по полю с типом DateTime, например timestamp, то значение этого поля в tests необходимо задавать в формате t'<DateTime-значение>'. Например: t'15.05.2024 06:28:15'. Это важно при корреляции на отсутствие событий.

В противном случае при запуске тестов правила система обработает поле в виде строки и корреляция по дате-времени будет работать некорректно.

Условия проверки

Условия должны точно отражать ожидаемый результат применения правила к тестовым событиям. Это может быть проверка наличия определенных полей в событии, соответствие их значений ожидаемым или сравнение количества событий с предполагаемым исходом. Для более сложных правил могут потребоваться более сложные утверждения, которые могут включать логические операции и сравнения.

Пример:

tests:
  - name: "Тестирование обнаружения DDoS атаки"
    events:
      - { source_ip: "192.168.1.1", event_type: "traffic_anomaly", traffic_volume: "10000" }
    assertion: !vrl |
        assert_eq!(.alert_type, "DDoS", "Ожидается обнаружение DDoS атаки")

Обогащение через внешние зависимости

Обогащение корреляционных событий представляет собой процесс дополнения событий, сгенерированных в ходе корреляции, дополнительными данными. В декларативных правилах корреляции можно настроить логику обогащения в поле on_correlate.

В рамках языка VRL обогащение корреляционных событий включает в себя применение специфических функций и выражений для динамического добавления или модификации данных в событиях. Помимо этого, это может включать извлечение информации из активных списков, которые могут предоставлять как статическую, так и динамическую информацию в зависимости от настроек схемы активного списка. А также взаимодействие с таблицами обогащения, которые предоставляют статическую информацию, например, сведения о геолокации, данные организаций, соответствующие спискам разрешенных и заблокированных ресурсов.

Активные списки

Активные списки представляют собой структуры данных, которые используются для хранения и управления как динамически изменяющимися наборами информации так и статическими списками и таблицами, такими как IP-адреса, доменные имена, идентификаторы устройств или пользователей. В контексте декларативных правил корреляции, поле on_correlate может включать вызовы функций для взаимодействия с активными списками, используя язык VRL. Это позволяет добавлять, извлекать и удалять записи из активных списков на основе логики обработки корреляционных событий.

Функции для взаимодействия с активными списками:

Функция Описание Параметры и Результат

add_active_record(list, key, value)

Добавляет запись в активный список. Если запись с таким ключом уже существует, значение обновляется. В противном случае создается новая запись.

list: название активного списка, в который добавляется запись. key: уникальный ключ для записи. value: значение, ассоциированное с ключом.

get_active_record(list, key)

Извлекает значение из активного списка по указанному ключу. Возвращает значение, ассоциированное с ключом, или null, если запись отсутствует.

list: активный список, из которого нужно извлечь запись. key: уникальный ключ записи.

remove_active_record(list, key)

Удаляет запись из активного списка. Если запись с таким ключом отсутствует, ничего не происходит.

list: активный список, из которого необходимо удалить запись. key: уникальный ключ записи, которую необходимо удалить.

Example 3. Пример использования в поле on_correlation правила корреляции:
on_correlation: !vrl |
  if .threat_level == "High" {
    add_active_record("suspicious_ips", .source_ip, true)
  }

  if .event_type == "authentication_failure" {
    let failure_count = get_active_record("auth_failure_counts", .user_id) || 0
    add_active_record("auth_failure_counts", .user_id, failure_count + 1)
  }

  if .event_type == "authentication_success" {
    remove_active_record("auth_failure_counts", .user_id)
  }

В этом примере:

  • IP-адреса с высоким уровнем угрозы добавляются в активный список подозрительных IP-адресов.

  • Считается количество неудачных попыток аутентификации для пользователей и обновляется соответствующая запись в активном списке.

  • При успешной аутентификации запись пользователя удаляется из списка неудачных попыток аутентификации, что может свидетельствовать об окончании атаки или восстановлении доступа пользователя.

Таблицы обогащения

Таблицы обогащения представляют собой структурированные наборы данных, используемые для добавления контекстной информации к событиям в процессе их корреляции. Это может включать данные о геолокации, информацию об устройствах, базы данных угроз и другие внешние источники данных. В поле on_correlate декларативных правил корреляции можно использовать специализированные функции VRL для запроса и извлечения данных из таблиц обогащения, тем самым обогащая события дополнительными деталями для более глубокого анализа.

Функции для взаимодействия с таблицами обогащения:

  • find_enrichment_table_records(table, condition, [select, case_sensitive]): позволяет выполнить поиск записей в таблице обогащения, которые соответствуют заданным условиям. Возвращает массив записей, удовлетворяющих условиям поиска.

  • get_enrichment_table_record(table, condition, [select, case_sensitive]): используется для извлечения одной записи из таблицы обогащения, которая точно соответствует заданному условию. В случае, если запись не найдена или найдено более одной записи, функция возвращает ошибку.

Пример использования в поле on_correlate:

on_correlate: !vrl |
  let geo_info = find_enrichment_table_records("geoip", {"ip": .source_ip}, select: ["country_name", "city_name"])
  if !geo_info.is_empty() {
    .source_country = geo_info[0].country_name
    .source_city = geo_info[0].city_name
  }

  let threat_info = get_enrichment_table_record("threat_intelligence", {"ip": .source_ip}, select: ["category", "score"])
  if !is_null(threat_info) {
    .threat_category = threat_info.category
    .threat_score = threat_info.score
  }

В этом примере:

  • Выполняется поиск геолокационной информации по IP-адресу источника события в таблице geoip, и результаты добавляются к событию как .source_country и .source_city.

  • Затем производится запрос к таблице threat_intelligence для извлечения информации о категории угрозы и оценке уровня угрозы, ассоциированных с IP-адресом источника. Эти данные добавляются к событию как .threat_category и .threat_score.

Тестирование правила с внешними зависимостями

Тестирование правил корреляции, которые обращаются к таблицам обогащения и/или активным спискам, требует предварительной настройки тестового окружения с помощью поля setup_tests, которое позволяет определить MOCK данные для внешнего элемента.

Поле setup_tests позволяет инициализировать необходимые данные для таблиц обогащения и активных списков перед запуском тестов правила корреляции. Это обеспечивает возможность проверить не только синтаксическую корректность правила, но и его логику обработки данных при взаимодействии с внешними элементами.

Процесс тестирования

  1. Определение MOCK данных: для начала тестирования необходимо определить MOCK данные для всех активных списков и таблиц обогащения, с которым взаимодействует тестируемое правило. Эти данные должны быть репрезентативными и охватывать различные сценарии использования списка в правиле.

  2. Конфигурация setup_tests: в корне конфигурации правила корреляции необходимо добавить секцию setup_tests, где указываются все используемые таблицы обогащения и активные списки с их MOCK данными. Имя каждого внешнего элемента в секции должно точно совпадать с именем, используемым в теле правила.

  3. Заполнение MOCK данных: внутри внешнего элемента в секции setup_tests указывается список полей и соответствующих им MOCK данных. Данные задаются в формате, соответствующем ожидаемым запросам к активному списку и/или таблице обогащения в рамках теста.

Примеры:

Example 4. Структура MOCK данных для активного списка:
setup_tests:
  active_lists:
    suspicious_ips:
      - source_ip: "192.168.1.1"
        is_suspicious: true
      - source_ip: "10.10.10.10"
        is_suspicious: false
    auth_failure_counts:
      - user_id: "user1"
        failure_count: 1
      - user_id: "user2"
        failure_count: 2
Example 5. Структура MOCK данных для таблицы обогащения:
  enrichment_tables:
    geoip:
      fields:
        - ip
        - country
        - city
      data:
        - ["8.8.8.8", "US", "Mountain View"]
        - ["9.9.9.9", "GB", "London"]

Создание и выполнение тестов

После настройки MOCK данных следующим шагом является создание набора тестов, которые будут проверять взаимодействие правила корреляции с активными списками. Каждый тест должен включать в себя:

  1. Описание теста (name): краткое описание цели теста.

  2. События (events): список событий, которые будут подаваться на вход правила в процессе тестирования.

  3. Утверждения (assertions): логика проверки результата работы правила на основе предоставленных событий и MOCK данных активных списков.

Пример теста:

tests:
  - name: "Test for Suspicious IP Detection"
    events:
      - { source_ip: "192.168.1.1", event_type: "login_attempt" }
    assertion: !vrl |
        let is_suspicious = get_active_record("suspicious_ips", .source_ip);
        assert_eq!(is_suspicious, true, "IP should be marked as suspicious")
  - name: "Authentication Failure Count Test"
    events:
      - { user_id: "user2", event_type: "login_failure" }
    assertion: !vrl |
        let failure_count = get_active_record("auth_failure_counts", .user_id);
        assert_eq!(failure_count, 2, "User2 should have 2 authentication failures")

В разделе событий (events) указываются события, которые подаются на вход правила корреляции для проверки его срабатывания. В первом тесте это попытка входа с IP-адреса, который должен быть помечен как подозрительный. Во втором тесте — неудачная попытка входа от пользователя, для которого уже зафиксированы подобные инциденты.

Утверждения (assertions) содержат логику проверки, выполняемую после обработки событий правилом. В первом тесте проверяется, что IP-адрес из события корректно идентифицирован как подозрительный на основе данных активного списка suspicious_ips. Во втором — что количество неудачных попыток входа для пользователя соответствует ожидаемому значению, основанному на данных активного списка auth_failure_counts.

Если утверждение (assertion) выполнено успешно, это означает, что правило корреляции корректно обрабатывает входные данные и правильно взаимодействует с активными списками. В противном случае, если результат не соответствует ожидаемому, это указывает на проблемы в логике правила или в его взаимодействии с активными списками.

  • Убедитесь, что MOCK данные адекватно отражают разнообразие реальных сценариев, с которыми правило будет сталкиваться в продуктивной среде.

  • Тщательно проверяйте соответствие имен активных списков и таблиц обогащения в setup_tests именам, используемым в правиле корреляции.

  • Используйте результаты тестирования для уточнения и оптимизации логики правила корреляции и взаимодействия с внешним элементом.

Пример:

Пример правила корреляции с активным списком, схема RObject
# Уникальный идентификатор правила.
id: example/ddos_detection_rule

# Название правила.
name: DDoSDetectionRule

# Тип элемента экспертизы.
type: correlation_rule

# Версия правила.
version: 1.0.0

# Уровень угрозы, присваиваемый корреляционному правилу.
severity: high

# Описание назначения правила.
description: Обнаружение атаки DDoS на основе трафика с учетом репутации IP-адресов из активного списка DDoSAttackIPs.

# Имя и контактные данные автора правила.
author: John Doe <johndoe@example.com>

# Список совместимых с правилом источников данных.
data_source:
  - platform: Windows
  - source: Sysmon
  - events:
    - EventID_13
    - EventID_1

# Теги, связанные с правилом.
tags:
  - ddos
  - security

# Фильтр применяется к событиям перед входом в правило.
filter: !vrl |
  .source_ip != null &&
  .request_path != null

# Разделение потоков событий на псевдонимы.
aliases:
  HighRateTraffic:
    filter: !vrl |
      .event_type == "HTTP_REQUEST" && parse_duration!(.window, "s") >= 100
  SuspiciousPatterns:
    filter: !vrl |
      .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/api/highload")
  NormalTraffic:
    filter: !vrl |
      .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/home")

# Соединение потоков событий.
select:
  alias: HighRateTraffic
  join:
    alias: SuspiciousPatterns
    absent: true
    on:
      - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip }
    wait_for: 30

# Группировка событий.
group:
  - alias: HighRateTraffic
    by:
      - source_ip
    count: 3

# Частота генерации корреляционных событий в рамках каждой группы (событий в секунду).
throttle_time_sec: 60

# Время хранения промежуточных корреляционных данных (секунды).
ttl: 60

# Логика обогащения корреляционного события на языке VRL.
on_correlate: !vrl |
  let attack_info = get_active_record("DDoSAttackIPs", .source_ip);
  if attack_info != null {
    .attack_type = attack_info.attack_type
    .last_detected = attack_info.last_detected
    .attack_volume = attack_info.attack_volume
    .reputation_score = attack_info.reputation_score
    .involved_attacks = attack_info.involved_attacks
    .notes = attack_info.notes
  }
  .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip) +
             ", тип атаки: " + to_string!(.attack_type) +
             ", объем атаки: " + to_string!(.attack_volume) +
             ", репутация IP: " + to_string!(.reputation_score) +
             ", количество атак: " + to_string!(.involved_attacks) +
             ", заметки: " + to_string!(.notes)
  if .reputation_score != null && .reputation_score < 50 {
    .ddos_alert = "Высокий уровень запросов и низкая репутация IP, потенциальная DDoS атака"

# Управляет созданием оповещений по результатам корреляции.
trigger_alert: true

# MOCK данные для активного списка в тестовом окружении.
setup_tests:
  active_lists:
    DDoSAttackIPs:
      - ip: "192.0.2.1"
        last_detected: "2023-01-01T12:00:00Z"
        attack_type: "SYN Flood"
        reputation_score: 10
        attack_volume: "1.5 Gbps"
        involved_attacks: 5
        notes: "Repeated attacks on multiple targets"
      - ip: "192.0.2.2"
        last_detected: "2023-01-02T15:30:00Z"
        attack_type: "UDP Flood"
        reputation_score: 20
        attack_volume: "500 Mbps"
        involved_attacks: 2
        notes: "High packet rate observed"
      - ip: "192.0.2.3"
        last_detected: "2023-01-03T18:45:00Z"
        attack_type: "Amplification"
        reputation_score: 15
        attack_volume: "2 Gbps"
        involved_attacks: 3
        notes: "DNS amplification attack"
      - ip: "192.0.2.4"
        last_detected: "2023-01-04T20:00:00Z"
        attack_type: "HTTP Flood"
        reputation_score: 5
        attack_volume: "750 Mbps"
        involved_attacks: 4
        notes: "Targeted at web services"

# Тесты для проверки работы правила.
tests:
  - name: HighRateTraffic from Malware IP
    events:
      - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload" }
    assertion: !vrl |
      assert_eq!(.ddos_alert, "Высокий уровень запросов и низкая репутация IP, потенциальная DDoS атака", "Alert generation failed for malware IP")
  - name: HighRateTraffic from Clean IP
    events:
      - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "100s", "request_path": "/api/highload" }
    assertion: !vrl |
      assert!(.ddos_alert == null || .ddos_alert == "", "Alert should not be generated for clean IP")

Пример правила корреляции

Example 6. Правило корреляции, схема RObject
# Уникальный идентификатор правила.
id: example/ddos_detection_rule

# Название правила.
name: DDoSDetectionRule

# Тип элемента экспертизы.
type: correlation_rule

# Версия правила.
version: 1.0.0

# Уровень угрозы, присваиваемый корреляционному правилу.
severity: high

# Описание назначения правила.
description: Обнаружение атаки DDoS на основе трафика

# Имя и контактные данные автора правила.
author: John Doe <johndoe@example.com>

# Список совместимых с правилом источников данных.
data_source:
  - platform: Windows
  - source: Sysmon
  - events:
    - EventID_13
    - EventID_1

# Теги, связанные с правилом.
tags:
  - ddos
  - security

# Фильтр применяется к событиям перед входом в правило.
filter: !vrl |
  .source_ip != null &&
  .request_path != null

# Разделение потоков событий на псевдонимы.
aliases:
 HighRateTraffic:
   filter: !vrl |
     .event_type == "HTTP_REQUEST" && parse_duration!(.window, "s") >= 100
 SuspiciousPatterns:
   filter: !vrl |
     .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/api/highload")
 NormalTraffic:
   filter: !vrl |
     .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/home")

# Соединение потоков событий.
select:
  alias: HighRateTraffic
  join:
    alias: SuspiciousPatterns
    absent: true
    on:
      - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip }
    wait_for: 30

# Группировка событий.
group:
  - alias: HighRateTraffic
    by:
      - source_ip
    count: 3

# Частота генерации корреляционных событий в рамках каждой группы (событий в секунду).
throttle_time_sec: 60

# Время хранения промежуточных корреляционных данных (секунды).
ttl: 60

# Логика обогащения корреляционного события на языке VRL.
on_correlate: !vrl |
 .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip)
 .additional_info = "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload"

# Управляет созданием оповещений по результатам корреляции.
trigger_alert: true

# Тесты для проверки работы правила.
tests:
 - name: DDoS Attack Detected
   events:
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "110s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "120s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.100", "window": "130s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "140s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "150s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "180s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "50s",  "request_path": "/home" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "180s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "50s",  "request_path": "/home" }
=    - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.224",  "window": "60s",  "request_path": "/contact" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "70s", "request_path": "/about" }
   assertion: !vrl |
     assert_eq!(.message, "Обнаружена потенциальная DDoS атака с источника: 192.0.2.1")

Интерпретация

Правило корреляции "DDoSDetectionRule" предназначено для выявления DDoS-атак на основе анализа потока поступающих событий.

Метаданные правила:

  • id: уникальный идентификатор правила в системе.

  • name: название правила.

  • type: тип correlation_rule указывает на то, что это правило корреляции.

  • version: версия правила.

  • description: описывает цель правила — "Обнаружение атаки DDoS на основе трафика".

  • author: создатель правила.

  • tags: теги ddos и security классифицируют правило по его функциональности и области применения.

Логика фильтрации (filter):

  • Условия фильтрации .source_ip != null и .request_path != null выбирают события, у которых есть IP-адрес источника и путь запроса.

Логика фильтрации (aliases):

  • Псевдоним HighRateTraffic: фильтр для событий с высокой частотой запросов, идентифицируемых по event_type == "HTTP_REQUEST" и parse_duration!(.window) >= 100. Отбирает события с частотой запросов, превышающей или равной 100 запросов в определенный временной интервал.

  • Псевдоним SuspiciousPatterns: фильтр для событий с подозрительными путями запроса, определяемый как event_type == "HTTP_REQUEST" и ends_with(to_string!(.request_path), "/api/highload"). Отбирает запросы, пути которых оканчиваются на "/api/highload".

Соединение потоков событий (select и join):

  • Выбор и соединение (select и join) потоков событий HighRateTraffic и SuspiciousPatterns осуществляется на основе совпадения source_ip. Это соединение позволяет коррелировать высокочастотные запросы с подозрительными паттернами, исходящими от одного источника.

  • В поле wait_for определяется, что система должна ожидать в течение 30 секунд перед тем, как принять решение о соединении событий из различных псевдонимов. Это значит, что система будет накапливать события в течение указанного времени с момента получения первого события и затем анализировать их на предмет соответствия заданным критериям соединения.

  • В операции соединения с SuspiciousPatterns используется поле absent: true, что указывает на то, что отсутствие события из SuspiciousPatterns в течение 30 секунд будет добавлено в цепочку корреляционных событий.

Группировка событий (group):

  • group: группировка событий применяется к псевдониму HighRateTraffic по полю source_ip, что позволяет агрегировать и анализировать события, исходящие от одного источника IP. Количество событий для формирования группы в поле count установлено в 10, что означает, что группа будет сформирована при накоплении 3 событий от одного источника IP.

  • throttle_time_sec: задает интервал времени в секундах между генерацией корреляционных событий в рамках каждой группы событий. Например, 60 секунд.

Срок хранения промежуточных данных корреляции (ttl):

  • ttl на уровне правила задает общее максимальное время жизни промежуточных корреляционных данных. Например, 60 секунд. Это время определяет, как долго промежуточные результаты для всей цепочки событий будут храниться в памяти до их автоматического удаления, если корреляция не завершается.

Пост-корреляционная логика (on_correlate):

  • on_correlate: обогащает корреляционное событие полями .message и .additional_info.

  • trigger_alert: управляет созданием оповещений по результатам корреляции. В данном случае, с trigger_alert: true, система будет генерировать оповещения при успешной корреляции событий.

По накоплении 3 событий правило генерирует корреляционное событие со следующими атрибутами:

  • type: "correlation_rule",

  • event_type: "DDoS Detection",

  • source_ip: "192.168.1.1",

  • datetime: "2023-09-28T12:45:00Z",

  • message: "Обнаружена потенциальная DDoS атака с источника: 192.168.1.1",

  • additional_info: "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload"

Тестирование правила (tests):

  • Набор тестовых сценариев "DDoS Attack Detected" включает набор событий, часть из которых имитируют характеристики DDoS-атаки, а остальные являются контрольными примерами обычного трафика. Цель тестирования — подтвердить, что правило корректно идентифицирует только события, связанные с DDoS-атаками и игнорирует неподходящие события. Тесты включают события с HTTP-запросами, различающимися по source_ip и request_path, что позволяет проверить способность правила различать атакующий трафик от легитимного.

  • События, моделирующие DDoS-атаку, имеют одинаковый source_ip и конечный путь запроса "/api/highload", что соответствует фильтрам HighRateTraffic и SuspiciousPatterns. Это сценарий, в котором ожидается активация правила и генерация корреляционного события.

  • Контрольные события с разными source_ip и неподозрительными путями запроса ("/home", "/contact", "/about") предназначены для подтверждения, что правило не реагирует на обычные запросы, не связанные с DDoS-активностью.

assertion в тестовом блоке предназначен для проверки того, что только события, соответствующие характеристикам DDoS-атаки, приводят к формированию корреляционного события с ожидаемым сообщением об обнаружении атаки.

Обработка событий правилом корреляции

Проанализируем логику работы правила "DDoSDetectionRule", фокусируясь на процессе обработки событий.

Предположим, что за короткий промежуток времени на коррелятор, на котором установлено правило "DDoSDetectionRule", поступает поток JSON-событий от различных источников IP, включая множество запросов с IP-адреса "192.168.1.1". Эти события соответствуют критериям, заданным в фильтрах HighRateTraffic и SuspiciousPatterns.

Example 7. Исходное событие, JSON
{
  "event_type": "HTTP_REQUEST",
  "source_ip": "192.168.1.x",  // где x - уникальный номер для каждого события
  "datetime": "2023-09-28T12:34:56Z",
  "request_path": "/api/highload",
  "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/93.0.4472.124 Safari/537.36"
}

По мере поступления этих событий, они фильтруются правилом "DDoSDetectionRule". Фильтр HighRateTraffic отбирает события, соответствующие запросам с высокой частотой (более 100 запросов за заданный временной интервал), а SuspiciousPatterns фокусируется на запросах с определенным путем запроса /api/highload.

Когда события от одного источника IP, например, "192.168.1.1", соответствуют обоим фильтрам и проходят через механизм соединения join, правило "DDoSDetectionRule" приступает к группировке. Правило использует поле group для агрегирования событий от одного источника IP. Если количество событий от одного источника соответствует заданному порогу в 3 события, правило генерирует корреляционное событие.

Example 8. Корреляционное событие, генерируемое правилом "DDoSDetectionRule"
{
  "type": "correlation_rule",
  "severity": "high",
  "event_type": "DDoS Detection",
  "source_ip": "192.168.1.1",
  "datetime": "2023-09-28T12:45:00Z",
  "message": "Обнаружена потенциальная DDoS атака с источника: 192.168.1.1",
  "additional_info": "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload"
}

Данное корреляционное событие указывает на то, что система обнаружила аномальную активность, соответствующую критериям DDoS-атаки, исходящей с одного IP-адреса. Правило "DDoSDetectionRule" коррелирует события, основываясь на двух ключевых факторах: частоте запросов и специфических путях запросов, что является индикатором потенциальной атаки.

Расширение конфигурации существующего правила

Рассмотрим сценарий, в котором необходимо добавить дополнительные условия фильтрации и корреляции для правила "DDoSDetectionRule", чтобы повысить его эффективность и надежность в обнаружении более сложных DDoS-атак.

Исходное правило корреляции "DDoSDetectionRule", схема RObject
# Уникальный идентификатор правила.
id: example/ddos_detection_rule

# Название правила.
name: DDoSDetectionRule

# Тип элемента экспертизы.
type: correlation_rule

# Версия правила.
version: 1.0.0

# Уровень угрозы, присваиваемый корреляционному правилу.
severity: high

# Описание назначения правила.
description: Обнаружение атаки DDoS на основе трафика

# Имя и контактные данные автора правила.
author: John Doe <johndoe@example.com>

# Список совместимых с правилом источников данных.
data_source:
  - platform: Windows
  - source: Sysmon
  - events:
    - EventID_13
    - EventID_1

# Теги, связанные с правилом.
tags:
  - ddos
  - security

# Фильтр применяется к событиям перед входом в правило.
filter: !vrl |
  .source_ip != null &&
  .request_path != null

# Разделение потоков событий на псевдонимы.
aliases:
 HighRateTraffic:
   filter: !vrl |
     .event_type == "HTTP_REQUEST" && parse_duration!(.window, "s") >= 100
 SuspiciousPatterns:
   filter: !vrl |
     .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/api/highload")
 NormalTraffic:
   filter: !vrl |
     .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/home")

# Соединение потоков событий.
select:
  alias: HighRateTraffic
  join:
    alias: SuspiciousPatterns
    absent: true
    on:
      - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip }
    wait_for: 30

# Группировка событий.
group:
  - alias: HighRateTraffic
    by:
      - source_ip
    count: 3

# Частота генерации корреляционных событий в рамках каждой группы (событий в секунду).
throttle_time_sec: 60

# Время хранения промежуточных корреляционных данных (секунды).
ttl: 60

# Логика обогащения корреляционного события на языке VRL.
on_correlate: !vrl |
 .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip)
 .additional_info = "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload"

# Управляет созданием оповещений по результатам корреляции.
trigger_alert: true

# Тесты для проверки работы правила.
tests:
 - name: DDoS Attack Detected
   events:
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "110s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "120s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.100", "window": "130s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "140s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "150s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "180s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "50s",  "request_path": "/home" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "180s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "50s",  "request_path": "/home" }
=    - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.224",  "window": "60s",  "request_path": "/contact" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "70s", "request_path": "/about" }
   assertion: !vrl |
     assert_eq!(.message, "Обнаружена потенциальная DDoS атака с источника: 192.0.2.1")

Для этого добавим дополнительный псевдоним с фильтрацией для отбора событий по значению поля user_agent исходного события. Также добавим обращение к активному списку для динамической оценки уровня угрозы, основанной на текущей информации о подозрительных IP-адресах.

Example 9. Пример улучшенного правила "EnhancedDDoSDetectionRule", схема RObject
# Уникальный идентификатор правила.
id: example/ddos_detection_rule

# Название правила.
name: EnhancedDDoSDetectionRule

# Тип элемента экспертизы.
type: correlation_rule

# Версия правила.
version: 1.0.0

# Уровень угрозы, присваиваемый корреляционному правилу.
severity: high

# Описание назначения правила.
description: Расширенное обнаружение DDoS-атак на основе трафика

# Имя и контактные данные автора правила.
author: John Doe <johndoe@example.com>

# Список совместимых с правилом источников данных.
data_source:
  - platform: Windows
  - source: Sysmon
  - events:
    - EventID_13
    - EventID_1

# Теги, связанные с правилом.
tags:
  - ddos
  - security
  - enhanced_detection

# Фильтр применяется к событиям перед входом в правило.
filter: !vrl |
  .source_ip != null &&
  .request_path != null

# Разделение потоков событий на псевдонимы.
aliases:
  HighRateTraffic:
    filter: !vrl |
      .event_type == "HTTP_REQUEST" && parse_duration!(.window, "s") >= 100
  SuspiciousPatterns:
    filter: !vrl |
      .event_type == "HTTP_REQUEST" && ends_with(to_string!(.request_path), "/api/highload")
  # Дополнительный псевдоним.
  UnusualUserAgents:
    filter: !vrl |
      .event_type == "HTTP_REQUEST" && regex_match("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", .user_agent)

# Соединение потоков событий.
select:
  alias: HighRateTraffic
  join:
    alias: SuspiciousPatterns
    absent: true
    on:
      - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip }
    join:
      alias: UnusualUserAgents
      on:
        - eq: { HighRateTraffic: .user_agent, UnusualUserAgents: .user_agent }
      wait_for: 40

# Группировка событий.
group:
  - alias: HighRateTraffic
    by:
      - source_ip
      - user_agent # Дополнительный критерий фильтрации.
    count: 3

# Частота генерации корреляционных событий в рамках каждой группы (событий в секунду).
throttle_time_sec: 60

# Время жизни промежуточных корреляционных данных (секунды).
ttl: 80

# Логика обогащения корреляционного события на языке VRL.
on_correlate: !vrl |
  .message = "Обнаружена потенциальная DDoS атака с источника: " + to_string!(.source_ip)
  .additional_info = "Высокая частота запросов и подозрительные паттерны в пути запроса /api/highload и необычные user agents"
  # Обращение к активному списку "suspicious_ips"
  if get_active_record("suspicious_ips", .source_ip) != null {
    .threat_level = "high"
  } else {
    .threat_level = "medium"
  }

# Управляет созданием оповещений по результатам корреляции.
trigger_alert: true

# MOCK данные активного списка
setup_tests:
  active_lists:
    suspicious_ips:
      - "192.0.2.1"
      - "198.51.100.10"

# Тесты для проверки работы правила.
tests:
 - name: Enhanced DDoS Attack Detection
   events:
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "110s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "120s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "130s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "140s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "150s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "180s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.132", "window": "50s",  "request_path": "/home" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "180s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "50s",  "request_path": "/home" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2",  "window": "60s",  "request_path": "/contact", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "70s", "request_path": "/about", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "100s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "110s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.2", "window": "120s", "request_path": "/api/highload" , "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "130s", "request_path": "/api/highload" , "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "140s", "request_path": "/api/highload"  }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "150s", "request_path": "/api/highload", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"  }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "160s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "170s", "request_path": "/api/highload" , "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "180s", "request_path": "/api/highload" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.1", "window": "190s", "request_path": "/api/highload" , "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "198.51.100.10", "window": "50s",  "request_path": "/home", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "203.0.113.2",  "window": "60s",  "request_path": "/contact" }
   - { "event_type": "HTTP_REQUEST", "source_ip": "192.0.2.10", "window": "70s", "request_path": "/about", "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }
   assertion: !vrl |
      assert_eq!(.message, "Обнаружена потенциальная DDoS атака с источника: 192.0.2.1")
      assert_eq!(.threat_level, "high")

Таким образом, в улучшенное правило "EnhancedDDoSDetectionRule" внесены следующие изменения:

  1. Дополнительный псевдоним и условие фильтрации:

    • Добавлен псевдоним UnusualUserAgents, в рамках которого происходит фильтрация событий по user_agent для идентификации необычных или подозрительных запросов.

    • Фильтр .event_type == "HTTP_REQUEST" && regex_match("^Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", .user_agent) отбирает события со значением "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" в поле user_agent.

    • Добавлен UnusualUserAgents в select с последующей корреляцией по user_agent.

  2. Настройка ttl:

    • Значение ttl установлено в 80 секунд, увеличивая период хранения промежуточных корреляционных данных.

  3. Дополнительный критерий группировки:

    • Добавлена группировка по user_agent, для более точного определения распределенных атак.

  4. Интеграция с активным списком:

    • Использована функция get_active_record("suspicious_ips", .event.source_ip) для обогащения результатов проверки данными из активного списка "suspicious_ips". Это реализовано через условие:

        if get_active_record("suspicious_ips", .source_ip) != null {
          .threat_level = "high"
        } else {
          .threat_level = "medium"
        }

      Таким образом, при генерации корреляционного события, помимо основного сообщения и дополнительной информации, теперь также устанавливается уровень угрозы в зависимости от статуса IP-адреса в активных списках. Уровень угрозы устанавливается в "high", если IP-адрес источника обнаружен в списке подозрительных, и "medium" в остальных случаях.

  5. Тестовая проверка:

    • Добавлен блок setup_tests для предварительной настройки тестового окружения, включающий активный список suspicious_ips с указанными IP-адресами "192.168.1.1" и "10.10.10.10". Это позволяет эмулировать обращение к активному списку во время тестирования правила и проверять логику обогащения событий на основе наличия IP-адреса в списке подозрительных.

    • Добавлены тестовые события со значением "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" в поле user_agent, чтобы проверить правило на способность корректно идентифицировать и обрабатывать события с конкретным значением в user_agent.

    • Добавлена проверка динамической оценки уровня угрозы на основе данных из активного списка. Проверяется, каким образом правило реагирует на обнаружение подозрительных IP-адресов и присваивает соответствующий уровень угрозы (high или medium).

Интерпретация

Основной поток (select):

  • alias: HighRateTraffic определяет начальный поток событий для корреляции, который в данном случае представляет поток HighRateTraffic. Поток HighRateTraffic охватывает события, соответствующие определенным критериям, таким как высокая частота HTTP-запросов от одного источника IP.

Первый уровень соединения (join):

  • alias: SuspiciousPatterns указывает на то, что поток HighRateTraffic будет соединяться с потоком SuspiciousPatterns. Поток SuspiciousPatterns фильтрует события на основе определенных паттернов в пути запроса (например, оканчивающихся на "/api/highload").

  • on: - eq: { HighRateTraffic: .source_ip, SuspiciousPatterns: .source_ip } говорит о том, что соединение между потоками HighRateTraffic и SuspiciousPatterns будет осуществляться на основе совпадения значения поля .source_ip. То есть корреляция произойдет, если источник IP в обоих потоках событий совпадает.

  • В операции соединения HighRateTraffic и SuspiciousPatterns используется поле absent: true, что указывает на то, что отсутствие события из SuspiciousPatterns в течение 30 секунд будет добавлено в цепочку корреляционных событий.

Второй уровень соединения (join):

  • alias: UnusualUserAgents добавляет третий поток событий UnusualUserAgents в цепочку корреляции. Этот поток фокусируется на событиях с "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (HTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" в поле user_agent.

  • on: - eq: { HighRateTraffic: .user_agent, UnusualUserAgents: .user_agent } устанавливает, что соединение между HighRateTraffic и UnusualUserAgents осуществляется на основе совпадения по полю .user_agent. Это означает, что для корреляции в рамках данного правила событие должно соответствовать критериям как высокой частоты запросов, так и "подозрительного" user_agent.

Таким образом, создается многоуровневая логика корреляции, где события должны соответствовать критериям всех трех потоков — HighRateTraffic, SuspiciousPatterns и UnusualUserAgents. Это позволяет правилу "EnhancedDDoSDetectionRule" выявлять потенциальные DDoS-атаки, основываясь на более сложных и детализированных критериях, чем в исходной версии правила.