Logstash, Manticore, Nginx и Symfony: сбор, агрегация и быстрый поиск логов

Мысль о централизованном мониторинге логов хранилась в моей голове много лет, но реализовать ее я смог только совсем недавно. Рассказываю о настройке и подводных камнях связки Logstash + Manticore + Nginx + Symfony.

В сети есть достаточно информации на тему связки Logstash + Elasticsearch, но практически отсутствует по Manticore. Да, у самой Мантикоры есть даже целый интерактивный курс по Logstash/Beats, но как все хорошо это работает на демке и как может быть сложно развернуть все это на продакшене — никто не рассказывает, а точнее о проблемах, которые могут встретиться на пути. Об этом и напишу.

Дисклеймер: подразумевается, что читатель знаком с ОС Linux и имеет базовые навыки администрирования, конкретно в данном примере используется Ubuntu 22. Приведенный далее код не является «вылизанным», местами может быть не оптимальным по мнению читателя, задача статьи показать пример, дать рабочую схему, а заняться оптимизацией любой желающий сможет в дальнейшем самостоятельно.

Установка и настройка Manticore

В самой установке Manticore нет ничего сложного и необычного — просто следуем инструкции с официального сайта, скачиваем и устанавливаем пакет:

wget https://repo.manticoresearch.com/manticore-repo.noarch.deb
dpkg -i manticore-repo.noarch.deb
apt update
apt install manticore manticore-extra

Спасибо кэп, как говорится. Открываем и смотрим в конфигурационный файл:

nano /etc/manticoresearch/manticore.conf

Видим такую картину:

searchd {
  listen = 127.0.0.1:9312
  listen = 127.0.0.1:9306:mysql
  listen = 127.0.0.1:9308:http
  log = /var/log/manticore/searchd.log
  query_log = /var/log/manticore/query.log
  pid_file = /var/run/manticore/searchd.pid
  data_dir = /var/lib/manticore
}

Здесь нужно обратить внимание на одну вещь: сам поисковой движок у нас будет один, база, в которую будут писаться все логи, будет так же одна, но хостов/серверов/VPS у нас может быть любое количество, на каждом из них будет крутиться Logstash, собирая логи и передавая их в одну базу Manticore (можно и не в одну, но здесь все упрощаем для понимания).

Часто (но не всегда) разные хосты располагаются в одной локальной сети, поэтому вместо локального адреса 127.0.0.1 мы указываем локальный приватный адрес хоста, на котором крутится Мантикора, например 10.16.0.1. Таким образом мы делаем Мантикору доступной для подключения других наших хостов из этой локальной сети:

searchd {
  listen = 10.16.0.1:9312
  listen = 10.16.0.1:9306:mysql
  listen = 10.16.0.1:9308:http
  log = /var/log/manticore/searchd.log
  query_log = /var/log/manticore/query.log
  pid_file = /var/run/manticore/searchd.pid
  data_dir = /var/lib/manticore
}

Теперь на хосте 10.16.0.1 и порту 9308 у нас крутится Мантикора и принимает к себе входящие HTTP-запросы, которые могут отправлять хосты, например 10.16.0.2,  10.16.0.3 и так далее. Можно использовать и внешний IP адрес, но в этом случае потребуется дополнительно озаботиться безопасностью открытого во внешний мир порта.

Наконец запускаем Мантикору:

systemctl start manticore

Потыкайте в нее чем-нибудь, например так:

curl 10.16.0.1:9308

Если все хорошо, вы увидите примерно следующий ответ:

{"cluster_name":"docker-cluster","cluster_uuid":"Z7igA6xDRDKCVwnMuyXCOQ","name":"4e9d933ebde2","tagline":"You Know, for Search","version":{"build_date":"2019-10-22T17:16:35.176724Z","build_flavor":"default","build_hash":"fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e","build_snapshot":false,"build_type":"docker","lucene_version":"8.2.0","minimum_index_compatibility_version":"6.0.0-beta1","minimum_wire_compatibility_version":"6.8.0","number":"7.4.1"}}

В целом, с особенностями настройки Manticore все, каких-либо других трудностей встречено не было. Они будут дальше, с самим Logstash-ем.

Подготовка логов Nginx

