Диаграмма Сэнкей (Sankey diagram) на Python

Я занимаюсь аналитикой данных в Aliradar. Мы не представлены на Хабре, но у меня поднакопился материал, которым хотелось бы поделиться. Написать эту статью меня сподвигло отсутствие годных гайдов по построению диаграммы Сэнкей с использованием python на русском языке.

В моей работе часто возникают различные задачи по анализу консистентности и полноты данных, а также по визуализации. Одна из таких задач, которую решал относительно недавно — необходимость визуализировать действия пользователей нашего мобильного приложения. Нужно было понять, какие сценарии работы с приложением существуют и внимательнее рассмотреть действия пользователей на каждом шаге для дальнейшего улучшения стабильности работы приложения.

Так как пользователей у нас много, то анализировать действия каждого — трудная и дорогая задача. Поэтому было решено визуализировать события пользователей, используя диаграмму Сэнкей (Sankey diagram).

Забегая вперед, покажу, что получится в итоге. Для подготовки данных и построения диаграммы использовал python, pandas и plotly. Надеюсь, что эта статья будет полезна аналитикам данных, код можно запустить в colab, либо взять в репозитории на github.

А теперь разберем пошагово.

Что это такое?

Первая публикация этой диаграммы появилась в 1898 году. Ее создатель, Мэтью Сенкей (Matthew H. Sankey), показал сравнение парового двигателя и двигателя без энергопотерь.

Тепловая эффективность парового двигателяТепловая эффективность парового двигателя

Определение, что такое диаграмма Сэнкей для действий пользователей, может быть следующее — это визуализация потоков от одного целевого действия пользователя к другому. Вот упрощенная схема, того, что в итоге должно получиться:

Упрощенная схема диаграммы СэнкейУпрощенная схема диаграммы Сэнкей

Разберем эту схему:

  • есть событие event_1, которое происходит раньше других и находится левее других на схеме. Такое событие будем считать источником (source);

  • далее происходит «переход» уникальных пользователей от event_1 (source) к событиям event_1, event_2, event_3, которые будем считать на первом шаге (step_1) целевыми действиями (target). Количество уникальных пользователей, совершивших переход от source к target показано с помощью ширины канала между source и target;

  • на шаге step_2 уже event_1, event_2, event_3 будут являться источниками, а event_3 и event_4 целевыми действиями;

  • от шага к шагу выполняются подобные изменения source на target. Самое первое действие — это только source, а последнее — это target, так как в первый source нет входящих потоков, а из последнего таргета нет исходящих.

Эта схема — упрощение, так как на ней указаны только один первоначальный source, и один заключительный target. В реальной жизни source и target на каждом шаге, как и самих шагов, может быть сколько угодно.

Подготовка данных

В качестве исходных данных для построения я сгенерировал искусственные данные.

Загрузка подготовленных данных
PATH_TO_CSV = 'https://raw.githubusercontent.com/rusantsovsv/senkey_tutorial/main/csv/senkey_data_tutorial.csv'

# подгружаем данные в таблицу и выводим первые 5 строк
table = pd.read_csv(PATH_TO_CSV)
table.head()

Первые 5 строк исходной таблицы имеют следующий вид:

bf7fe6da849223ead9b18d56aaefba3c.png

В этой таблице:

  • user_id — сгенерированный id пользователя;

  • event_timestamp — время события;

  • event_name — имя события.

Для построения диаграммы нужно определить пары source-target, а также пронумеровать эти пары в соответствии с временем наступления события — это будет шаг между событиями.

Преобразование исходной таблицы
def add_features(df):
    
    """Функция генерации новых столбцов для исходной таблицы

    Args:
        df (pd.DataFrame): исходная таблица.
    Returns:
        pd.DataFrame: таблица с новыми признаками.
    """
    
    # сортируем по id и времени
    sorted_df = df.sort_values(by=['user_id', 'event_timestamp']).copy()
    # добавляем шаги событий
    sorted_df['step'] = sorted_df.groupby('user_id').cumcount() + 1
    
    # добавляем узлы-источники и целевые узлы
    # узлы-источники - это сами события
    sorted_df['source'] = sorted_df['event_name']
    # добавляем целевые узлы
    sorted_df['target'] = sorted_df.groupby('user_id')['source'].shift(-1)
    
    # возврат таблицы без имени событий
    return sorted_df.drop(['event_name'], axis=1)
  
# преобразуем таблицу
table = add_features(table)
table.head()

Первые 5 строк таблицы после преобразования:

3d2d6ee9916ef5c1269c86e45650dca7.png

