Прокладка трубопровода со spark.ml

bfe8ffe85616429fa7d23ed976c761e4.pngСегодня я бы хотел рассказать о появившемся в версии 1.2 новом пакете, получившем название spark.ml. Он создан, чтобы обеспечить единый высокоуровневый API для алгоритмов машинного обучения, который поможет упростить создание и настройку, а также объединение нескольких алгоритмов в один конвейер или рабочий процесс. Сейчас на дворе у нас версия 1.4.1, и разработчики заявляют, что пакет вышел из альфы, хотя многие компоненты до сих пор помечены как Experimental или DeveloperApi.

Ну что же, давайте проверим, что может новый пакет и насколько он хорош. Первым делом нам нужно познакомиться с основными понятиями, введёнными в spark.ml.

1. ML Dataset — spark.ml использует для работы с данными DataFrame из пакета spark.sql. DataFrame представляет собой распределённую коллекцию, в которой данные хранятся как именованные колонки. Концептуально DataFrame эквивалентны таблице в реляционной базе данных или такому типу данных как frame в R или Python, но с более богатой оптимизацией под капотом. (Примеры и способы работы будут приведены ниже в статье).

2. Transformer (модификатор) — это просто любой алгоритм, который может преобразовать один DataFrame в другой. Для примера: любая обученная модель является модификатором, поскольку преобразует набор характеристик (features) в предсказание (prediction)

3. Estimator (алгоритм оценки) — это алгоритм, который может выполнить преобразование из DataFrame в Transformer. К примеру, любой алгоритм обучения является также и алгоритмом оценки, т. к. он принимает набор данных для обучения и создаёт на выходе обученную модель.

4. Pipeline — конвейер, объединяющий любое количество модификаторов и алгоритмов оценки для создания рабочего процесса машинного обучения.

5. Param — общий тип, который используют модификаторы и алгоритмы оценки для задания параметров.

Согласно описанному интерфейсу, каждый Estimator должен иметь метод fit, принимающий DataFrame и возвращающий Transformer. В свою очередь, Transformer должен иметь метод transform, который преобразует одну DataFrame в другую.

В курсе Scalable Machine Learning в одной из лабораторных работ преподаватели, рассказывая о линейной регрессии, решали задачу «об определении года создания песни по набору аудио-характеристик». В ней было реализовано довольно много методов как для обработки данных, так и для оценки и нахождения лучшей модели. Сделано это было, чтобы более детально ознакомить студентов с основными процессами в машинном обучении, но давайте проверим, насколько облегчит нам жизнь пакет spark.ml.

В лабораторной работе нам были предоставлены уже подготовленные и немного обрезанные данные. Но так как нам интересно пройти весь путь, то предлагаю взять сырой набор данных. Каждая строка вида:

2007, 45.17809 46.34234 -40.65357 -2.47909 1.21253 -0.65302 -6.95536 -12.20040 17.02512 2.00002 -1.87785 9.85499 25.59837 1905.18577 3676.09074 1976.85531 913.11216 1957.52415 955.98525 942.72667 439.85991 591.66138 493.40770 496.38516 33.94285 -255.90134 -762.28079 -66.10935 -128.02217 198.12908 -34.44957 176.00397 -140.80069 -22.56380 12.77945 193.30164 314.20949 576.29519 -429.58643 -72.20157 59.59139 -5.12110 -182.15958 31.80120 -10.67380 -8.13459 -122.96813 208.69408 -138.66307 119.52244 -17.48938 75.58779 93.29243 85.83507 47.13972 312.85482 135.50478 -32.47886 49.67063 -214.73180 -77.83503 -47.26902 7.58366 -352.56581 -36.15655 -53.39933 -98.60417 -82.37799 45.81588 -16.91676 18.35888 -315.68965 -3.14554 125.45269 -130.18808 -3.06337 42.26602 -9.04929 26.41570 23.36165 -4.36742 -87.55285 -70.79677 76.57355 -7.71727 3.26926 -298.49845 11.49326 -89.21804 -15.09719

где первым идёт год, далее 12 чисел это средние тембры, и последние 78 это ковариации тембров.

