Когда старый MapReduce лучше нового Tez

9d142a2dac0b4d16b43f7808d23f80ec.png

Как всем известно, количество данных в мире растёт, собирать и обрабатывать поток информации становится всё сложнее. Для этого служит популярное решение Hadoop c идеей упрощения методов разработки и отладки многопоточных приложений, использующее парадигму MapReduce. Эта парадигма не всегда удачно справляется со своими задачами, и через некоторое время появляется «надстройка» над Hadoop: Apache Tez с парадигмой DAG. Под появление Tez подстраивается и HDFS-SQL-обработчик Hive. Но не всегда новое лучше старого. В большинстве случаев HiveOnTez значительно быстрее HiveOnMapReduce, но некоторые подводные камни могут сильно повлиять на производительность вашего решения. Здесь я хочу рассказать, с какими нюансами столкнулся. Надеюсь, это поможет вам ускорить ETL или другой Hadoop UseCase.

MapReduce, Tez и Hive
Как я сказал ранее, данных в мире всё больше и больше. И для их хранения и обработки придумывают всё более хитрые решения, среди них и Hadoop. Чтобы процесс обработки хранящихся на HDFS данных был прост даже для рядового аналитика, есть несколько SQL-надстроек над Hadoop. Самая старая и «простая» из них — Hive. Суть Hive такова: мы имеем данные в каком-либо внятном column-store формате, заносим информацию о них в метаданные, пишем стандартный SQL с рядом ограничений, и он генерирует цепочку MapReduce-job«ов, которые решают нашу задачу. Здорово, удобно, но медленно. Например, вот простой запрос:
select 
    t1.column1, 
    t2.column2 
from 
    table1 t1
    inner join table2  t2 on t1.column1 = t2.column1
union 
select 
    t3.column1, 
    t4.column2 
from 
    table3 t3
    inner join table4 t4 on t3.column1 = t4.column1
order by 
    column1;

Этот запрос порождает четыре джоба:
  • table1 inner join table2;
  • table3 inner join table4;
  • union;
  • sort.

1aa41cfd5e1e4a33844aaeed7fcb7964.png

Шаги выполняются последовательно, и каждый из них завершается записью данных на HDFS. Это выглядит весьма неоптимальным. Например, шаги 1 и 2 могли бы выполняться параллельно. А бывают и такие ситуации, когда у нескольких шагов разумно применить один и тот же Mapper, а потом уже на результаты этих Mapper«ов наложить несколько видов Reducer«ов. Но концепция MapReduce в рамках одного job«а не позволяет так делать. Для решения этой проблемы достаточно быстро появляется Apache Tez с концепцией DAG. Суть DAG сводится к тому, что вместо пары Mapper-Reducer (+epsilon) мы строим нецикличный направленный граф, каждая вершина которого является Mapper.Class«ом или Reduser.Class«ом, а ребра означают потоки данных / порядок выполнения. Кроме DAG, Tez предоставил еще несколько бонусов: ускоренный запуск job«ов (можно посылать DAG-job«ы через уже запущенный Tez-Engine), возможность удерживать ресурсы в памяти ноды между шагами, самостоятельно запускать распараллеливание и т. д. Естественно, вместе с Tez вышла и соответствующая надстройка над Hive. С этой надстройкой наш запрос превратится DAG-job примерно следующей структуры:

  1. Mapper считывает table1.
  2. Mapper считывает table2 и джойнит её с результатом шага 1.
  3. Mapper считывает table3 и фильтрует column1 IS NOT NULL.
  4. Mapper считывает table4 и фильтрует column1 IS NOT NULL.
  5. Reducer джойнит результаты шагов 3 и 4.
  6. Reducer, делающий union.
  7. Reducer Group By и Sort.
  8. Собирает результат.

2822aa81d1b54a1f914e2bd24bc3e791.png

Фактически шаги 1 и 2 — это первый join, а 2, 3 и 4 — это второй join (я специально подобрал таблицы разных размеров, чтобы join«ы обрабатывались по-разному). При этом два блока друг от друга не зависят и могут выполняться параллельно. Это уже очень здорово. Tez действительно даёт значительный прирост в скорости обработки сложных запросов. Но иногда Tez может быть хуже MapReduce, и поэтому перед отправкой в production стоит попробовать запрос как с set hive.execution.engine=tez, так и с set hive.execution.engine=mr.