О важности чтения Nginx логов говорить не будем, перейдем сразу к подготовке формата логов — отлично подойдет JSON. Можно конечно использовать и строчный вариант, а затем парсить это через Grok (к нему мы еще вернемся), но зачем, если есть удобный JSON?

Предположим, что конфигурационный файл вашего Nginx располагается по адресу /etc/nginx/nginx.conf — открываем его и вкручиваем JSON-формат:

nano /etc/nginx/nginx.conf 

— в блок раздела http добавляем:

log_format main_json escape=json '{'
    '"datetime_local": "http_host", '
    '"ip": "request_method", '
    '"uri": "args", '
    '"post": "http_cookie", '
    '"status": http_user_agent", '
    '"req_time": http_referer" '
'}';

Вы можете добавить свои данные по вкусу, но в таком случае не забудьте их правильно «разметить» в конфигурационном файле Logstash (об этом чуть позже).

Далее перемещаемся в наш хост и указываем ему писать логи в формате «main_json», который мы добавили выше:

nano /etc/nginx/sites-available

— в блоке server меняем или добавляем:

access_log /var/log/nginx/access_json.log main_json;

Помимо access log у нас так же есть и error log — у него формат остается родной, поэтому так же добавляем строку:

error_log /var/log/nginx/error.log

После всех изменений перезапускаем Nginx:

systemctl restart nginx

Если все хорошо, логи начнут писаться по адресу »/var/log/nginx/access_json.log» и »/var/log/nginx/error.log» — уже есть что собирать, если нет — смотрим ошибки, на которые укажет информация в консоли и исправляем.

Подготовка логов Symfony

Если у вас другой рабочий стек и логи фреймворка вам не требуются — этот раздел можно пропустить.

Symfony позволяет настроить свои логи через Monolog Bundle — мы можем просто сразу указать ему формат JSON, но это не очень полезно ввиду ограниченности записываемой информации, поэтому напишем свой вариант JSON, включив в него нужную нам информацию. Конкретно этот пример будет рассмотрен для 6 версии Symfony. Все пути будут указаны относительно папки вашего проекта.

Открываем конфиг логгера:

nano config/packages/monolog.yaml

— и вкручиваем ему следующую информацию:

monolog:
  handlers:
    main:
      type: fingers_crossed
      action_level: error
      handler: nested
      excluded_http_codes: [400, 403, 404]
      buffer_size: 50
      level: debug
    nested:
      action_level: error
      type: rotating_file
      path: "%kernel.logs_dir%/%kernel.environment%.json"
      level: debug
      formatter: monolog.formatter.custom
      max_files: 7
    console:
      type: console
      process_psr_3_messages: false
      channels: ["!event", "!doctrine"]

Исключаем коды ответов 400, 403, 404 — не интересно, уровень срабатывания записи лога от «error», ротация файлов по 7 штук. Все параметры можно настроить по своему вкусу, ключевым здесь является строка:

formatter: monolog.formatter.custom_json

— в ней мы указываем название нашего сервиса-обработчика логов, а так же:

path: "%kernel.logs_dir%/%kernel.environment%.json"

— путь хранения логов. Далее создаем соответствующий сервис:

nano config/services.yaml

В блок «services» добавляем:

monolog.formatter.custom_json:
    class: App\Service\CustomJsonFormatter

Ну и наконец — создаем сам обработчик по адресу «src/Service/CustomLogFormatter.php»:

