Анализ поведенческих факторов с помощью Apache Spark

Речь пойдёт об использовании Apache Spark для анализа поведенческих факторов на сайте, который имеет очень большую посещаемость. Учёт поведенческих факторов весьма часто используется для повышения конверсии ресурса. Кроме этого, возможности Интернет позволяют очень просто и быстро собирать и анализировать гигантское количество самой разной статистической информации. Будут показаны примеры кода и даны некоторые советы, основанные на личном опыте автора статьи.
Поведенческие факторы — это один из самых эффективных методов выявления ценности документа. По сути, это переменные, на основании которых будет приниматься решение о рейтинге той или иной сущности. Если информации относительно немного, то её можно представить в удобном для человека графическом виде (гистограмма, матрица, тепловая карта, график регрессионного анализа или дендрограмма иерархического кластерного анализа). Например, в языке программирования R (он очень часто используется при анализе данных) достаточно удобно отображать графики (plot) или матрицы (mosaicplot, image, persp, contour). Кроме этого, в язык программирования R встроено очень много полезных функций (описательная статистика, комбинаторика, теория вероятностей). Я даже не говорю про библиотеки для более сложного анализа (например, Random forest).

Однако, самое главное ограничение асессора — это производительность. Человек не может оценить гигантское количество информации за небольшой промежуток времени. С другой стороны, есть широкий спектр задач, где невозможно обойтись без человека, так как некоторые вещи практически невозможно оценить алгоритмом.

Если информации настолько много, что она не может поместиться даже в память супер-компьютера (на одной ноде), то применяют распределённую обработку данных на нескольких станциях. Так, например, Apache Spark является одним из самых популярных фреймворков для распределённой обработки данных. Некоторые задачи можно выполнять буквально одной командой (строкой кода в консоли):

sc.textFile(path).map(s => (s, 1)).reduceByKey((a, b) => a + b).saveAsTextFile(pathSave)