Так что же такое Tez?
Всё, что надо знать о Tez: он меняет логику MapReduce на логику DAG (directed acyclic graph — направленного ациклического графа), предоставляя возможность в рамках одного DataFlow параллельно выполнять несколько разных процессов, будь то Mapper или Reducer. Главное, чтобы его входные данные были готовы. Хранить данные можно локально на нодах между шагами, а иногда и просто в оперативной памяти ноды, не прибегая к дисковым операциям. Можно оптимизировать количество и местоположение Mapper«ов и Reducer«ов, чтобы минимизировать передачу данных по Сети даже с учётом многошаговых расчётов, переиспользовать контейнеры, которые уже отработали в соседних процессах в рамках одного Tez-Job«а, и подстраивать параллельное выполнение под статистику, собранную на предыдущем шаге. Кроме того, движок позволяет конечному пользователю создавать DAG-задачи с той же простотой, что и MapReduce, при этом он сам будет заниматься ресурсами, перезапусками и управлением DAG на кластере. Tez очень мобилен, добавление поддержки Tez не ломает уже работающие процессы, а тестирование новой версии возможно локально «на клиентской стороне» тогда, когда во всех задачах кластера будет работать старая версия Tez. Last but not least: отметим, что Tez может запускаться на кластере как служба и работать в «фоновом режиме», что позволяет ему отправлять задачи на выполнение значительно быстрее, чем это происходит при стандартном запуске MapReduce. Если вы ещё не пробовали Tez и у вас остались сомнения, то посмотрите на сравнение скорости, опубликованное в презентации HortonWorks:

390d5a237c134f50825f25d4fdfc6706.png

И в паре с Hive:

f29aff58743d4688a8467caa43c1be79.png

Но при всей этой красоте графиков и описаний в HiveOnTez есть и проблемы.

Tez менее устойчив к неравномерному распределению данных, чем MapReduce
Первая и самая большая проблема лежит в разнице создания DAG-job и MapReduce-job. У них один принцип: количество Mapper«oв и Reducer«ов рассчитывается в момент запуска job«а. Только когда запрос выполняется цепочкой MapReduce-job«ов, Hadoop рассчитывает необходимое количество задач на основе результата предыдущих шагов и собранной аналитики по источникам, а в случае DAG-job это происходит до вычисления всех шагов, только на основе аналитики.

Поясню на примере. Где-то в середине запроса по мере выполнения вложенных запросов у нас появляются две таблицы. По оценкам статистики, в каждой по n строк и по k уникальных значений join-ключа. На выходе ожидаем примерно n*k строк. И допустим, это количество хорошо укладывается в один контейнер, и Tez выделит на следующий шаг (допустим, сортировка) один Reducer. И это число Reducer«ов уже в процессе выполнения не поменяется независимо ни от чего. Теперь допустим, что на самом деле у этих таблиц очень плохой skew: на одно значение приходится n — k + 1 строка, а все остальные — по одной строке. Таким образом, на выходе мы получим n^2 + k^2 — 2kn — k + 2n строк. То есть (n + 2 — 2k)/k + (k — 1)/n больше n/k в два раза. И уже такое количество один Reducer будет выполнять вечность. А в случае с MapReduce, получив на выходе этого шага n^2 + k^2 — 2kn — k + 2n, Hadoop объективно оценит свои силы и выдаст нужное количество Mapper«ов и Reducer«ов. В результате c MapReduce всё отработает гораздо быстрее.

Сухие вычисления могут показаться надуманными, но на самом деле такая ситуация реальна. И если её не произошло, то считайте, что вам повезло. С аналогичным эффектом Tez-DAG«а я сталкивался ещё при использовании lateral view в сложных запросах или кастомных Mapper«ах.

