Анализ данных на Scala. Считаем корреляцию 21-го века

011469e830514f2584d20474bc88a8e5.pngОчень важно выбрать правильный инструмент для анализа данных. На форумах Kaggle.com, где проводятся международные соревнования по Data Science, часто спрашивают, какой инструмент лучше. Первые строчки популярноcти занимают R и Python. В статье мы расскажем про альтернативный стек технологий анализа данных, сделанный на основе языка программирования Scala и платформы распределенных вычислений Spark.Как мы пришли к этому? В Retail Rocket мы много занимаемся машинным обучением на очень больших массивах данных. Раньше для разработки прототипов мы использовали связку IPython + Pyhs2 (hive драйвер для Python) + Pandas + Sklearn. В конце лета 2014 года приняли принципиальное решение перейти на Spark, так как эксперименты показали, что мы получим 3–4 кратное повышение производительности на том же парке серверов.

Еще один плюс — мы можем использовать один язык программирования для моделирования и кода, который будет работать на боевых серверах. Для нас это было большим преимуществом, так как до этого мы использовали 4 языка одновременно: Hive, Pig, Java, Python, для небольшой команды это серьезная проблема.

Spark хорошо поддерживает работу с Python/Scala/Java через API. Мы решили выбрать Scala, так как именно на нем написан Spark, то есть можно анализировать его исходный код и при необходимости исправлять ошибки, плюс — это JVM, на котором крутится весь Hadoop. Анализ форумов по языкам программирования под Spark свел к следующему:

Scala:+ функциональный;+ родной для Spark;+ работает на JVM, а значит родной для Hadoop;+ строгая статическая типизация; — довольно сложный вход, но код читабельный.

Python:+ популярный;+ простой; — динамическая типизация; — производительность хуже, чем у Scala.

Java:+ популярность;+ родной для Hadoop; — слишком много кода.

Более подробно по выбору языка программирования для Spark можно прочитать здесь.

Должен сказать, что выбор дался не просто, так как Scala никто в команде на тот момент не знал.Известный факт: чтобы научиться хорошо общаться на языке, нужно погрузиться в языковую среду и использовать его как можно чаще. Поэтому для моделирования и быстрого анализа данных мы отказались от питоновского стека в пользу Scala.

В первую очередь нужно было найти замену IPython, варианты были следующие:1) Zeppelin — an IPython-like notebook for Spark;2) ISpark;3) Spark Notebook;4) Spark IPython Notebook от IBM.

Пока выбор пал на ISpark, так как он простой, — это IPython для Scala/Spark, к нему относительно легко удалось прикрутить графики HighCharts и R. И у нас не возникло проблем с подключением его к Yarn-кластеру.

Наш рассказ о среде анализа данных на Scala состоит из трех частей:1) Несложная задача на Scala в ISpark, которая будет выполняться локально на Spark.2) Настройка и установка компонент для работы в ISpark.3) Пишем Machine Learning задачу на Scala, используя библиотеки R.И если эта статья будет популярной, я напишу две другие. ;)

ЗадачаДавайте попробуем ответить на вопрос: зависит ли средний чек покупки в интернет-магазине от статичных параметров клиента, которые включают в себя населенный пункт, тип браузера (мобильный/Desktop), операционную систему и версию браузера? Сделать это можно с помощью «Взаимной информации» (Mutual Information). В Retail Rocket мы много где используем энтропию для наших рекомендательных алгоритмов и анализа: классическую формулу Шеннона, расхождение Кульбака-Лейблера, взаимную информацию. Мы даже подали заявку на доклад на конференцию RecSys по этой теме. Этим мерам посвящен отдельный, хоть и небольшой раздел в известном учебнике по машинному обучению Мерфи.

Проведем анализ на реальных данных Retail Rocket. Предварительно я скопировал выборку из нашего кластера к себе на компьютер в виде csv-файла.Загрузка данных Здесь мы используем ISpark и Spark, запущенный в локальном режиме, то есть все вычисления происходят локально, распределение идет по ядрам. Собственно в комментариях все написано. Самое главное, что на выходе мы получаем RDD (структура данных Spark), которая представляет собой коллекцию кейс-классов типа Row, который определен в коде. Это позволит обращаться к полям через ».», например _.categoryId.На входе:

import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.tribbloid.ispark.display.dsl._ import scala.util.Try

val sqlContext = new org.apache.spark.sql.SQLContext (sc) import sqlContext.implicits._

// Объявляем CASE class, он нам понадобится для dataframe case class Row (categoryId: Long, orderId: String, cityId: String, osName: String, osFamily: String, uaType: String, uaName: String, aov: Double)

// читаем файл в переменную val с помощью sc (Spark Context), его объявляет Ipython заранее val aov = sc.textFile («file:///Users/rzykov/Downloads/AOVC.csv»)