Упомянутый фрагмент исходного кода на языке программирования Scala позволяет выполнить подсчёт событий (строк) в логе, где каждая строка — строго одно имя (уникальный идентификатор) события. В результате запуска этой программы мы увидим директорию с несколькими текстовыми файлами, содержащими результаты обработки данных. Отчёт о процессе обработки данных будет показан в удобном интерфейсе пользователя (http://127.0.0.1:4040/jobs/) с указанием подробной информации, включая визуальное отображение:
835ce526260c40108afefaedc8d8c48c.png
После того, как произойдёт загрузка данных из текстового файла (textFile), все строки будут записаны в виде специального распределённого набора данных RDD (Resilient Distributed Dataset). Теперь с полученными данными можно производить различные операции. Важно учесть, что трансформации (в оригинальной документации используется термин Transformations) не модифицируют множество (оно immutable), а создают новое. Более того, Spark не будет осуществлять операций с этим множеством до нужного момента (команды из списка Actions).

В данном конкретном случае набор данных будет трансформирован в карту (map применит действие к каждому элементу создав новый RDD). В качестве ключа будет строка, а в качестве значения будет проставлена единица. Другая трансформация (reduceByKey) — это сокращение элементов множества по ключу. Указанная формула (a + b) суммирует значения для каждого сокращаемого ключа. Таким образом, останутся только уникальные ключи, а в качестве значений у них будет количество повторений в изначальном множестве. Если есть желание ещё упростить приведённый пример, то вспомним про countByValue, который сделает упомянутую задачу совсем тривиальной.

А что, если мне нужно выполнить настоящий кластерный анализ методом KMeans? Для этого есть пакет spark.mllib, который содержит множество готовых алгоритмов (включая, но не ограничиваясь: кластеризацию, линейную регрессию, классификацию, коллаборативную фильтрацию, дерево принятия решений, random forests, gradient boosting).

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

val lines = sc.textFile(pathCsv)
val data = lines.map(s => Vectors.dense(s.split(";").map(_.toDouble))).cache()

// Число кластеров нужно задать заранее
val clusters = KMeans.train(data, 3, 20)
clusters.clusterCenters.mkString("\n") // Показать центроиды кластеров


Особо хочу отметить, что на точность работы алгоритма будет влиять фактор правильности выявления количества кластеров. Следовательно, данные лучше заранее почистить и проверить.

К одному из самых распространённых методов анализа поведенческих факторов можно отнести ассоциативные правила. Они очень часто применяются при нахождении шаблонных действий, например, типичной рыночной корзины в интернет-магазинах. Идея ассоциативных правил состоит в следующем: установить вероятность встречи в множестве одних элементов по факту присутствия других. Например, по наличию в корзине фонарика догадаться о наличии аккумулятора для него. Разумеется, речь идёт не только о покупках, а о любых других шаблонных действиях. Приведу пример такого анализа:

import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset

// Допустим, что это название конфет
val freqItemsets = sc.parallelize(Seq(
  new FreqItemset(Array("milk"), 31L),
  new FreqItemset(Array("red", "milk", "fantazia"), 84L),
  new FreqItemset(Array("milk", "fantazia"), 89L),
  new FreqItemset(Array("lemon"), 49L),
  new FreqItemset(Array("red", "milk", "lemon"), 14L),
  new FreqItemset(Array("green", "lemon"), 25L)
))

val results = new AssociationRules().setMinConfidence(0.5).run(freqItemsets)

results.collect().foreach { rule =>
  println(rule.antecedent.mkString(",") + " -> "
    + rule.consequent.mkString(",") + " // " + rule.confidence)
}


К задачам анализа поведенческих факторов также относят различные составляющие активности людей в социальных сетях. Для таких задач удобнее представить данные в виде графа. Благо, есть компонент под названием GraphX. Смысл этого компонента состоит в упрощении распределённой обработки графов. Его работу можно показать на примере выявления PageRank для сайтов. Прежде всего, зададим узлы и рёбра графа, а потом выполним с ними несколько самых элементарных операций:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val v = Array(
    (1L, ("www.1.com", 10)),
    (2L, ("www.2.com", 20)),
    (3L, ("www.3.com", 30)),
    (4L, ("www.4.com", 40)),
    (5L, ("www.5.com", 50)),
    (6L, ("www.6.com", 0))
)

val e = Array(
    Edge(1L, 5L, 1),
    Edge(2L, 5L, 2),
    Edge(3L, 5L, 3),
    Edge(4L, 5L, 4),
    Edge(5L, 1L, 5)
)

// Преобразуем в граф
val graph: Graph[(String, Int), Int] = Graph(sc.parallelize(v), sc.parallelize(e))

// Посмотрим только те вершины, которые подпадают под нужные условия
graph.vertices.filter{ case (id, (url, visits)) => visits > 35 }.collect().mkString("\n")

// Какой сайт куда ссылается
for (triplet <- graph.triplets.collect()) {
    print(s"Link from ${triplet.srcAttr._1} (visits = ${triplet.srcAttr._2}) ")
    println(s"to ${triplet.dstAttr._1} (visits = ${triplet.dstAttr._2})")
}

// Кстати, есть встроенный алгоритм расчёта PageRank
graph.pageRank(0.001, 0.4).vertices.collect().foreach {
    site => println("Site Id = " + site._1 + ", PR = " + site._2)
}


У фреймворка Apache Spark есть API для популярных языков программирования (Scala, Python и Java). Встречается мнение, что на языке программирования Java код получается немного громоздким. Кроме этого, для выполнения относительно простых задач не очень хочется писать приложение на Java. Но при необходимости можно и на Java. Подключение к приложению не должно вызывать особых затруднений. Кроме импорта библиотек ничего не понадобилось:

public class Run {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("HABR");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        /**
        * Пример №1. Гипоциклоида
        */
        JavaRDD points = ctx.textFile(path).map(new Function() {
            public Point call(String line) { 
                Double t = Double.parseDouble(line);
                Double x = 5.5 * (Math.cos(t) + Math.cos(1.1 * t) / 1.1);
                Double y = 5.5 * (Math.sin(t) - Math.sin(1.1 * t) / 1.1);
                return new Point(x, y);
            }
        });
        points.saveAsTextFile(savePath);

        /**
        * Пример №2. Работа с SQLContext
        */
        String sql = "SELECT * FROM sites WHERE type = 'b' AND id IN (1,2)";

        SQLContext sqlContext = new SQLContext(ctx);
        DataFrame sites = sqlContext.read().json(jsonPath);
        sites.show();

        sites.registerTempTable("sites");
        DataFrame results = sqlContext.sql(sql);
        results.show();
    }
}