Особенности тюнинга Tez
По иронии, последняя известная мне важная особенность Tez, которая может навредить, связана с его силой — DAG. Чаще всего кластер — это не просто хранилище информации. Это еще и система, в которой ведётся постобработка данных, и важно, чтобы на эту часть кластера не влияла остальная деятельность. Так как ноды — это ресурс, то обычно количество ваших контейнеров не безгранично. А значит, когда вы запускаете job, то лучше не забивать все контейнеры, чтобы сильно не тормозить регулярные процессы. И тут DAG может подложить вам свинью. DAG требуется (в среднем по палате) меньше контейнеров за счёт их переиспользования, более плавной нагрузки и т. д. Но когда быстрых шагов много, контейнеры начинают размножаться в геометрической прогрессии. Первые Mapper«ы ещё не доработали, но данные уже распространяются по другим Mapper«ам, под всё это выделяются контейнеры, и — бум! Ваш кластер забит в потолок, никто больше не может запустить ни одного job«а. Ресурсов не хватает, и вы смотрите, как медленно меняются цифры на прогресс-баре. MapReduce из-за своей последовательности от такого эффекта избавлен, но платите вы за это, как всегда, скоростью.

Мы давно знаем, как бороться с тем, что стандартный MapReduce занимает слишком много контейнеров. Регулируем параметры:

  • mapreduce.input.fileinputformat.split.maxsize: уменьшая — увеличиваем количество Mapper«ов;
  • mapreduce.input.fileinputformat.split.minsize: увеличивая — уменьшаем количество Mapper«ов;
  • mapreduce.input.fileinputformat.split.minsize.per.node, mapreduce.input.fileinputformat.split.minsize.per.rack: более тонкая настройка для контроля локальных (в смысле node или rack) партиций;
  • hive.exec.reducers.bytes.per.reducer: увеличивая — уменьшаем количество Reducer«ов;
  • mapred.tasktracker.reduce.tasks.maximum: выставляем максимальное количество Reducer«ов;
  • mapred.reduce.tasks: задаём конкретное число Reducer«ов.

Осторожно! В DAG все reduce-шаги будут иметь столько процессов, сколько вы укажете тут! Но параметры Tez более хитрые, и не всегда параметры, которые мы задали для MapReduce, на него действуют. Во-первых, Tez очень чувствителен к hive.tez.container.size, и интернет советует брать значение между yarn.scheduler.minimum-allocation-mb и yarn.scheduler.maximum-allocation-mb. Во-вторых, взгляните на параметры удержания неиспользуемого контейнера:
  • tez.am.container.ide.release-timeout-max.millis;
  • tez.am.container.ide.release-timeout-min.millis.

Опция tez.am.container.reuse.enabled активирует или дезактивирует переиспользование контейнеров. Если она отключена, то предыдущие два параметра не работают. И в-третьих, посмотрите на параметры группировки:
  • tez.grouping.split-waves;
  • tez.grouping.max-size;
  • tez.grouping.min-size.

Дело в том, что ради распараллеливания чтения внешних данных Tez изменил процесс формирования задач: сначала Tez оценивает, сколько волн (w) можно запустить на кластере, потом это количество умножается на параметр tez.grouping.split-waves, и произведение (N) делится на количество стандартных сплитов на задачу. Если результат действий находится между tez.grouping.min-size и tez.grouping.max-size, то всё хорошо и задача запускается в N задач. Если нет, то число адаптируется к рамкам. Документация по Tez советует «только в качестве эксперимента» выставлять параметр tez.grouping.split-count, который отменяет всю вышеизложенную логику и группирует сплиты в указанное в параметре количество групп. Но я этим свойством стараюсь не пользоваться, оно не дает гибкости Tez«у и Hadoop«у в целом для оптимизации под конкретные входные данные.Нюансы Tez
Кроме крупных проблем, Tez не избавлен от маленьких недостатков. Например, если вы пользуетесь http Hadoop ResourceManager«ом, то вы не увидите в нём, сколько какой Tez-job занимает контейнеров, а тем более — в каком состоянии его Mapper«ы и Reducer«ы. Для мониторинга состояния кластера я использую этот маленький python-скрипт:
import os
import threading

result = []
e = threading.Lock()