// парсим поля val dataAov = aov.flatMap { line => Try { line.split (»,») match { case Array (categoryId, orderId, cityId, osName, osFamily, uaType, uaName, aov) => Row (categoryId.toLong + 100, orderId, cityId, osName, osFamily, osFamily, uaType, aov.toDouble) } }.toOption } На выходе: MapPartitionsRDD[4] at map at :28 Теперь посмотрим на сами данные: d091a174203a4fdfaae9bace759e2bab.pngВ данной строке используется новый тип данных DataFrame, добавленный в Spark в версии 1.3.0, он очень похож на аналогичную структуру в библиотеке pandas в Python. toDf подхватывает наш кейс-класс Row, благодаря чему получает названия полей и их типы.Для дальнейшего анализа нужно выбрать какую-нибудь одну категорию желательно с большим количеством данных. Для этого нужно получить список наиболее популярных категорий.

На входе:

//Наиболее популярная категория dataAov.map { x => x.categoryId } // выбираем поле categoryId .countByValue () // рассчитываем частоту появления каждой categoryId .toSeq .sortBy ( — _._2) // делаем сортировку по частоте по убыванию .take (10) // берем ТОП 10 записей На выходе мы получили массив кортежей (tuple) в формате (categoryId, частота): ArrayBuffer ((314,3068), (132,2229), (128,1770), (270,1483), (139,1379), (107,1366), (177,1311), (226,1268), (103,1259), (127,1204)) Для дальнейшей работы я решил выбрать 128-ю категорию.Подготовим данные: отфильтруем нужные типы операционных систем, чтобы не засорять графики мусором.

На входе:

val interestedBrowsers = List («Android», «OS X», «iOS», «Linux», «Windows») val osAov = dataAov.filter (x => interestedBrowsers.contains (x.osFamily)) //оставляем только нужные ОС .filter (_.categoryId == 128) // фильтруем категории .map (x => (x.osFamily, (x.aov, 1.0))) // нужно для расчета среднего чека .reduceByKey ((x, y) => (x._1 + y._1, x._2 + y._2)) .map{ case (osFamily, (revenue, orders)) => (osFamily, revenue/orders) } .collect () На выходе массив кортежей (tuple) в формате OS, средний чек: Array ((OS X,4859.827586206897), (Linux,3730.4347826086955), (iOS,3964.6153846153848), (Android,3670.8474576271187), (Windows,3261.030993042378)) Хочется визуализации, давайте сделаем это в HighCharts: 9cc4ed693a754df8be25b077351bf239.pngТеоретически можно использовать любые графики HighCharts, если они поддерживаются в Wisp. Все графики интерактивны.Попробуем сделать то же самое, но через R.Запускаем R клиент:

import org.ddahl.rscala._ import ru.retailrocket.ispark._

def connect () = RClient («R», false) @transient val r = connect () Строим сам график: a9f9c006072842f3a775370a4aa89e32.pngТак можно строить любые графики R прямо в блокноте IPython.Взаимная информация На графиках видно, что зависимость есть, но подтвердят ли нам этот вывод метрики? Существует множество способов это сделать. В нашем случае мы используем взаимную информацию (Mutual Information) между величинами в таблице. Она измеряет взаимную зависимость между распределениями двух случайных (дискретных) величин.Для дискретных распределений она рассчитывается по формуле:

e1f52fb3459543079e9b8d7052230f97.png

Но нас интересует более практичная метрика: Maximal Information Coefficient (MIC), для расчета которой для непрерывных переменных приходится идти на хитрости. Вот как звучит определение этого параметра.

Пусть D = (x, y) — это набор из n упорядоченных пар элементов случайных величин X и Y. Это двумерное пространство разбивается X и Y сетками, группируя значения x и y в X и Y разбиения соответственно (вспомните гистограммы!).

d4e4bd652dbc4a9d8d91ff10e88313d9.png

где B (n) — это размер сетки, I∗(D, X, Y) — это взаимная информация по разбиению X и Y. В знаменателе указан логарифм, который служит для нормализации MIC в значения отрезка [0, 1]. MIC принимает непрерывные значения в отрезке [0,1]: для крайних значений равен 1, если зависимость есть, 0 — если ее нет. Что можно еще почитать по этой теме перечислено в конце статьи, в списке литературы.

В книге MIC (взаимная информация) названа корреляцией 21-го века. И вот почему! На графике ниже приведены 6 зависимостей (графики С — H). Для них были вычислены корреляция Пирсона и MIC, они отмечены соответствующими буквами на графике слева. Как мы видим, корреляция Пирсона практически равна нулю, в то время как MIC показывает зависимость (графики F, G, E).289e0746bc8e4164b3c96891b008841d.pngПервоисточник: people.cs.ubc.caВ таблице ниже приведен ряд метрик, которые были вычислены на разных зависимостях: случайной, линейной, кубической и т.д. Из таблицы видно, что MIC ведет себя очень хорошо, обнаруживая нелинейные зависимости: 4d34c74698f5420dab3e81cb8d0748f0.png