systemName = $systemName === "" ? (string) gethostname() : $systemName;
        $this->applicationName = $applicationName;
        $this->extraKey = $extraKey;
        $this->contextKey = $contextKey;
    }

    /**
     * @inheritDoc
     */
    public function format(LogRecord $record): string
    {
        $recordData = parent::format($record);
        $cDate = new \DateTime();

        // собираем все интересующие нас данные в массив
        $message = [
            'created_at' => $cDate->format('Y-m-d H:i:s'),
            'datetime' => $cDate->getTimestamp(),
            'http_host' => isset($_SERVER['HTTP_HOST']) && $_SERVER['HTTP_HOST'] ? $_SERVER['HTTP_HOST'] : "",
            'uri' => isset($_SERVER['REQUEST_URI']) && $_SERVER['REQUEST_URI'] ? $_SERVER['REQUEST_URI'] : "",
            'get' => $_GET ? http_build_query($_GET) : "",
            'post' => $_POST ? http_build_query($_POST) : "",
            'cookie' => "",
            'http_referer' => isset($_SERVER['HTTP_REFERER']) && $_SERVER['HTTP_REFERER'] ? $_SERVER['HTTP_REFERER'] : "",
            'user_agent' => isset($_SERVER['HTTP_USER_AGENT']) && $_SERVER['HTTP_USER_AGENT'] ? $_SERVER['HTTP_USER_AGENT'] : "",
            'ip' => Util::getUserIpAddr(),
            'method' => isset($_SERVER['REQUEST_METHOD']) && $_SERVER['REQUEST_METHOD'] ? $_SERVER['REQUEST_METHOD'] : "",
            'req_time' => (float) (isset($_SERVER['REQUEST_TIME_FLOAT']) && $_SERVER['REQUEST_TIME_FLOAT'] ? number_format(microtime(true) - $_SERVER["REQUEST_TIME_FLOAT"], 2, '.', '') : 0.00),
            'status' => 500,
        ];

        // сопоставляем тип ошибки и получаем ее шорт код, чтобы по нему в дальнейшем проводить фильтрацию данных
        $levels = [
            'DEBUG' => [
                'code' => 100,
                'short_code' => 1,
                'description' => 'Detailed debug information',
                'color' => '#afafaf',
            ],
            'INFO' => [
                'code' => 200,
                'short_code' => 2,
                'description' => 'Interesting events. Examples: User logs in, SQL logs',
                'color' => '#9dc3ff',
            ],
            'NOTICE' => [
                'code' => 250,
                'short_code' => 3,
                'description' => 'Normal but significant events',
                'color' => '#a19dff',
            ],
            'WARNING' => [
                'code' => 300,
                'short_code' => 4,
                'description' => 'Exceptional occurrences that are not errors. Examples: Use of deprecated APIs, poor use of an API, undesirable things that are not necessarily wrong',
                'color' => '#dfad00',
            ],
            'ERROR' => [
                'code' => 400,
                'short_code' => 5,
                'description' => 'Runtime errors that do not require immediate action but should typically be logged and monitored',
                'color' => '#df5a00',
            ],
            'CRITICAL' => [
                'code' => 500,
                'short_code' => 6,
                'description' => 'Critical conditions. Example: Application component unavailable, unexpected exception',
                'color' => '#ff0000',
            ],
            'ALERT' => [
                'code' => 550,
                'short_code' => 7,
                'description' => 'Action must be taken immediately. Example: Entire website down, database unavailable, etc. This should trigger the SMS alerts and wake you up',
                'color' => '#ff00be',
            ],
            'EMERGENCY' => [
                'code' => 600,
                'short_code' => 8,
                'description' => 'Emergency: system is unusable',
                'color' => '#010001',
            ],
        ];

        // записываем остальные полезные данные
        $message['description'] = isset($recordData['message']) ? $recordData['message'] : "";
        $message['user_id'] = (int) isset($GLOBALS['USER']) ? $GLOBALS['USER']['id'] : 0;
        $message['level'] = (int) (isset($recordData['level_name']) ? (array_key_exists($recordData['level_name'], $levels) ? $levels[$recordData['level_name']]['short_code'] : 6) : 6);

        if ($recordData['context']) {
            $message['description'] .= 'Trace: '.json_encode($recordData['context']);
        }

        // конвертируем в JSON и пишем в файл
        return $this->toJson($message) . "\n";
    }
}

Фактически мы берем родной Logstash-евский файл от Monolog бандла «vendor/monolog/monolog/src/Monolog/Formatter/LogstashFormatter.php» и дорабатываем его под свои нужды.

В данном коде отдельное внимание необходимо уделить типу данных: так как PHP у нас не строго типизированный язык и позволяет на ходу «переобуваться», Manticore с Logstash будут требовать от нас данных одного типа, строка должна быть строкой, а число числом, проще говоря 1!= »1» — это очень важный нюанс, из-за которого и всплывает большинство проблем в дальнейшем.

Чистим кэш (для среды dev, если ведем разработку, prod соответственно для продакшена):