Что получили в итоговой таблице:

  • события каждого id отсортированы по времени;

  • созданы пары событий source — target;

  • добавлен шаг между этими событиями для построения диаграммы;

  • удален столбец event_name, так как в дальнейших преобразованиях он использоваться не будет.

Следующее, что нужно сделать — это выбрать количество шагов на нашей будущей диаграмме. Чем больше шагов, тем больше графических объектов в итоге будет отображено, но так как это пример, ограничимся количеством шагов, например, равным 7.

Ограничение количества шагов до 7
# удалим все пары source-target, шаг которых превышает 7
# и сохраним полученную таблицу в отдельную переменную
df_comp = table[table['step'] <= 7].copy().reset_index(drop=True)

Создание индексов для source

Важным следующим шагом в подготовке данных является создание индексов для source. На каждом следующем шаге target становится source, и чтобы диаграмма коррректно генерировалась нужна правильная индексация source на каждом шаге.

Создадим словарь, в котором ключи — это шаги, а значения — словари со списком названий source и соответствующих им индексов. Обратите внимание, что на следующем шаге индексы source продолжают нумерацию, а не начинают с 0, при том, что имена событий могут повторяться.

Затем для каждого шага объединяем имена и индексы в еще один вложенный словарь. Все вложенные списки и словари потребуются в дальнейшем для генерации меток, подписей и размера каналов между source и target.

Создание словаря с индексами source
def get_source_index(df):
    
    """Функция генерации индексов source

    Args:
        df (pd.DataFrame): исходная таблица с признаками step, source, target.
    Returns:
        dict: словарь с индексами, именами и соответсвиями индексов именам source.
    """
    
    res_dict = {}
    
    count = 0
    # получаем индексы источников
    for no, step in enumerate(df['step'].unique().tolist()):
        # получаем уникальные наименования для шага
        res_dict[no+1] = {}
        res_dict[no+1]['sources'] = df[df['step'] == step]['source'].unique().tolist()
        res_dict[no+1]['sources_index'] = []
        for i in range(len(res_dict[no+1]['sources'])):
            res_dict[no+1]['sources_index'].append(count)
            count += 1
            
    # соединим списки
    for key in res_dict:
        res_dict[key]['sources_dict'] = {}
        for name, no in zip(res_dict[key]['sources'], res_dict[key]['sources_index']):
            res_dict[key]['sources_dict'][name] = no
    return res_dict
  

# создаем словарь
source_indexes = get_source_index(df_comp)
Пример записи в словаре для шага 2
sources
 ['history_opened', 'app_opened_from_market', 'sales_category_selected', 'favorites_opened', 'item_opened', 'app_opened_via_icon', 'market_opened_without_referral', 'price_history_opened', 'search_tab_opened', 'seller_info_opened', 'item_loaded_from_store', 'marketApp_opened', 'chart_click', 'item_opened_from_history', 'similar_tab_opened', 'reviews_tab_opened', 'app_remove', 'similar_item_opened', 'marketApp_opened_from_item', 'sales_item_opened_from_main', 'auth_opened', 'search_request_entered', 'item_info_click', 'sales_opened', 'settings_opened', 'similars_not_fetched_from_server', 'auth_user_succeeded', 'search_results_loaded'] 

sources_index
 [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47] 

sources_dict
 {'history_opened': 20, 'app_opened_from_market': 21, 'sales_category_selected': 22, 'favorites_opened': 23, 'item_opened': 24, 'app_opened_via_icon': 25, 'market_opened_without_referral': 26, 'price_history_opened': 27, 'search_tab_opened': 28, 'seller_info_opened': 29, 'item_loaded_from_store': 30, 'marketApp_opened': 31, 'chart_click': 32, 'item_opened_from_history': 33, 'similar_tab_opened': 34, 'reviews_tab_opened': 35, 'app_remove': 36, 'similar_item_opened': 37, 'marketApp_opened_from_item': 38, 'sales_item_opened_from_main': 39, 'auth_opened': 40, 'search_request_entered': 41, 'item_info_click': 42, 'sales_opened': 43, 'settings_opened': 44, 'similars_not_fetched_from_server': 45, 'auth_user_succeeded': 46, 'search_results_loaded': 47}

Генерация цветов для source

Для более наглядного представления можно разукрасить каждый source-target в разные цвета. Я рассмотрел 2 способа — случайная генерация и ручной выбор цветов.

Цвета выберем в цветовой модели RGBA. Это необходимо, чтобы сделать каналы source-target более прозрачными, по отношению к блокам для лучшей читаемости схемы.