Первым делом нам нужно подтянуть эти данные в DataFrame, но сперва немного преобразуем формат данных:

  val sc = new SparkContext("local[*]", "YearPrediction")
  val rawData: RDD[(Double, linalg.Vector, linalg.Vector)] = sc.textFile("data/YearPredictionMSD.txt")
    .map(_.split(','))
    .map(x => (
      x.head.toDouble,
      Vectors.dense(x.tail.take(12).map(_.toDouble)),
      Vectors.dense(x.takeRight(78).map(_.toDouble))
    ))


Теперь каждый элемент RDD это кортеж содержащий год и два вектора характеристик, чтобы получить DataFrame нужно выполнить ещё одно преобразование:

  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  val rawDF: DataFrame = labeledPointsRDD.toDF("label", "avg", "cov")


Обратите внимание, что мы создали sqlContext и подтянули методы неявного преобразования (в данном случае можно было написать import sqlContext.implicits.rddToDataFrameHolder) что-бы использовать метод toDF. Также мы указали имена колонок, и теперь структура данных будет выглядеть так:

  label | avg                                     | cov
 -------|-----------------------------------------|---------------------------------------------
  2001  | [49.94357 21.47114 73.07750 8.74861...  | [10.20556 611.10913 951.08960 698.11428...
 -------|-----------------------------------------|---------------------------------------------
  2007  | [50.57546 33.17843 50.53517 11.5521...  | [44.38997 2056.93836 605.40696 457.4117...


Градиентный метод, который используется в линейной регрессии, чувствителен к разбросу значений характеристик, поэтому данные перед обучением нужно нормировать или стандартизировать. Для этих целей в пакете spark.ml.feature есть два класса: StandardScaler и Normalizer.

  import org.apache.spark.ml.feature.{Normalizer, StandardScalerModel, StandardScaler}
  val scalerAvg: StandardScalerModel = new StandardScaler()
    .setWithMean(true)
    .setWithStd(true)
    .setInputCol("avg")
    .setOutputCol("features")
    // скармливаем наши сырые данные, чтобы алгоритм смог
    // посчитать статистику (среднее значение и стандартное отклонение)
    .fit(rawDF)

  val normAvg: Normalizer = new Normalizer()
    .setP(2.0)
    .setInputCol("avg")
    .setOutputCol("features")


Обратите внимание, что StandardScaler- это Estimator, а значит нам нужно вызвать метод fit, чтобы получить Transformer, в данном случае — StandardScalerModel. У всех классов, работающих с DataFrame, есть два общих метода:
setInputCol — задаём наименование колонки, с которой нужно считать данные
setOutputCol — указываем наименование колонки, в которую нужно записать преобразованные данные.

Отличия в результате работы этих классов в данном случае будет в том, что scaler вернёт данные в диапазоне от -1 до 1, а Normalizer в диапазоне от 0 до 1. Подробнее об алгоритмах работы можно почитать здесь и здесь.

Обучающую выборку мы подготовили (вернее получили модификаторы, которые мы будем применять для обработки данных), теперь нужно создать алгоритм оценки (Estimator), который на выходе даст нам обученную модель. Задаём почти стандартные настройки, на данном этапе они не особо интересны.

  import org.apache.spark.ml.regression.LinearRegression

  val linReg = new LinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("label")
    .setElasticNetParam(0.5)
    .setMaxIter(500)
    .setRegParam(1e-10)
    .setTol(1e-6)


Вот теперь у нас есть всё необходимое, чтобы собрать простенький конвейер:

  import org.apache.spark.ml.Pipeline

  val pipeline = new Pipeline().setStages(Array(
    normAvg,
    linReg
  ))


У Pipeline есть метод setStages, принимающий массив шагов, которые будут выполнены в указанном порядке при поступлении обучающей выборки. Теперь, всё что нам осталось- это не забыть разделить данные на обучающую и тестовую выборку:

  val splitedData = rawDF.randomSplit(Array(0.8, 0.2), 42).map(_.cache())
  val trainData = splitedData(0)
  val testData = splitedData(1)


Давайте запустим созданный нами конвейер и оценим результат его работы:

  val pipelineModel = pipeline.fit(trainData)
  val fullPredictions = pipelineModel.transform(testData)
  val predictions = fullPredictions.select("prediction").map(_.getDouble(0))
  val labels = fullPredictions.select("label").map(_.getDouble(0))
  val rmseTest = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError

  > (2003.0,1999.6153819348176)
    (1997.0,2000.9207184703566)
    (1996.0,2000.4171327880172)
    (1997.0,2002.022142263423)
    (2000.0,1997.6327888556184)
  RMSE: 10,552024


На этом этапе всё должно быть уже понятно, обратите внимание, что для оценки модели мы использовали готовый класс RegressionMetrics в котором, наряду с уже знакомой нам оценкой RMSE, реализованы также и другие базовые оценки.

Движемся дальше: в курсе Scalable Machine Learning мы создавали новые характеристики путём преобразования исходных в полином со степенью 2. Разработчики spark.ml позаботился и об этом: теперь нам достаточно создать ещё один модификатор и добавить его в конвейер; главное- в этом процессе не запутаться и правильно указать наименование колонок.

  import org.apache.spark.ml.feature.PolynomialExpansion

  // Создаём модификатор, который возьмёт данные из колонки "features" и созданный полином добавит в колонку "polyFeatures"
  val polynomAvg = new PolynomialExpansion()
    .setInputCol("features")
    .setOutputCol("polyFeatures")
    .setDegree(2)

  // Указываем алгоритму оценки из какой колонки брать характеристики
  linReg.setFeaturesCol("polyFeatures")

  // И добавляем новый модификатор в конвейер
  val pipeline = new Pipeline().setStages(Array(
    normAvg,
    polynomAvg,
    linReg
  ))

До сих пор мы использовали для обучения только 12 характеристик, но помнится, в сырых данных были ещё 78, может, попробуем объединить их? И на этот случай у spark.ml есть решение VectorAssembler. Раз решили, давайте сделаем:

  import org.apache.spark.ml.feature.VectorAssembler

  val assembler = new VectorAssembler()
    .setInputCols(Array("avg", "cov"))
    .setOutputCol("united")

  normAvg.setInputCol("united")

  val pipeline = new Pipeline().setStages(Array(
    assembler,
    normAvg,
    polynomAvg,
    linReg
  ))


С подготовкой данных мы немного разобрались, но остался вопрос подбора оптимальных параметров для алгоритма, уж очень не хочется делать это вручную, и не надо! Для этой цели в spark.ml реализован класс CrossValidator. CrossValidator принимает алгоритм оценки (в нашем случае это linReg), набор параметров которые мы хотели бы испытать и средство оценки (когда мы оценивали модель вручную, то использовали RMSE). CrossValidator начинает свою работу с того, что разбивает набор данных на несколько образцов (k по умолчанию 3), случайным образом выбирая обучающую и валидационную выборку (валидационная выборка будет составлять по размеру 1/k от исходной). Затем, для каждого набора параметров на каждом из образцов будет произведено обучение модели, оценка её эффективности и выбор лучшей модели. Надо отметить, что выбор модели через CrossValidator достаточно затратная по времени операция, но является статистически более обоснованным, чем эвристический ручной подбор.

Для удобства создания набора параметров в spark.ml есть класс-утилита ParamGridBuilder, его мы и используем:

  import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

  val paramGrid: Array[ParamMap] = new ParamGridBuilder()
    .addGrid(linReg.maxIter, Array(5, 500))
    .addGrid(linReg.regParam, Array(1e-15, 1e-10))
    .addGrid(linReg.tol, Array(1e-9, 1e-6, 1e-3))
    .addGrid(linReg.elasticNetParam, Array(0, 0.5, 1))
    .build()

  val crossVal = new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(new RegressionEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3)  

  val bestModel = crossVal.fit(trainData) 

  > Best set of parameters:
    {
          linReg_3a964d0300fd-elasticNetParam: 0.5,
          linReg_3a964d0300fd-maxIter: 500,
          linReg_3a964d0300fd-regParam: 1.0E-15,
          linReg_3a964d0300fd-tol: 1.0E-9
    }
    Best cross-validation metric: -10.47433119891316 


Ну вот наверное и всё, что касается линейной регрессии, для алгоритмов классификации и кластеризации в spark.ml также есть набор решений, готовых помочь удобно организовать рабочий процесс.

Используемые материалы:
Официальная документация
UCI Machine Learning Repository
Scalable Machine Learning

© Habrahabr.ru