bin/console cache:clear --env=dev

Создаем какую-нибудь ошибку в коде и любуемся наличием логов по адресу (prod соответственно для продакшена):

var/log/dev-2024-01-17.json

Настройка Logstash

Наконец, гвоздь программы: Logstash + Manticore. Если вы «игрались» в Logstash на интерактивном курсе на сайте Мантикоры, вы могли заметить, что версия Logstash в нем 7.10.0. В относительно недавном обновлении Manticore (с 6.0.4) мы видим такую строчку: «Support for Logstash versions >= 7.13». Следовательно, идем на сайт ELK, в раздел Logstash и качаем (скорее всего понадобится VPN) версию 8.6.2 Более поздние версии мной лично не тестировались, поэтому за их работоспособность не ручаюсь. Перемещаем все это дело на наш сервер, распаковываем и погружаемся в папку Logstash.

Если вы пропустили часть про Symfony, я повторюсь:  Manticore с Logstash будут требовать от нас данных одного типа, строка должна быть строкой, а число числом, проще говоря 1!= »1» — это очень важный нюанс, из-за которого и всплывает большинство проблем в дальнейшем.

Создаем конфигурационный файл Logstash:

nano dev.conf

— и пишем в него следующее (разобъем на 3 части: input, filter, output):

input {
    file {
        path => "/var/log/nginx/access_json.log" # путь до access логов Nginx
        codec => "json" # указываем формат лога
        type => 1 # указываем тип для удобства разграничения в блоке filter
    }

    file {
        path => "/var/log/nginx/error.log" # путь до error логов Nginx
        type => 2 # указываем тип для удобства разграничения в блоке filter
    }

    file {
        path => ["/var/log/*.json"] # путь до логов вашего Symfony проекта
        codec => "json" # указываем формат лога
        type => 3 # указываем тип для удобства разграничения в блоке filter
    }
}

Параметр type, насколько мне известно, является устаревшим, но раз работает, будем его использовать. Кто сильно против, можно заменить на конструкцию вида:

add_field => {
    "type" => 1
}

Далее сразу заполним блок output, так как в нем по сути тоже ничего интересного:

output {
    elasticsearch {
        index => "logs"
        hosts => ["http://10.16.0.1:9308"]
        ilm_enabled => false
        manage_template => false
    }
}

Здесь обратить внимание следует только на название таблицы logs и адрес 10.16.0.1. Важность установки корректного адреса мы рассмтаривали в первом блоке при настройке Manticore.

Переходим к самому главному блоку — filter. Будем рассматривать отдельно по каждому из type, начнем с первого:

if [type] == 1 {

    # добавляем поля
    mutate {
        add_field => {
            "level" => 2 # обозначаем уровень лога
            "description" => "" # добавляем пустое поле description для однородности данных
        }
    }

    # парсим из access json user_agent и преобразуем его в поле useragent в формате json
    useragent {
        source => "user_agent"
        target => "useragent"
    }

    # преобразуем данные в нужный нам формат при помощи Ruby
    ruby {
        code => '
            # преобразуем дату - одну в формат 2024-01-01 00:00:00, другую в Timestamp
            time = Time.strptime(event.get("datetime_local"), "%d/%b/%Y:%H:%M:%S")
            event.set("created_at", time.strftime("%Y-%m-%d %H:%M:%S"))
            event.set("datetime", time.to_time.to_i)

            # Получаем ID юзера из куки (опционально)
            # здесь ваше приложение может как писать просто user_id в отдельную куку, так и можно замешать ее в хэш
            # и расшифровать соответственно через Ruby
            user_id = 0
            id = event.get("cookie").split("user_id")
            if id[1]
                id = id[1].split(";")
                if id[0]
                    user_id = id[0]
                end
            end

            # обязательно преобразуем полученный user_id в число
            event.set("user_id", user_id.to_i)

            # Вырезаем лишние данные из кук (опционально)
            # например токен авторизованного пользователя и другую информацию, чтобы она не гуляла в открытом доступе в вашей базе Manticore
            newArr = []
            arr = event.get("cookie").split(";")
            if (arr)
                for v in arr do
                    if (v.index("user_token="))
                        newArr.append("user_token=********")
                    else
                        newArr.append(v.strip)
                    end
                end

                # собираем все обратно в строку
                event.set("cookie", newArr.join("; "))
            end

            # Вырезаем пароли из POST-параметров (опционально)
            # при авторизации/регистрации пользователи будут вводить свои пароли - так же вырежем их из наши логов
            newArr = []
            arr = event.get("post").split("&")
            if (arr)
                for v in arr do
                    if (v.index("password="))
                        newArr.append("password=********")
                    else
                        newArr.append(v.strip)
                    end
                end

                # собираем все обратно в строку
                event.set("post", newArr.join("&"))
            end
        '
    }
}

