MongoDB как средство мониторинга LOG-файлов

В этой статье я расскажу об использовании нереляционной базы MongoDB для мониторинга журнальных файлов. Для мониторинга log-файлов существует множество инструментов, от мониторинга shell-скриптами, завязанными на cron, до кластера apache hadoop.

164e780210cd4cb99faf4fadaf32b4ed.jpg

Подход с мониторингом скриптами текстовых файлов удобен только в простейших случаях, когда, например, проблемы выявляются наличием в журнальном файле строк «ERROR», «FAILURE», «SEVERE» и т.п. Для мониторинга больших файлов удобно использовать систему Zabbix, где Zabbix Agent (active) будет считывать только новые данные и с определённой периодичностью отправлять их на сервер.
До определённого момента средств Zabbix было достаточно, но для мониторинга бизнес-процессов, анализа метрик в рамках SLA, зачастую появляются более сложные задачи, например:
— определение количества операций, прошедших через систему за промежуток времени;
— определение времени обработки различных операций;
— выявление процентного количество ошибок, исключений в разбивке по операциям и бизнес-данным;
— сбор статистики по немонетарным транзакциям, анализ уровня сервиса.

Обычно такие задачи — головная боль администраторов систем, которым приходится искать изощрённые способы сбора, агрегации и представления таких данных, если система изначально не была спроектирована с расчётом их сбора.

Для решения этих задач нам пришлось искать более гибкое и универсальное решение.
Первоначально пришлось научить приложение с помощью java-драйвера писать логи структурированными документами в MongoDB. Модель данных была специально спроектирована для хранения унифицированных документов в JSON-формате.

Преимущества такого метода — отсутствие задержки в получении новых данных (как в случае apache hadoop), наличие репликации и при необходимости сегментирования, удобные средства выборки и анализа данных с помощью API MongoDB, mapReduce и функций на языке JavaScript, но обо всём по порядку.

Пример


Допустим приложение пишет в БД документы вида:

{
        _id: ObjectId
        id: string
        operation: string
        time: ISODate
        data: {
                        info: string
                        result: string
                        message: string
        }
        array: [a1, a2, ... ,aN]
}


Имея коллекцию таких документов, с помощью простейшего запроса можем понять, сколько операций test приложение выполнило за последние 5 минут:

db.test.count({ "operation": "test", "time": {$gte: new Date(new Date() - 1000*60*5)} })


Такой запрос легко «завернуть» в shell-скрипт, который будет подключиться к mongo, а его результат выводить в терминал. Этот скрипт впоследствии можно подключить к системе мониторинга, что мы и сделали, получив наглядный график и «навесив» триггеров на неадекватные значения.

Чтобы запрос выполнялся быстро, достаточно построить единственный индекс db.test.ensureIndex( {time: 1}, {background: true} ). Тогда MongoDB будет просматривать лишь данные за последние 5 минут, а не все документы коллекции. Можно сразу добавить ещё несколько индексов, но если их будет слишком много, при каждой операции insert бинарные деревья, соответствующие им, будут дописываться, что создаёт дополнительную нагрузку. И в какой-то момент индексы могут не поместиться в оперативную память, тогда чтение будет происходить с диска, что значительно замедлит доступ к данным.

Работа с датами


Поле с временем (в данном случае time) я использовал чаще всего. В начале работы с оболочкой MongoDB найти информацию по составлению простейших запросов не составит труда, а вот с запросами по дате были проблемы. Опишу несколько способов, как выделить документы в указанном временном промежутке.

Если приложение при операции insert в поле time вставляет new Date(), дата будет записана в ISODate-формате. Введя в консоль оболочки mongo команду new Date() в выводе будет текущая дата в формате ISODate("YYYY-MM-DDThh:mm:ss[.sss]Z") — эту строку можно взять и подставить в запрос вида:

db.test.find({ time: {$gte: ISODate("YYYY-MM-DDThh:mm:ss[.sss]"), $lte: ISODate("YYYY-MM-DDThh:mm:ss[.sss]")} }), где $gte — больше или равно, $lte — меньше или равно, [.sss] — количество милисекунд.

Также можно напрямую задать дату через new Date():

db.test.find({ time: {$gte: new Date(YYYY, MM, DD, hh, mm, ss, sss) } }), где отсчёт месяцев (MM) начинается с 0.

или

db.test.find({ time: {$gte: new Date( new Date() -1000*300 ) } }) — выводим документы за последние 5 минут. Вместо выражения "-1000*300" можно подставить любое время в милисекундах. Также перед запросом можно заранее определить переменные с датами:

var today = new Date();
var yesterday = new Date();
yesterday.setDate(today.getDate() - 1);

db.test.find( {"time": {$gte : yesterday, $lt : today} } );


В некоторых случаях удобно использовать POSIX, например:

