[Из песочницы] Централизованные логи для приложений с помощью связки heka+elasticsearch+kibana

В статье описана настройка центрального логирования для разных типов приложений (Python, Java (java.util.logging), Go, bash) с помощью довольно нового проекта Heka.Heka разрабатывается в Mozilla и написана на Go. Именно поэтому я использую его вместо logstash, который имеет сходные возможности.Heka основан на плагинах, которые имеют пять типов:

Входные — каким-то образом принимают данные (слушает порты, читают файлы и др.); Декодеры — обрабатывают входящие запросы и переводят их во внутренние структуры для сообщений; Фильтры — производят какие либо действия с сообщениями; Encoders (неясно, как переводить) — кодируют внутренние сообщения в форматы, которые отправляются через выходные плагины; Выходные — отправляют куда-либо данные. Например, в случае Java-приложений входным плагином является LogstreamerInput, который смотрит за изменениями в файле логов. Новые строки в логе обрабатываются декодером PayloadRegexDecoder (по заданному формату) и дальше отправляются в elasticsearch через выходной плагин ElasticSearchOutput. Выходной плагин в свою очередь кодирует сообщение из внутренней структуры в формат elasticsearch через ESJsonEncoder.Установка HekaВсе способы установки описаны на сайте (http://hekad.readthedocs.org/en/v0.8.2/installing.html#binaries). Но проще всего скачать готовый бинарный пакет под свою систему со страницы https://github.com/mozilla-services/heka/releases.Поскольку у меня используются сервера под ubuntu, то и описание будет для этой системы. В данном случае установка сводится к установке самого deb пакета, настройке файла конфигурации /etc/hekad.toml и добавления в сервисы upstart.

В базовую настройку /etc/hekad.toml у меня входит настройка количества процессов (я ставлю равным количеству ядер), dashboard (в котором можно посмотреть какие плагины включены) и udp сервер на 5565 порту, который ожидает сообщения по протоколу google protobuf (используется для python и go приложений):

maxprocs = 4

[Dashboard] type = «DashboardOutput» address = »:4352» ticker_interval = 15

[UdpInput] address = »127.0.0.1:5565» parser_type = «message.proto» decoder = «ProtobufDecoder» Конфигурация для upstart /etc/init/hekad.conf: start on runlevel [2345] respawn exec /usr/bin/hekad -config=/etc/hekad.toml Логирование Python приложений Здесь используется библиотека https://github.com/kalail/heka-py и специальный хендлер для модуля logging. Код: import logging from traceback import format_exception

try: import heka HEKA_LEVELS = { logging.CRITICAL: heka.severity.CRITICAL, logging.ERROR: heka.severity.ERROR, logging.WARNING: heka.severity.WARNING, logging.INFO: heka.severity.INFORMATIONAL, logging.DEBUG: heka.severity.DEBUG, logging.NOTSET: heka.severity.NOTICE, } except ImportError: heka = None

class HekaHandler (logging.Handler): _notified = None conn = None host = '127.0.0.1:5565'

def __init__(self, name, host=None): if host is not None: self.host = host

self.name = name super (HekaHandler, self).__init__()

def emit (self, record): if heka is None: return

fields = { 'Message': record.getMessage (), 'LineNo': record.lineno, 'Filename': record.filename, 'Logger': record.name, 'Pid': record.process, 'Exception': '', 'Traceback': '', }

if record.exc_info: trace = format_exception (*record.exc_info) fields['Exception'] = trace[-1].strip () fields['Traceback'] = ''.join (trace[:-1]).strip ()

msg = heka.Message ( type=self.name, severity=HEKA_LEVELS[record.levelno], fields=fields, )

try: if self.conn is None: self.__class__.conn = heka.HekaConnection (self.host)

self.conn.send_message (msg) except: if self.__class__._notified is None: print «Sending HEKA message failed» self.__class__._notified = True По умолчанию хендлер ожидает, что Heka слушает на порту 5565.Логирование Go приложений Для логирования я форкнул библиотеку для логирования https://github.com/ivpusic/golog и добавил туда возможность отправки сообщений прямо в Heka. Результат расположен здесь: https://github.com/ildus/gologИспользование:

import «github.com/ildus/golog» import «github.com/ildus/golog/appenders» … logger:= golog.Default logger.Enable (appenders.Heka (golog.Conf{ «addr»:»127.0.0.1», «proto»: «udp», «env_version»:»2», «message_type»: «myserver.log», })) … logger.Debug («some message») Логирование Java приложений Для Java приложений используется входной плагин типа LogstreamerInput с специальным regexp декодером. Он читает логи приложения из файлов, которые должны быть записаны в определенном формате.Конфигурация для heka, отвечающая за чтение и декодирование логов:

[JLogs] type = «LogstreamerInput» log_directory = »/some/path/to/logs» file_match = 'app_(? P\d+\.\d+)\.log' decoder = «JDecoder» priority = [«Seq»]

[JDecoder] type = «PayloadRegexDecoder» #Parses com.asdf[INFO|main|2014–01–01 3:08:06]: Server started match_regex = '^(? P[\w\.]+)\[(? P[A-Z]+)\|(? P[\w\d\-]+)\|(? P[\d\-\s:]+)\]: (? P.*)' timestamp_layout = »2006–01–02 15:04:05» timestamp_location = «Europe/Moscow»

[JDecoder.severity_map] SEVERE = 3 WARNING = 4 INFO = 6 CONFIG = 6 FINE = 6 FINER = 7 FINEST = 7

[JDecoder.message_fields] Type = «myserver.log» Message = »%Message%» Logger = »%LoggerName%» Thread = »%Thread%» Payload = » В приложении надо изменить Formatter через logging.properties. Пример logging.properties:

handlers= java.util.logging.FileHandler java.util.logging.ConsoleHandler

java.util.logging.FileHandler.level=ALL java.util.logging.FileHandler.pattern = logs/app_%g.%u.log java.util.logging.FileHandler.limit = 1024000 java.util.logging.FileHandler.formatter = com.asdf.BriefLogFormatter java.util.logging.FileHandler.append=tru

java.util.logging.ConsoleHandler.level=ALL java.util.logging.ConsoleHandler.formatter=com.asdf.BriefLogFormatter Код BriefLogFormatter:

package com.asdf;

import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.logging.Formatter; import java.util.logging.LogRecord;

public class BriefLogFormatter extends Formatter { private static final DateFormat format = new SimpleDateFormat («yyyy-MM-dd HH: mm: ss»); private static final String lineSep = System.getProperty («line.separator»); /** * A Custom format implementation that is designed for brevity. */ public String format (LogRecord record) { String loggerName = record.getLoggerName (); if (loggerName == null) { loggerName = «root»; } StringBuilder output = new StringBuilder () .append (loggerName) .append (»[») .append (record.getLevel ()).append ('|') .append (Thread.currentThread ().getName ()).append ('|') .append (format.format (new Date (record.getMillis ()))) .append (»]:») .append (record.getMessage ()).append (' ') .append (lineSep); return output.toString (); } } Логирование скриптов (bash) Для bash в heka добавляется входной фильтр TcpInput (который слушает на определенном порту) и PayloadRegexDecoder для декодирования сообщений. Конфигурация в hekad.toml: [TcpInput] address = »127.0.0.1:5566» parser_type = «regexp» decoder = «TcpPayloadDecoder»

[TcpPayloadDecoder] type = «PayloadRegexDecoder» #Parses space_checker[INFO|2014–01–01 3:08:06]: Need more space on disk /dev/sda6 match_regex = '^(? P[\w\.\-]+)\[(? P[^\|]+)\|(? P[A-Z]+)\|(? P[\d\-\s:]+)\]: (? P.*)' timestamp_layout = »2006–01–02 15:04:05» timestamp_location = «Europe/Moscow»

[TcpPayloadDecoder.severity_map] ERROR = 3 WARNING = 4 INFO = 6 ALERT = 1

[TcpPayloadDecoder.message_fields] Type = «scripts» Message = »%Message%» Logger = »%LoggerName%» Hostname = »%Hostname%» Payload = »[%Hostname%|%LoggerName%] %Message%» Для логирования написана функция, которая отправляет сообщения на TCP порт в заданном формате: log () { if [ »$1» ]; then echo -e «app1[`hostname`|INFO|`date '+%Y-%m-%d %H:%M:%S'`]: $1» | nc 127.0.0.1 5566 || true echo $1 fi } Отправка сообщения с уровнем INFO с типом app1: log «test test test» Отправка записей в elasticsearch Добавляется следующая конфигурация в hekad.conf: [ESJsonEncoder] index = «heka-%{Type}-%{2006.01.02}» es_index_from_timestamp = true type_name = »%{Type}»

[ElasticSearchOutput] message_matcher = «Type == 'myserver.log' || Type=='scripts' || Type=='nginx.access' || Type=='nginx.error'» server = «http://:9200» flush_interval = 5000 flush_count = 10 encoder = «ESJsonEncoder» Здесь мы указываем, где находится elasticsearch, как должны формироваться индексы и какие типы сообщений туда отправлять.Просмотр логов Для просмотра логов используется Kibana 4. Она все еще в бете, но уже вполне рабочая. Для установки надо скачать архив со страницы http://www.elasticsearch.org/overview/kibana/installation/, распаковать в какую либо папку на сервере и указать расположение elasticsearch сервера в файле config/kibana.yml (параметр elasticsearch_url).При первом запуске понадобится добавить индексы во вкладке Settings. Чтобы можно было добавить шаблон индекса и правильно определились поля, необходимо отправить тестовое сообщение из каждого приложения. Потом можно добавить шаблон индекса вида heka-*(которое покажет все сообщения) или heka-scripts-*, тем самым отделив приложения друг от друга. Дальше можно перейти вкладку Discover и посмотреть сами записи.

Заключение Я показал только часть того, что можно логировать с помощью Heka.Если кто-либо заинтересовался, то могу показать часть Ansible конфигурации, которая автоматически устанавливает heka на все серверы, а на выбранные elasticsearch с kibana.

© Habrahabr.ru