Снова сделаю оговорку — здесь я впервые писал на Ruby, поэтому на качество не претендую, нам важна суть, кто сделает лучше — тот молодец. Далее переходим ко второй части — парсинг Nginx error log:

if [type] == 2 {

    # парсим error log при помощи grok-а
    grok {
        match => { "message" => "%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME:time} \[%{LOGLEVEL:level}\] %{GREEDYDATA:description}" } # задаем шаблон парсинга файла логов
        add_field => {
            "created_at" => "%{year}-%{month}-%{day} %{time}" # добавляем поле с датой в нужном формате
            "uri" => "" # добавляем заглушку поля uri
            "ip" => "" # добавляем заглушку поля ip
        }
        remove_field => ["year", "month", "day", "time", "message"] # сносим лишние поля
    }

    # преобразуем данные в нужный нам формат при помощи Ruby
    ruby {
        code => '
            # преобразуем дату в Timestamp
            time = Time.strptime(event.get("created_at"), "%Y-%m-%d %H:%M:%S")
            event.set("datetime", time.to_time.to_i)

            # обозначаем уровень лога
            levelNum = 6
            levelName = event.get("level")
            if (levelName == "debug")
                levelNum = 1
            elsif (levelName == "info")
                levelNum = 2
            elsif (levelName == "notice")
                levelNum = 3
            elsif (levelName == "warn")
                levelNum = 4
            elsif (levelName == "error")
                levelNum = 5
            elsif (levelName == "crit")
                levelNum = 6
            elsif (levelName == "alert")
                levelNum = 7
            elsif (levelName == "emerg")
                levelNum = 8
            end

            # если это касается SSL_do_handshake - перекидываем в тип "warn" (опционально)
            # подобным образом можно вручную переопределить тип нужной ошибки
            textWarning = "SSL_do_handshake() failed"
            if (event.get("description").index(textWarning))
                levelNum = 4
            end

            # устанавливаем полученный тип ошибки
            event.set("level", levelNum.to_i)
        '
    }
}

Потренироваться в написании правил парсинга логов с помощью Grok вы можете по ссылке:  https://grokdebugger.com Далее переходим к 3 части — парсинг логов Symfony:

if [type] == 3 {
    useragent {
      source => "user_agent"
      target => "useragent"
    }
}

Так как мы изначально собрали логи в парвильный формат, особых изысков здесь не требуется. Наконец, добавляем заключительную общую часть:

# добавляем поле, определяющее что именно отдал Nginx - файл или результат запроса
mutate {
    add_field => {
        "is_static" => 0
    }
}

# преобразуем данные в нужный нам формат при помощи Ruby
ruby {
    code => '
        # получаем URL-адрес запрашиваемого ресурса
        url = event.get("uri")

        isStatic = 0

        # если в URL-е есть такие пути, то ставим отметку о том, что это файл
        # вы можете сделать свои условия, например по расширению файла .jpg, .png, etc
        if (url.index("/images/") or url.index("/css/") or url.index("/js/"))
            isStatic = 1
        end

        # записываем значение поля
        event.set("is_static", isStatic)

        # получаем IP адрес пользователя
        ipVal = event.get("ip")
        if (ipVal.length > 0)
            ipIntVal = IPAddr.new(ipVal).to_i
            if (ipIntVal)
                event.set("ip_integer", ipIntVal)
            end
        end
    '
}