Что касается самой логики расчёта рейтинга, то она очень сильно зависит от проекта. Общие показатели (просмотры, отказы, время на сайте, глубина просмотра, повторные визиты) не всегда более информативны, чем специально отслеживаемые события. По моему личному опыту обычно шли достаточно банальным путём. В момент подготовки данных (сайт написан на Yii 2, а за его наполнение отвечает сервис на Java) для наполнения базы данных производилась обработка всех документов. На этапе этой обработки вычислялся рейтинг документа, а в базу записывался готовый результат.

Как вы понимаете, выполнять расчёты динамически (при каждом обновлении страницы) нет прямой необходимости. Более того, на сайтах с огромной нагрузкой не поможет даже кэширование (Memcached, Redis, Tarantool). Следовательно, необходимо заранее выполнять все расчёты, а приложение на PHP должно использоваться только для отображения (модель получает уже посчитанный рейтинг из базы данных и с помощью контроллера отображает в представлении). Кстати, со стороны сайта (в его базе данных) рейтинг лучше хранить целым числом. Ещё один очень простой схематический пример для наглядности.

Создадим миграцию для Yii 2:

use yii\db\Migration;

class m160401_134629_doc extends Migration
{
    public function up()
    {
        $this->createTable('{{%doc}}', [
            'id' => $this->primaryKey(),
            'name' => $this->string(150)->notNull()->unique(),
            'content' => $this->text()->notNull(),
            'rating' => $this->integer()->notNull()->defaultValue(0),
            'created_at' => $this->integer()->notNull(),
            'updated_at' => $this->integer()->notNull(),
        ], 'CHARACTER SET utf8 COLLATE utf8_unicode_ci ENGINE=InnoDB');
    }

    public function down()
    {
        $this->dropTable('{{%doc}}');
    }
}


А следом за ней и модель:

namespace common\models;
use Yii;

/**
 * This is the model class for table "{{%doc}}".
 *
 * @property integer $id
 * @property string $name
 * @property string $content
 * @property integer $rating
 * @property integer $created_at
 * @property integer $updated_at
 */
class Doc extends \yii\db\ActiveRecord
{
    /**
     * @inheritdoc
     */
    public static function tableName()
    {
        return '{{%doc}}';
    }

    /**
     * @inheritdoc
     */
    public function rules()
    {
        return [
            [['name', 'content', 'created_at', 'updated_at'], 'required'],
            [['content'], 'string'],
            [['rating', 'created_at', 'updated_at'], 'integer'],
            [['name'], 'string', 'max' => 150],
            [['name'], 'unique'],
        ];
    }

    /**
     * @inheritdoc
     */
    public function attributeLabels()
    {
        return [
            'id' => 'ID',
            'name' => 'Name',
            'content' => 'Content',
            'rating' => 'Rating',
            'created_at' => 'Created At',
            'updated_at' => 'Updated At',
        ];
    }

    /**
     * @param integer $limit
     * @return Doc[]
     */
    public static function getDocs($limit = 10)
    {
        return static::find()->orderBy('rating DESC')->limit($limit)->all();
    }
}