for (var i = today.getTime(); i < yesterday.getTime(); i=i+300*1000) {var b=0; b = db.test.find({ "time": { $gte: new Date(i), $lte: new Date(i+300*1000) }     // с помощью getTime мы получаем POSIX время в миллисекундах
}).count();     // .count() позволяет подсчитать количество документов
time=new Date(i);       // задаем дату текущей итерации i
print(b+"; "+time.toTimeString().split(' ')[0]) } // выводим количество операций (b) + время time в формате timeString, с помощью split(' ')[0] из этой строки выводим только время.


После выполнения данного запроса мы получим в консоль общее количество документов, прошедших через систему, в разбивке по 5-минутным интервалам за сутки.

Метод forEach


Поскольку JSON-подобный вывод агрегации часто неудобен, выручает метод .forEach(), который применяется к курсору и способен модифицировать документ произвольным способом с помощью javascript.

Например, необходимо вывести список id за указанный промежуток времени (возьмём последние 5 минут):

db.test.aggregate({$match:{time:{$gte:new Date(new Date()-1000*300)}}}).forEach(

function(doc) {
    print( doc.id )
    }

)


Вместо aggregate в данном случае можно использовать find или distinct — главное, чтобы на входе forEach() был массив. Поскольку формат вывода агрегации несколько изменялся от версии к версии, в версии до 2.6, например, надо использовать aggregate({...},...,{...}).result.forEach, так как вывод имеет формат "result":[{...},...,{...}].

Ещё один пример: необходимо выяснить, какой финальный статус у каждого id и выгрузить в таблицу.

db.cbh.aggregate(
{$match: {time: {$gte: new Date(new Date()-1000*300)}}}, // ограничиваем изначальный набор данных
{$group: { _id: "$id", // группируем по id
          "count": {$sum:1}, // подсчитываем количество документов с этим id
          "result": {$push:"$data.result"} // вставляем результат в массив
           } },
{$match: { "count": 4 }}, // отбираем id в которых 4 записи 
{$match: { "result": { $ne:[] }}} // исключаем пустой result
).result.forEach(function(doc) {print( doc._id+", "+doc.result )} )


Такой запрос выведет в консоль результат вида »id, data.result», который можно импортировать в excel или любую реляционную СУБД.

Функции


С помощью функций удобно подсчитывать метрики, сложные запросы мониторинга, формировать отчёты. Приведу пример простой функции подсчёта среднего времени выполнения операции.