# здесь мы выпиливаем все ненужные нам поля и дополнительно-приндительно повторно преобразуем некоторые из них в числовой тип
mutate {
    remove_field => ["@version", "@timestamp", "log", "host", "datetime_local", "user_agent", "http_host", "created_at"]
    convert => {
        "datetime" =>"integer"
        "status" => "integer"
        "type" => "integer"
        "user_id" => "integer"
        "level" => "integer"
        "is_static" => "integer"
        "ip_integer" => "integer"
    }
}

В итоге наша табличка logs получит следующий вид (desc logs):

+--------------+--------+----------------+
| Field        | Type   | Properties     |
+--------------+--------+----------------+
| id           | bigint |                |
| method       | text   | indexed stored |
| post         | text   | indexed stored |
| description  | text   | indexed stored |
| get          | text   | indexed stored |
| uri          | text   | indexed stored |
| cookie       | text   | indexed stored |
| ip           | text   | indexed stored |
| http_referer | text   | indexed stored |
| datetime     | uint   |                |
| user_id      | uint   |                |
| is_static    | uint   |                |
| level        | uint   |                |
| type         | uint   |                |
| status       | uint   |                |
| req_time     | float  |                |
| event        | json   |                |
| useragent    | json   |                |
| ip_integer   | bigint |                |
+--------------+--------+----------------+

Для подключения к Мантикоре через консоль подходит стандартная команда:

mysql --host="10.16.0.1" -P9306

Чтобы нам протестировать корректность конфигурации и парсинга логов добавим в блок input каждому типу следующие строки:

start_position => "beginning"
sincedb_path => "/dev/null"
mode => "read"
exit_after_read => "true"
file_completed_action => "log"
file_completed_log_path => "/dev/null"

— это позволит считать все имеющиеся логи по указанным путям при запуске команды (в prod-версии соответственно эти строки нужно будет убрать и логи будут отправляться в базу Мантикоры пачками по мере их создания). В итоге полный конфиг dev.conf будет таким:

iinput {
    file {
        path => "/var/log/nginx/access_json.log"
        codec => "json"
        type => 1

        start_position => "beginning"
        sincedb_path => "/dev/null"
        mode => "read"
        exit_after_read => "true"
        file_completed_action => "log"
        file_completed_log_path => "/dev/null"
    }

    file {
        path => "/var/log/nginx/error.log"
        type => 2

        start_position => "beginning"
        sincedb_path => "/dev/null"
        mode => "read"
        exit_after_read => "true"
        file_completed_action => "log"
        file_completed_log_path => "/dev/null"
    }

    file {
        path => "/var/log/*.json"
        codec => "json"
        type => 3

        start_position => "beginning"
        sincedb_path => "/dev/null"
        mode => "read"
        exit_after_read => "true"
        file_completed_action => "log"
        file_completed_log_path => "/dev/null"
    }
}