Цвет будем генерировать для каждого уникального источника. Для этого создадим еще один словарь, в котором будут храниться соответствия source: color. По личному субъективному мнению, автоматически сгенерированные цвета не очень нравятся. Поэтому потратив немного времени выбрал те цвета, которые интереснее выглядят на белом фоне. Их можно загрузить, указав в функции colors_for_sources значение mode='custom' ('random' для случайной генерации цвета).

Функция случайной генерации цветов
def generate_random_color():
    
    """Случайная генерация цветов rgba

    Args:
        
    Returns:
        str: Строка со сгенерированными параметрами цвета
    """
    
    # сгенерим значение для каждого канала
    r, g, b = np.random.randint(255, size=3)
    return f'rgba({r}, {g}, {b}, 1)'
Создание словаря с соответствиями source: color
def colors_for_sources(mode):
    
    """Генерация цветов rgba

    Args:
        mode (str): сгенерировать случайные цвета, если 'random', а если 'custom' - 
                    использовать заранее подготовленные
    Returns:
        dict: словарь с цветами, соответствующими каждому индексу
    """
    # словарь, в который сложим цвета в соответствии с индексом
    colors_dict = {}
    
    if mode == 'random':
        # генерим случайные цвета
        for label in df_comp['source'].unique():
            r, g, b = np.random.randint(255, size=3)            
            colors_dict[label] = f'rgba({r}, {g}, {b}, 1)'
            
    elif mode == 'custom':
        # присваиваем ранее подготовленные цвета
        colors = requests.get('https://raw.githubusercontent.com/rusantsovsv/senkey_tutorial/main/json/colors_senkey.json').json()
        for no, label in enumerate(df_comp['source'].unique()):
            colors_dict[label] = colors['custom_colors'][no]
            
    return colors_dict
  
  
# генерю цвета из своего списка
colors_dict = colors_for_sources(mode='custom')

Создаем словарь с данными

Диаграмму будем отрисовывать с помощью Plotly. Для корректной (и более полной) отрисовки нужны следующие данные:

  • sources — список с индексами source;

  • targets — список с индексами target;

  • values — количество уникальных пользователей, совершивших переход между узлами source-target («объем» потока между узлами);

  • labels — названия узлов;

  • colors_labels — цвет узлов;

  • link_color — цвет потоков между узлами;

  • link_text — дополнительная информация.

Следующие 2 функции помогут создать словарь этих списков:

Расчет количества уникальных пользователей в процентах
def percent_users(sources, targets, values):
    
    """
    Расчет уникальных id в процентах (для вывода в hover text каждого узла)
    
    Args:
        sources (list): список с индексами source.
        targets (list): список с индексами target.
        values (list): список с "объемами" потоков.
        
    Returns:
        list: список с "объемами" потоков в процентах
    """
    
    # объединим источники и метки и найдем пары
    zip_lists = list(zip(sources, targets, values))
    
    new_list = []
    
    # подготовим список словарь с общим объемом трафика в узлах
    unique_dict = {}
    
    # проходим по каждому узлу
    for source, target, value in zip_lists:
        if source not in unique_dict:
            # находим все источники и считаем общий трафик
            unique_dict[source] = 0
            for sr, tg, vl in zip_lists:
                if sr == source:
                    unique_dict[source] += vl
                    
    # считаем проценты
    for source, target, value in zip_lists:
        new_list.append(round(100 * value / unique_dict[source], 1))
    
    return new_list
Создание словаря с данными для отрисовки диаграммы
def lists_for_plot(source_indexes=source_indexes, colors=colors_dict, frac=10):
    
    """
    Создаем необходимые для отрисовки диаграммы переменные списков и возвращаем
    их в виде словаря
    
    Args:
        source_indexes (dict): словарь с именами и индексами source.
        colors (dict): словарь с цветами source.
        frac (int): ограничение на минимальный "объем" между узлами.
        
    Returns:
        dict: словарь со списками, необходимыми для диаграммы.
    """
    
    sources = []
    targets = []
    values = []
    labels = []
    link_color = []
    link_text = []

    # проходим по каждому шагу
    for step in tqdm(sorted(df_comp['step'].unique()), desc='Шаг'):
        if step + 1 not in source_indexes:
            continue

        # получаем индекс источника
        temp_dict_source = source_indexes[step]['sources_dict']

        # получаем индексы цели
        temp_dict_target = source_indexes[step+1]['sources_dict']

        # проходим по каждой возможной паре, считаем количество таких пар
        for source, index_source in tqdm(temp_dict_source.items()):
            for target, index_target in temp_dict_target.items():
                # делаем срез данных и считаем количество id            
                temp_df = df_comp[(df_comp['step'] == step)&(df_comp['source'] == source)&(df_comp['target'] == target)]
                value = len(temp_df)
                # проверяем минимальный объем потока и добавляем нужные данные
                if value > frac:
                    sources.append(index_source)
                    targets.append(index_target)
                    values.append(value)
                    # делаем поток прозрачным для лучшего отображения
                    link_color.append(colors[source].replace(', 1)', ', 0.2)'))
                    
    labels = []
    colors_labels = []
    for key in source_indexes:
        for name in source_indexes[key]['sources']:
            labels.append(name)
            colors_labels.append(colors[name])
            
    # посчитаем проценты всех потоков
    perc_values = percent_users(sources, targets, values)
    
    # добавим значения процентов для howertext
    link_text = []
    for perc in perc_values:
        link_text.append(f"{perc}%")
    
    # возвратим словарь с вложенными списками
    return {'sources': sources, 
            'targets': targets, 
            'values': values, 
            'labels': labels, 
            'colors_labels': colors_labels, 
            'link_color': link_color, 
            'link_text': link_text}
  