Еще один интересный график иллюстрирует воздействие шумов на MIC: 52d7a8cd9e5047f08c1ac07fee85a159.png

В нашем случае мы имеем дело с расчетом MIC, когда переменная Aov у нас непрерывная, а все остальные дискретны с неупорядоченными значениями, например тип браузера. Для корректного расчета MIC понадобится дискретизация переменной Aov. Мы воспользуемся готовым решением с сайта exploredata.net. Есть с этим решением одна проблема: она считает, что обе переменные непрерывны и выражены в значениях Float. Поэтому нам придется обмануть код, кодируя значения дискретных величин во Float и случайно меняя порядок этих величин. Для этого придется сделать много итераций со случайным порядком (мы сделаем 100), а в качестве результата возьмем максимальное значение MIC.

import data.VarPairData import mine.core.MineParameters import analysis.Analysis import analysis.results.BriefResult import scala.util.Random

//Кодируем дискретную величину, случайно изменяя порядок «кодов» def encode (col: Array[String]): Array[Double] = {

val ns = scala.util.Random.shuffle (1 to col.toSet.size) val encMap = col.toSet.zip (ns).toMap col.map{encMap (_).toDouble} }

// функция вычисления MIC def mic (x: Array[Double], y: Array[Double]) = { val data = new VarPairData (x.map (_.toFloat), y.map (_.toFloat)) val params = new MineParameters (0.6.toFloat, 15, 0, null)

val res = Analysis.getResult (classOf[BriefResult], data, params) res.getMIC }

//в случае дискретной величины делаем много итераций и берем максимум def micMax (x: Array[Double], y: Array[Double], n: Int = 100) = (for{ i <- 1 to 100} yield mic(x, y)).max Ну вот мы близки к финалу, теперь осуществим сам расчет: val aov = dataAov.filter(x => interestedBrowsers.contains (x.osFamily)) //оставляем только нужные ОС .filter (_.categoryId == 128) // фильтруем категории

//osFamily var aovMic = aov.map (x => (x.osFamily, x.aov)).collect () println («osFamily MIC =» + micMax (encode (aovMic.map (_._1)), aovMic.map (_._2)))

//orderId

aovMic = aov.map (x => (x.orderId, x.aov)).collect () println («orderId MIC =» + micMax (encode (aovMic.map (_._1)), aovMic.map (_._2)))

//cityId aovMic = aov.map (x => (x.cityId, x.aov)).collect () println («cityId MIC =» + micMax (encode (aovMic.map (_._1)), aovMic.map (_._2)))

//uaName aovMic = aov.map (x => (x.uaName, x.aov)).collect () println («uaName MIC =» + mic (encode (aovMic.map (_._1)), aovMic.map (_._2)))

//aov println («aov MIC =» + micMax (aovMic.map (_._2), aovMic.map (_._2)))

//random println («random MIC =» + mic (aovMic.map (_ => math.random*100.0), aovMic.map (_._2))) На выходе: osFamily MIC =0.06658 orderId MIC =0.10074 cityId MIC =0.07281 aov MIC =0.99999 uaName MIC =0.05297 random MIC =0.10599 Для эксперимента я добавил случайную величину с равномерным распределением и сам AOV.Как мы видим, практически все MIC оказались ниже случайной величины (random MIC), что можно считать «условным» порогом принятия решения. Aov MIC равен практически единице, что естественно, так как корреляция самой к себе равна 1.Возникает интересный вопрос: почему мы на графиках видим зависимость, а MIC нулевой? Можно придумать множество гипотез, но скорее всего для случая os Family все довольно просто — количество машин с Windows намного превышает количество остальных: 2340ed847d544435ab9ca45ab1db43bc.png

Заключение Надеюсь, что Scala получит свою популярность среди аналитиков данных (Data Scientists). Это очень удобно, так как есть возможность работать со стандартным IPython notebook + получить все возможности Spark. Этот код может спокойно работать с терабайтными массивами данных, для этого нужно просто изменить строчку конфигурации в ISpark, указав URI вашего кластера.Кстати, у нас открыты вакансии по этому направлению:

Полезные ссылки: Научная статья, на базе которой разрабатывался MIC.Заметка на KDnuggets про взаимную информацию (есть видео).Библиотека на C для расчета MIC с обертками для Python и MATLAB/OCTAVE.Сайт автора научной статьи, который разработал MIC (на сайте есть модуль для R и библиотека на Java).

© Habrahabr.ru