Тестирование и отладка MapReduce

В «Ростелекоме» мы используем Hadoop для хранения и обработки данных, загруженных из многочисленных источников с помощью java-приложений. Сейчас мы переехали на новую версию hadoop с Kerberos Authentication. При переезде столкнулись с рядом проблем, в том числе и с использованием YARN API. Работа Hadoop с Kerberos Authentication заслуживает отдельной статьи, а в этой мы поговорим об отладке Hadoop MapReduce.

517ccff7f9688538cbffdd7ab838470e.png
При выполнении заданий в кластере запуск отладчика усложняется тем, что мы не знаем, какой узел будет обрабатывать ту или иную часть входных данных, и не можем заранее настроить свой отладчик.

Можно использовать проверенный временем `System.out.println("message")`. Но как проанализировать вывод `System.out.println("message")` разбросанных по этим узлам?

Мы можем выводить сообщения в стандартный поток ошибок. Все, что пишется в stdout или stderr,
направляется в соответствующий файл журнала, который можно найти на веб-странице расширенной информации о задаче или в журнальных файлах.

Мы также можем включить в код средство отладки, обновлять сообщения о состоянии задачи и использовать пользовательские счетчики, которые помогут нам понять масштаб бедствия.

Приложение Hadoop MapReduce можно отлаживать во всех трех режимах, в которых может работать Hadoop:
 

  • standalone
  • pseudo-distributed mode
  • fully distributed


Более подробно мы остановимся на первых двух.
 

Pseudo-distributed mode (псевдораспределенный режим)


Псевдораспределенный режим используется для имитации реального кластера. И может использоваться для тестирования в среде, максимально приближенной к продуктивной. В данном режиме все демоны Hadoop будут работать на одном узле!

Если у вас есть dev-сервер или другая песочница (например, Virtual Machine с настроенной средой разработки, такой как Hortonworks Sanbox с HDP), то можно отладить управляющую программу, используя средства удаленной отладки (remote debugging).

Для запуска отладки нужно задать значение переменной окружения: **YARN_OPTS**. Ниже приведен пример. Для удобства можно создать файл startWordCount.sh и добавить в него необходимые параметры для запуска приложения.

#!/bin/bash

source /etc/hadoop/conf/yarn-env.sh
export YARN_OPTS='-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6000 ${YARN_OPTS}'

yarn jar wordcount-0.0.1.jar ru.rtc.example.WordCount /input /output


Теперь, запустив скрипт `./startWordCount.sh`, мы увидим сообщение

Listening for transport dt_socket at address: 6000


Осталось настроить IDE для удаленной отладки (remote debugging). Я использую intellij IDEA. Перейдем в меню Run → Edit Configurations… Добавим новую конфигурацию **Remote**

344574fe735f992742437f5ba3660c31.png

Поставим breakpoint в main и запустим.

e8acc98f5739b25b54630170fcd3c057.png

Все, теперь мы можем отлаживать программу как обычно.

ВНИМАНИЕ. Вы должны убедиться, что работаете с последней версией исходного кода. Если нет, то у вас могут быть различия в строках, в которых отладчик выполняет остановку.

В ранних версиях Hadoop поставлялся специальный класс, который позволял повторно запустить сбойное задание — isolationRunner. Данные, вызвавшие сбой, сохранялись на диск по адресу, указанному в переменной окружения Hadoop mapred.local.dir. К сожалению, в последних версиях Hadoop такой класс больше не поставляется.

Standalone (локальный запуск)


Standalone — это стандартный режим, в котором работает Hadoop. Он подходит для отладки там, где не используется HDFS. При такой отладке можно использовать ввод и вывод через локальную файловую систему. Standalone-режим обычно является самым быстрым режимом Hadoop, так как он использует локальную файловую систему для всех входных и выходных данных.

Как упоминалось ранее, можно внедрить в код средство отладки, например, счетчики. Счетчики определяются перечислением (**enum**) Java. Имя перечисления определяет имя группы, а поля перечисления определяют имена счетчиков. Счетчик может пригодиться для оценки проблемы,
и может использоваться как дополнение к отладочному выводу.

Объявление и использование счетчика:

package ru.rt.example;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class Map extends Mapper {
  private Text word = new Text();
  
  enum Word {
    TOTAL_WORD_COUNT,
  }
  
  @Override
  public void map(LongWritable key, Text value, Context context) {

    String[] stringArr = value.toString().split("\\s+");

    for (String str : stringArr) {
      word.set(str);
      context.getCounter(Word.TOTAL_WORD_COUNT).increment(1);
    }
  }
}
}


Для инкремента счетчика нужно использовать метод **increment (1)**

...
context.getCounter(Word.TOTAL_WORD_COUNT).increment(1);
...


После успешного завершения MapReduce задача в конце выводит значения счетчиков.

    Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
    ru.rt.example.Map$Word
            TOTAL_WORD_COUNT=655


Ошибочные данные можно выводить в stderr или в stdout, или писать выходные данные в hdfs, используя класс **MultipleOutputs** для дальнейшего анализа. Полученные данные можно передавать на вход приложению в standalone режиме или при написании unit-тестов.

В Hadoop есть библиотека MRUnit, которая используется совместно с фреймворками тестирования (например, JUnit). При написании модульных тестов мы проверяем, что на выходе функция выдает ожидаемый результат. Мы используем класс MapDriver из пакета MRUnit, в свойствах которого устанавливаем тестируемый класс. Для этого используется метод `withMapper()`, входные значения `withInputValue()` и ожидаемый результат `withOutput()` или `withMultiOutput()`, если используется множественный вывод.

Вот наш тест.
 


package ru.rt.example;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

public class TestWordCount {
   
   private MapDriver mapDriver;
   
   @Before
   public void setUp() {
      Map mapper = new Map();
      mapDriver.setMapper(mapper)
   }
   
   @Test
   public void mapperTest() throws IOException {
      mapDriver.withInput(new LongWritable(0), new Text("msg1"));
      mapDriver.withOutput(new Pair(new Text("msg1"), new IntWritable(1)));
      mapDriver.runTest();
   }
}


Fully distributed mode (полностью распределенный режим)


Как следует из названия, это режим, в котором используется вся мощность Hadoop. Запущенная программа MapReduce может работать на 1000 серверов. Всегда сложно отлаживать программу MapReduce, так как у вас есть Mappers, работающие на разных машинах с разными входными данными.

Заключение


Как оказалось, тестирование MapReduce не такая простая задача как кажется на первый взгляд.
Чтобы сэкономить время в поисках ошибки в MapReduce, я использовал все перечисленные методы и советую всем тоже их применять. Это особенно полезно в случае с большими инсталляциями, подобных таким, какие работают в «Ростелекоме».

© Habrahabr.ru