filter {
    if [type] == 1 {
        mutate {
            add_field => {
                "level" => 2
                "description" => ""
            }
        }


        useragent {
            source => "user_agent"
            target => "useragent"
        }


        ruby {
            code => '
                time = Time.strptime(event.get("datetime_local"), "%d/%b/%Y:%H:%M:%S")
                event.set("created_at", time.strftime("%Y-%m-%d %H:%M:%S"))
                event.set("datetime", time.to_time.to_i)

                user_id = 0
                id = event.get("cookie").split("user_id")
                if id[1]
                    id = id[1].split(";")
                    if id[0]
                        user_id = id[0]
                    end
                end


                event.set("user_id", user_id.to_i)

                newArr = []
                arr = event.get("cookie").split(";")
                if (arr)
                    for v in arr do
                        if (v.index("user_token="))
                            newArr.append("user_token=********")
                        else
                            newArr.append(v.strip)
                        end
                    end

                    event.set("cookie", newArr.join("; "))
                end

                newArr = []
                arr = event.get("post").split("&")
                if (arr)
                    for v in arr do
                        if (v.index("password="))
                            newArr.append("password=********")
                        else
                            newArr.append(v.strip)
                        end
                    end

                    event.set("post", newArr.join("&"))
                end
            '
        }
    } else if [type] == 2 {
        grok {
            match => { "message" => "%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME:time} \[%{LOGLEVEL:level}\] %{GREEDYDATA:description}" }
            add_field => {
                "created_at" => "%{year}-%{month}-%{day} %{time}"
                "uri" => ""
                "ip" => ""
            }
            remove_field => ["year", "month", "day", "time", "message"]
        }


        ruby {
            code => '
                time = Time.strptime(event.get("created_at"), "%Y-%m-%d %H:%M:%S")
                event.set("datetime", time.to_time.to_i)

                levelNum = 6
                levelName = event.get("level")
                if (levelName == "debug")
                    levelNum = 1
                elsif (levelName == "info")
                    levelNum = 2
                elsif (levelName == "notice")
                    levelNum = 3
                elsif (levelName == "warn")
                    levelNum = 4
                elsif (levelName == "error")
                    levelNum = 5
                elsif (levelName == "crit")
                    levelNum = 6
                elsif (levelName == "alert")
                    levelNum = 7
                elsif (levelName == "emerg")
                    levelNum = 8
                end

                textWarning = "SSL_do_handshake() failed"
                if (event.get("description").index(textWarning))
                    levelNum = 4
                end

                event.set("level", levelNum.to_i)
            '
        }
    } else if [type] == 3 {
        useragent {
          source => "user_agent"
          target => "useragent"
        }
    }

    mutate {
        add_field => {
            "is_static" => 0
        }
    }

    ruby {
        code => '
            url = event.get("uri")
            isStatic = 0
            if (url.index("/images/") or url.index("/css/") or url.index("/js/"))
                isStatic = 1
            end
            event.set("is_static", isStatic)

            ipVal = event.get("ip")
            if (ipVal.length > 0)
                ipIntVal = IPAddr.new(ipVal).to_i
                if (ipIntVal)
                    event.set("ip_integer", ipIntVal)
                end
            end
        '
    }

    mutate {
        remove_field => ["@version", "@timestamp", "log", "host", "datetime_local", "user_agent", "http_host", "created_at"]
        convert => {
            "datetime" =>"integer"
            "status" => "integer"
            "type" => "integer"
            "user_id" => "integer"
            "level" => "integer"
            "is_static" => "integer"
            "ip_integer" => "integer"
        }
    }
}

output {
    elasticsearch {
        index => "logs"
        hosts => ["http://10.16.0.1:9308"]
        ilm_enabled => false
        manage_template => false
    }
}

Запуск Logstash

Наконец, у нас все готово для запуска — дергаем команду:

bin/logstash -f config/dev.conf

Если все прошло без ошибок и команда завершилась успешно или с мелкими варнингами — поздравляю, у вас все получилось! Если вы увидели что-то вроде:

