Мониторинг серверов через очередь заданий на JAVA
Недавно был озадачен проблемой мониторинга нескольких десятков серверов (ну наверно редко кто не сталкивался с такой задачей). Проблему можно описать несколькими правилами: Нужно периодически пинговать сервер Иногда выполнять какое-либо действие с сервером (например, исполнение команды через ssh), которое засабмитил пользователь Действия с серверами могут нескольких типов, у каждого действия свой приоритет Таски (из п.1–3) нельзя выполнять одновременно для каждого сервера Таски могут завершаться с неудачей, например по причине отсутствия связи с сервером, нужно ждать пока связь восстановится и пытатся выполнить запланированную задачу Первое решение, которое приходит большинству в голову — запустить для каждого сервера свой поток и там делать свои дела. Это неплохо, но что делать если в процессе мониторинга набор серверов будет меняться? Запускать и завершать потоки в процессе мониторинга как-то неэлегантно. А что делать если серверов тысяча? Иметь тысячу потоков наверно можно, но зачем это делать когда большинство времени поток простаивает и ждет своего времени для очередного пинга?
На данную проблему можно взглянуть с другой стороны и представить ее в виде классической задачи «producer-consumer». У нас есть продюсеры, которые производят таски (пинг, команда ssh) и у нас есть консьюмеры, которые эти таски исполняют. Разумеется продюсеров и консьюмеров у нас не по одну экземпляру. Решить нашу задачу «producer-consumer» в JAVA не просто, а очень просто используя классы PriorityQueue и ExecutorService.
Начнем, как обычно, с юнит-теста:
@Test public void testOffer () { PollServerQueue xq = new PollServerQueue (); xq.addTask (new MyTask (1, 11)); xq.addTask (new MyTask (2, 12)); xq.addTask (new MyTask (1, 13)); MyTask t1 = (MyTask)xq.poll (); assertEquals (1, t1.getServerId ()); assertEquals (11, t1.getTaskId ());
MyTask t2 = (MyTask)xq.poll (); assertEquals (2, t2.getServerId ()); assertEquals (12, t2.getTaskId ()); MyTask t3 = (MyTask)xq.poll (); assertEquals (null, t3);
xq.FinishTask (1); MyTask t5 = (MyTask)xq.poll (); assertEquals (1, t5.getServerId ()); assertEquals (13, t5.getTaskId ()); }
В этом юнит-тесте мы добавили в нашу очередь три задачи типа MyTask (первый аргумент конструктора означает serverId, второй — taskId). Метод poll извлекает задачу из очереди. Если извлечь задачу не удалось (например, задачи кончились или в очереди остались задачи для серверов, для которых уже выполняются задачи) — метод poll возвращает null. Из кода видно, что завершение задачи для serverId=1 ведет к тому, что из очереди можно извлечь следующую задачу для данного сервера.
Ура! Юнит-тест написан, можно писать код. Нам потребуется:
Структура данных (HashMap) для хранения текущих исполняемых задач для каждого сервера (currentTasks) Структура данных (HashMap) для хранения задачи, стоящих в очереди на исполнения. Для каждого сервера — своя очередь (waitingTasks) Структура данных (PriorityQueue) для последовательного опроса серверов. Необходимо, чтобы в следующий вызов poll () к нам приходила задача для другого сервера. Короче, структура типа револьвера, только пули после каждого выстрела остаются в барабане (peekOrder) Cтруктура (HashSet) для хранения и быстрого поиска идентификаторов серверов в револьвере, чтобы каждый раз не просматривать револьвер с первого до последнего элемента (servers) Простой объект для синхронизации (syncObject) Теперь процедура извлечения таски из очереди будет простой и короткой. И хотя код получился компактным, публиковать его здесь я не вижу смысла, а отошлю вас на https://github.com/get-a-clue/PollServerQueueExample
Disclaimer: код на github’e не является законченным, в частности, в нем отсутствует возможность установки приоритетов для задач внутри очереди для каждого сервера и механизма обработки ошибок и возврата failed таски в очередь. Ну и сам код для пингования. Как говорится, меньше кода — лучше спишь. :)