def getContainers(appel):
    attemptfile = os.popen("yarn applicationattempt -list " + appel[0])
    attemptlines = attemptfile.readlines()
    attemptfile.close()

    del attemptlines[0]
    del attemptlines[0]

    for attempt in attemptlines:
        splt = attempt.split('\t');
        if ( splt[1].strip()  == "RUNNING" ):
            containerfile = os.popen("yarn container -list " + splt[0] )
            containerlines = containerfile.readlines()
            containerfile.close()
            appel[2] += int( containerlines[0].split("Total number of containers :")[1].strip()  )
    e.acquire()
    result.append(appel)
    e.release()

appfile = os.popen("yarn application -list -appStates RUNNING")
applines = appfile.read()
appfile.close()

apps = applines.split('application_')
del apps[0]

appsparams = []

for app in apps:
    splt = app.split('\t')
    appsparams.append(['application_' + splt[0],splt[3], 0])

cnt = 0
threads = []

for app in appsparams:
    threads.append(threading.Thread(target=getContainers, args=(app,)))
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

result.sort( key=lambda x:x[2] )

total = 0
for app in result:
    print(app[0].strip() + '\t' + app[1].strip() + '\t' + str(app[2]) )
    total += app[2]

print("Total:",total)

Несмотря на заверения HortonWorks, наша практика показывает, что когда в Hive вы делаете простой SELECT smth FROM table WHERE smth, то чаще всего MapReduce отработает быстрее, правда, ненамного. К тому же в начале статьи я вас немного обманул: распараллеливание в HiveOnMapReduce возможно, но не такое интеллектуальное. Достаточно только сделать set hive.exec.parallel=true и настроить set hive.exec.parallel.thread.number=… — и независимые шаги (пары Mapper + Reducer) будут выполняться параллельно. Да, в нём нет возможности, что на выходе одного Mapper«а будет запускаться несколько Reducer«ов или следующих Mapper«ов. Да, распараллеливание куда более примитивно, но тоже ускоряет работу.

Ещё одна интересная особенность Tez: он запускает свой движок на кластере и держит его включённым некоторое время. С одной стороны, это действительно ускоряет работу, так как задача запускается на нодах значительно быстрее. Но с другой стороны — неожиданный минус: важные процессы в таком режиме запускать нельзя, потому что TEZ-engine со временем порождает слишком много классов и падает с GC-overflow. И бывает так: вы запустили на ночь nohup hive -f ....hql > hive.log &, пришли утром, а оно где-то посередине упало, хайв завершился, temporary tables удалились, и всё надо считать заново. Неприятно.

Добавляет в копилку мелких проблем то, что старый добрый MapReduce уже вошёл в стабильный релиз, а TEZ, несмотря на популярность и прогрессивность, до сих пор находится в версии 0.8.4, и баги в нём могут встретиться на любом шагу. Самый страшный баг для меня — это удаление информации, но такого я не встречал. А вот с некорректным расчётом на Tez мы сталкивались, причём MapReduce считает корректно. Например, мой коллега использовал две таблицы — table1 и table2, имеющие уникальное поле EntityId. Сделал через Tez запрос:

select 
    table1.EntityId, count(1)
from 
    table1
    left join table2 on table1.EntityId = table2.EntityId
group by 
    EntityId 
having 
    count(1) > 1

И получил на выходе какие-то строки! Хотя MapReduce ожидаемо вернул пустой результат. Про похожую проблему есть bugreport.Заключение
Tez — безусловное благо, которое в большинстве случаев делает жизнь проще, позволяет писать в Hive более сложные запросы и ожидать на них быстрого ответа. Но, как и любое благо, оно требует к себе осторожного подхода, осмотрительности и знания каких-то нюансов. И как следствие, иногда использование старого, проверенного, надёжного MapReduce лучше, чем использование Tez. Я очень удивился, что не смог найти ни одной статьи (ни в рунете, ни в инглишнете) о минусах HiveOnTez, и решил восполнить этот пробел. Надеюсь, что информация окажется кому-то полезной. Спасибо! Всем удачи и пока!

Комментарии (0)

© Habrahabr.ru