[ERROR][logstash.outputs.elasticsearch][main][695742b1badd1aa1190ffe91d9c5a02b53122cecdca3eed9bddd3b04385a54f] An unknown error occurred sending a bulk request to Elasticsearch (will retry indefinitely) {:message=> "no implicit conversion of String into Integer", :exception=>TypeError, :backtrace=>["org/jruby/RubyArray.java:1545:in `[]'"

— добро пожаловать на танцы с бубнами, где-то у вас перекос в данных. Logstash «щедр» на предоставление конкретной информации, в каких именно данных произошла ошибка, поэтому эти данные нам нужно увидеть явно, для этого запустите ту же команду, но с параметром debug:

bin/logstash -f config/dev.conf --debug

— увидите много интересного, однако искать эти самые различающиеся данные придется вручную, например отследив момент в консоли, с которого посыпались ошибки. Это может быть строка вместо числа или массив вместо строки, либо другие варианты. Ваша задача привести все данные логов к одному виду — тогда Logstash их успешно провернет и запихает в Manticore.

Вместо debug параметра вы можете использовать вывод данных из Logstash-а прямо в консоль в формате JSON — это несколько упростит задачу, для этого замените блок output:

output {
    stdout { codec => json }
}

Так же вы можете поймать другую проблему:

[2024-01-10T16:16:14,858][INFO ][logstash.outputs.elasticsearch][main][aa0103e00e254d1acee990db534e801471ed4f45439353671c76bd5c036b126d] Failed to perform request {:message=>"Broken pipe", :exception=>Manticore::SocketException, :cause=>#}
[2024-01-10T16:16:14,860][WARN ][logstash.outputs.elasticsearch][main][aa0103e00e254d1acee990db534e801471ed4f45439353671c76bd5c036b126d] Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] Elasticsearch Unreachable: [http://10.16.0.1:9308/_bulk][Manticore::SocketException] Broken pipe {:url=>http://10.16.0.1:9308/, :error_message=>"Elasticsearch Unreachable: [http://10.16.0.1:9308/_bulk [Manticore::SocketException] Broken pipe", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError"}
[2024-01-10T16:16:14,861][ERROR][logstash.outputs.elasticsearch][main][aa0103e00e254d1acee990db534e801471ed4f45439353671c76bd5c036b126d] Attempted to send a bulk request but Elasticsearch appears to be unreachable or down {:message=>"Elasticsearch Unreachable: [http://10.16.0.1:9308/_bulk][Manticore::SocketException] Broken pipe", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :will_retry_in_seconds=>4}

Logstash ругается на то, что ваш URL, куда он должен передавать логи 10.16.0.1:9308,  умер! А вы дергаете его через CURL и видите, что он жив. Здесь проблема кроется в отсутствии, либо по каким-то причинам недоступности Manticore Buddy, который поставляется вместе с manticore-extra. Открываем лог:

/var/log/manticore/searchd.log

— и читаем, что не так. В моем случае, по неведомой причине, часть manticore-extra слетела, что сделало этот самый Buddy недоступным для работы, на что нам и жаловался Logstash. Фиксится это повторной установкой manticore-extra:

apt install manticore manticore-extra

— и перезапуском Мантикоры:

systemctl restart manticore

Еще одним сюрпризом может стать логгирование файлов в Nginx — вы же передаете в логи данные массива POST, следовательно в него могут попасть и файлы, если на то есть разрешение у Nginx, например загруженные через AJAX картинки, что во-первых раздует лог до ненормальных размеров, а во-вторых повесит и Logstash — не привык он с картинками работать. Чтобы нам выпилить файлы из нашего Nginx Access JSON лога можно добавить в конфигурацию хоста например такую конструкцию:

set $isFile 'no';
if ($request_body ~* (--WebKitFormBoundary) ) {
    set $isFile 'yes';
}

if ($isFile = 'no') {
    access_log /var/log/nginx/project_access__admin.log main_json;
}

Таким образом, все данные отправленной формы, имеющие в своем содержимом »--WebKitFormBoundary» не будут писаться в наш JSON-лог. Опять же здесь все индивидуально, вы можете настроить работу с файлами либо на стороне Nginx, либо делать это уже в блоке filter в самом Logstash.

И еще пара полезных команд. Запуск Logstash в фоне:

bin/logstash -f config/dev.conf > /dev/null 2>&1 &

Принудительная остановка всех процессов Logstash:

kill -9 $(pgrep -f logstash)

Отображение логов: Kibana, Grafana?

Когда все процедуры сделаны и логи успешно пишутся в нашу табличку, дело остается за малым — подключить UI, для возможности удобного просмотра и фильтрации этих самых логов. Мне готовые решения не нравятся по ряду причин, поэтому я написал свое:

Свой вариант UI для отображения логов из Manticore, собранных через Logstash

Свой вариант UI для отображения логов из Manticore, собранных через Logstash

Увы не публичное, чисто свой велосипед для внутренних нужд.

Работать с Manticore удобно — он поддерживает обычный SQL-синтаксис, практически большинство нужных функций, что дает возможность получать любые нужные данные и интерпретировать их согласно требованиям.

В своем велосипеде я вижу ровно то, что мне важно видеть, но вы можете использовать готовые инструменты для визуализации логов, такие как Kibana или Grafana, дело вкуса. Информации по этим системам в интернете достаточно, поэтому здесь я не буду детально описывать весь процесс их настройки.

Пользуясь случаем хочу предложить всем заинтересованным лицам свою партнерскую программу сервиса APInita.ru — вся вышеописанная схема была развернута в этом проекте. Если очень кратко об условиях партнерства — приводите клиента, помогаете настроить интеграцию и получаете 30% от всех его платежей, пока клиент пользуется сервисом.

Надеюсь был полезен, всем спасибо! Если будут вопросы — по возможности отвечу в комментариях.

© Habrahabr.ru