function avgDur(operation, period) {

var i=0;
var sum=0;
var avg=0;

db.test.aggregate(
        { $match : { "time" : { $gte : new Date(new Date - period*1000) } } },                                       
        { $group : {_id: "$id", 
                                "operation":{$addToSet : "$operation"},
                                "time":{$push : "$time"},
                                "count":{$sum: 1}
                                } },
        { $match : {"operation": operation,
                                "count":4
                                },
        { $project : {_id:0, "time":1}} 

        ).result.forEach(function(op) {
                dur=op.time[3]-op.time[0]; 
                sum=sum+dur;
                i=i+1;                  
        });     
avg=sum/i;
print(avg/1000); // выводим среднее время в секундах 

}


Прежде чем сохранять функцию, лучше запустить её в консоли, avgDur(test,300) и проверить её работу. Далее сохраняем её:

db.system.js.save(
   {
     _id: "avgDur",
     value : function(operation, period) { 
        ...
      }
   }
)


После чего запускаем db.loadServerScripts(); и вызываем фукнцию avgDur(test,300).

Если сохранить невалидную функцию, при db.loadServerScripts() можем получить ошибку и не сможем обратиться к другим функциям, поэтому тщательно проверяем перед сохранением.

Подводные камни


Первое, с чем я столкнулся на сервере без репликации — трудности с освобождением места на диске. MongoDB пишет документы на диск подряд, и, если удалить часть коллекции вручную командой db.test.remove({...}), то место на диске не освободится, т.к. это бы вызвало сильную фрагментацию. Во избежании этого MongoDB оставляет все документы на своих местах, а в пропусках добавляет ссылки. Тогда чтобы «сжать коллекцию», потребуется выполнить db.repairDatabase(), но команда потребует столько же места на диске, сколько занимает БД, т.к. вначале база копируется на новое место, и только потом удаляются файлы старой базы.

Чтобы избежать подобных проблем, я нашёл несколько решений, которые можно комбинировать:

1. Репликация. При наличии дублирующего сервера, всегда можно остановить slave-реплику, выполнить repairDatabase() и запустить сервер обратно. Ещё лучше настроить TTL (time-to-live) индекс, где документы будут сами удаляться после определённого времени жизни, например, через 30 дней. Но в этом случае всё равно придётся периодически делать repairDatabase().

2. Создавать коллекцию сразу ограниченного объёма Capped collection. Когда коллекция достигнет ограничения, самые старые документы начнут перезаписываться самыми новыми.

3. Посмотреть, сколько места на диске будет занимать коллекция после 30 дней записи логов, после чего выполнить convertToCapped этой коллекции и задать ограничение с запасом.

4. Записывать временные коллекции в отдельную БД, т.к. при выполнении db.dropDatabase() место на диске будет гарантированно освобождено. А вот db.collectioName.drop (), к сожалению, место на диске не освобождает. Данные просто помечаются как недоступные.

Начиная с версии 2.6 стратегия преаллоцирования немного изменилась, по умолчанию для коллекции применяется опция usePowerOf2Sizes. Освобожденное в результате удаления документов дисковое пространство стало использоваться рациональнее, однако, надёжнее изначально определить размер коллекции.  

Агрегация больших объёмов данных


Агрегация — не самый удачный инструмент для обработки огромных объемов данных: 100 миллионов документов, таких как в примере выше, агрегировать будет проблематично.

Первое ограничение, с которым придётся столкнуться — размер результирующего документа не должен превышать 16 Мб. Однако начиная с версии 2.6 результат агрегации передаётся в качестве курсора, и с помощью метода cursor.next(), применяемого к курсору, можно выделить нужные данные последовательно.   

Также есть ограничение в 64 Мб на использование при обработке буфера, который может переполниться при таком количестве документов. Начиная с версии 2.6 параметр агрегации {allowDisckUse:true} помогает этого избежать.

Большие объёмы данных обрабатывать лучше всего mapReduсe, где используется многопоточная обработка, а также распределение нагрузки между серверами в сегментированном кластере.

Но это всё мелочи. Настоящие муки начнутся, если схема данных не подходит для mongodb, когда появятся задачи, требующие связывания данных из нескольких коллекций или рекурсивного подхода в поиске документов и агрегации.

Приведу пример:

Допустим, нужно вычислить среднее время выполнения всех успешных операций. Но если подход логирования транзакций подразумевает запись документов типа «запрос» и «ответ», и таких документов в рамках одной транзакции будет несколько, то в нашей схеме поле «data.result» с результатом выполнения транзакции появится только в последнем документе серии.

Так как же в этом случае вычислить время? Если в агрегации в блоке $match искать все документы с "data.result":"SUCCESS", то в выборку попадут только последние документы внутри каждого id. Парадигма mapReduce здесь тоже не поможет, т.к. на стадии map MongoDB проходит по коллекции только 1 раз.

Можно пройти по коллекции и собрать массив со всеми нужными id и выполнить агрегацию, подставив в $match этот массив:

ids=[];
var monthBegin = new Date(new Date().getFullYear(),new Date().getMonth()-1,1,0,0,0,0) 
var monthEnd = new Date(new Date().getFullYear(),new Date().getMonth(),1,0,0,0,0)

// Здесь мы задаём дату напрямую через new Date(), где вместо YYYY подставляем текущий год, используя выражение new Date().getFullYear(), а вместо MM выражение new Date().getMonth() и new Date().getMonth()-1 для текущего и предыдущего месяца. 

db.test.find({time:{$gte:monthBegin, $lt:monthEnd}, "operation":"test", "data.result":"SUCCESS"}, {"_id":0, "id":1}
).forEach(function(op)
    {
        ids.push(op.id) // заполняем массив id для каждого найденного документа
    }); 
    
db.test.aggregate(
    {$match:{"id":{$in:ids}}}, // подставляем массив с id в аггрегацию
    {...} 
)


Однако стоит понимать, что этот массив будет всё это время находиться в памяти, и, если в нём будет несколько миллионов элементов, есть высокая вероятность получить out of memory. Чтобы избежать проблем c выделением памяти, можно записать «подготовленные данные» в новую коллекцию:

db.getSiblingDB("testDb").test.find({
    "time" : { $gte : new Date(monthBegin.getTime()), $lte : new Date(monthEnd.getTime()) },
    "operation" : "test",
    "data.result" : "SUCCESS"
}).addOption(DBQuery.Option.noTimeout).forEach( // добавляем опцию noTimeout для курсора
    function(doc) {
        db.test.find({"id" : doc.id}).forEach( // выполняем поиск по id, который получили в запросе выше
            function(row) {
                db.getSiblingDB("anotherDb").newTest.insert({ // для каждого документа с данным id пишем документ в новую коллекцию. 
                    "id"                : doc.id,
                    "time"         : row.time,
                });
            }
        );
    }
);


Таким образом, мы записали новую коллекцию с документами типа {id:sting, time:ISODate(...)}. Далее выполняем нехитрую агрегацию новой коллекции и получаем искомый результат:

i=0; 
sum=0; 
avg=0; 

db.getSiblingDB("anotherDb").newTest.aggregate(
    {$group:{_id:"$id",
        "time":{$push:"$time"},
        }}
    ).result.forEach(function(op)
        {
                dur=op.time[op.time.length-1]-op.time[0]  
                sum=sum+dur;
                i=i+1;

        });

print(sum/i) //выводим искомое время.


Таким образом, описанный выше подход позволил упростить поиск нужных записей, настроить весьма точный мониторинг на всех уровнях, а также сформировать отчёты и статистику только на основе журнальных данных из MongoDB. Управления схемой данных из приложения и удобство в развёртывании позволят сэкономить время на разработку.

Однако схема данных MongoDB подойдёт не для каждого проекта. Если заведомо известно, что рекурсивный подход в поиске данных или джойны не потребуется, скорее всего, серьёзных проблем не будет!

© Habrahabr.ru