# создаем словарь
data_for_plot = lists_for_plot()

Совсем не обязательно генерировать все эти списки — для построения диаграммы в одном цвете достаточно только списков sources, targets, values.

Обратите внимание на аргумент frac функции lists_for_plot. Бывают случаи, когда узлов слишком много и объем потока между узлами может быть мал. Эта переменная ограничивает минимальный поток между узлами (по умолчанию — шаг не менее 10 уникальных id между узлами). Всё что меньше будет отсечено и отображаться не будет.

После подготовки данных приступим к созданию объекта диаграммы. Сохраним его в отдельную переменную senkey_diagram для дальнейшего сохранения или публикации:

Создание объекта диаграммы
def plot_senkey_diagram(data_dict=data_for_plot):    
    
    """
    Функция для генерации объекта диаграммы Сенкей 
    
    Args:
        data_dict (dict): словарь со списками данных для построения.
        
    Returns:
        plotly.graph_objs._figure.Figure: объект изображения.
    """
    
    fig = go.Figure(data=[go.Sankey(
        domain = dict(
          x =  [0,1],
          y =  [0,1]
        ),
        orientation = "h",
        valueformat = ".0f",
        node = dict(
          pad = 50,
          thickness = 15,
          line = dict(color = "black", width = 0.1),
          label = data_dict['labels'],
          color = data_dict['colors_labels']
        ),
        link = dict(
          source = data_dict['sources'],
          target = data_dict['targets'],
          value = data_dict['values'],
          label = data_dict['link_text'],
          color = data_dict['link_color']
      ))])
    fig.update_layout(title_text="Sankey Diagram", font_size=10, width=3000, height=1200)
    
    # возвращаем объект диаграммы
    return fig
  

# сохраняем диаграмму в переменную
senkey_diagram = plot_senkey_diagram()

Чтобы ее отобразить нужно выполнить:

senkey_diagram.show()

Приведу фрагмент полученной диаграммы:

b3cb2f0d0b0b96666cb2456bab2d73d4.gif

Что с этим делать?

Сохранение в html

Диаграмма, в зависимости от количества выбранных шагов, может получиться довольно большой. Для удобства анализа можно сохранить ее в html, а затем открыть в любом браузере. Так будет удобнее скроллить.

Сохранение диаграммы в html
senkey_diagram.write_html('demo_senkey.html', auto_open=True)

Задайте любое имя html файлу. При использовании auto_open диаграмма автоматически откроется в браузере по умолчанию.

Публикация в Plotly Chart Studio

Можно опубликовать полученную диаграмму в Plotly Chart Studio для онлайн доступа с любых устройств. Для этого нужно зарегистрировать бесплатный аккаунт. После этого выполнить следующую настройку (более подробные действия описаны здесь):

Предварительная настройка chart_studio
import chart_studio
chart_studio.tools.set_credentials_file(username='YOU_LOGIN', api_key='YOU_API_KEY')

После настройки загрузите вашу диаграмму:

Загрузка диаграммы в chart_studio
py.plot(senkey_diagram, filename = 'NAME_FIG', auto_open=True)

Ссылка, приведённая в начале статьи, сгенерирована именно этим способом.

Заключение

Мы рассмотрели, как пошагово можно создать диаграмму Сэнкей — от загрузки и генерирования необходимых данных до сохранения полученной диаграммы. Надеюсь, что приведенный гайд будет полезен и поможет расширить представление о возможностях визуализации данных с помощью Python и библиотеки Plotly.
Спасибо за внимание!

© Habrahabr.ru