Естественно, в реальном проекте уже был готовый функционал, который выводил отсортированный по рейтингу список неких сущностей. Продуманы индексы, которые я не описал в примере. А вот простой контроллер был похож на показанный далее:

namespace frontend\controllers;

use Yii;
use common\models\Doc;
use yii\web\Controller;

class DocController extends Controller
{
    /**
     * Рейтинг документов (ТОП-10)
     *
     * @return string
     */
    public function actionIndex()
    {
        $limit = 10;
        $cacheTime = 60;
        
        $docs = Doc::getDb()->cache(function ($db) use ($limit) {
            return Doc::getDocs($limit);
        }, $cacheTime);

        if(empty($docs)) {
            $this->goHome();
        }
        
        return $this->render('index', ['docs' => $docs]);
    }
}


И сложностей в изменении кода на сайте не возникало. Собственно, а почему они должны возникнуть? Основную работу делает не сайт: выгрузка данных на сайте будет содержать предварительно обработанное с помощью Spark поле рейтинга. По секрету говоря, в реальном проекте был использован язык программирования Scala для написания очень компактного кода для Spark.

Разумеется, если задача простая, то нет смысла реализовывать очень сложные алгоритмы. Для подготовки умеренного количества данных получится использовать весьма простой сервис. Посмотрим схематический прототип на Java без использования сторонних библиотек. Интерфейс просит нас передать методу объект документа, а вернуть — число (Double), отражающее уровень рейтинга.

public interface IRating {
        Double rating(Doc doc);
}


Соответственно, добавим несколько классов, которые по-разному рассчитывают рейтинг. Во избежание искусственного усложнения и выхода за рамки этой статьи добавим логическое ограничение: пусть на сайте будут два типа рейтингов документа. Для первого типа рейтинг находим по количеству просмотров (посещений) документа:

public class MainRating implements IRating {

        @Override
        public Double rating(Doc doc) {
                return (Math.log(doc.getVisits()) * 2);
        }
}


А для второго вычислим по иной вымышленной формуле:

public class AdditionalRating implements IRating {

        @Override
        public Double rating(Doc doc) {
                return (doc.getEvents() * 0.5) + Math.random();
        }
}


Далее мы создадим абстрактный класс для будущей фабрики:

public abstract class IRatingFactory {
        public abstract IRating getRankType(String type);
}


Про саму фабрику тоже не будем забывать:

public class RatingFactory extends IRatingFactory {

        @Override
        public IRating getRankType(String type) {
                switch (type) {
                        case "main":
                                return new MainRating();
                        case "additional":
                                return new AdditionalRating();
                        default:
                                return null;
                }
        }
}


Очень не хочу прибегать к оценочным суждениям. Однако, у меня сложилось весьма хорошее впечатление об Apache Spark. При создании очень высокопроизводительной статистической системы он сыграл важную роль. Более того, его применение в «продакшен» меня приятно удивило эффективностью и простотой. Но это моё личное мнение, которым я хочу просто поделиться, а не навязывать. Во всяком случае, достаточно подробная документация и наличие нескольких англоязычных книг помогают быстро изучить его основные возможности.

У меня на протяжении нескольких лет было желаний найти некий почти идеальный инструмент. Честно говоря, мне понадобилось много времени, чтобы избавиться от этой наивной идеи. Сейчас в моих убеждениях есть другая идея — «волшебной таблетки» быть не может. Эффективность каждого инструмента напрямую зависит от конкретной ситуации. На эту тему уже написано масса книг и статей, но за годы ежедневного чтения книг по упомянутой тематике я проникаюсь только одной мыслью — очень редко удаётся сразу спроектировать громадную статистическую систему или веб-ресурс. Чаще это эволюционный процесс, за время развития которого «рождаются» новые модули с новыми современными технологиями и уходят в историю не прошедшие естественный отбор «жертвы рефакторинга». Мир не статичен, разве нет? Собственно, хочу пожелать вашим системам постоянного и конструктивного развития.

© Habrahabr.ru