[Из песочницы] Сказ о том, как мы BigQuery приручали
Задача
На самом деле, задача, о которой хочется рассказать, проста до уныния по своей формулировке: нужно было визуализировать данные по продажам отдела e-commerce малой кровью, т.е., читай, практически даром.
Что под этим понимается? Корзины наших магазинов генерят постоянный поток данных об онлайн-продажах в разных регионах мира со всеми вытекающими: разные валюты, часовые пояса, налоги, типы клиентов, виды номенклатуры, заказов и т.д. На самом деле, то же самое генерит любой интернет-магазин, только, возможно, варианты параметров у заказов немного отличаются.
Требуется, сидя на этом потоке, выдавать некий агрегированный дашборд, на котором бизнес мог бы в режиме онлайн (а точнее, регулярно) смотреть ключевые показатели как текущего момента, так и по длительным периодам. Да еще желательно, чтобы этих дашбордов было несколько с различной степенью детализации и специализации.
Поиск решения
Естественно, начинали с поиска готового решения. Смотрели таких визуализаторов данных как, например, Owox, решения BI от Microsoft, какие-то enterprise решения и т.д.
Все это замечательно подается рекламой. Но:
- недешево
- вещь в себе (не так-то просто управлять механикой обработки данных, а подчас просто приходится полагаться на компетенции дата-сайнтистов компании-разработчика)
К тому времени (2019 год) у меня уже был маленький карманный проектик визуализации данных в Google Datastudio, который уже существовал как минимум год и сводился он к тому, что я вручную выгружал отчет из 1С, заливал его в Google Storage Cloud (не путаем с GoogleDrive), оттуда — в таблицу BigQuery, ну и далее уже в DataStudio делал срезы данных и все остальное.
Подумалось — нельзя ли сделать так же, но в рамках задачи автоматизации отчетов для бизнеса, причем, желательно, без всякой ручной работы.
Стал копаться, и понял, что за тот год, который я усердно делал руками одни и те же манипуляции, вся экосистема обработки данных Гуглом ушла далеко вперед. И Data Studio обновился сильно, и BigQuery оброс гигантским функционалом, и, что самое главное, появилось много людей, которые точно так же решали проблемы автоматической визуализации отчетности.
В общем, прикинули, что хорошо бы научиться лить данные напрямую из нашей SQL в BigQuery, а дальше уже издеваться над ними, как нам будет угодно.
Создали проект в BigQuery (https://console.cloud.google.com/bigquery), получили бонусные на счет, и поехали.
Кстати, цена вопроса: за последние несколько месяцев работы мы все еще не израсходовали бонусные 300 долларов. При том, что каждый час происходит как минимум 2 транзакции данных в таблицы BigQuery, а также в каждые сутки прокачивается полный отчет по продуктам и заказам, включающий порядка миллиона сущностей.
Приручение
Дальше дело было так. Для начала надо было понять, какие вообще данные можно туда лить, поскольку BigQuery имеет хранилища только в Европе и США, а значит, персональные данные там хранить категорически невозможно. Фокус в том, что для интегральной отчетности это и не требуется!
Стало быть, дело сводится к правильным SELECT, выполненным на локальном сервере, и дальнейшей синхронизации таблиц в BQ.
А надо заметить, что в BQ есть инструмент прямого доступа во внешнюю БД, но с рядом ограничений. Поскольку на наш сервак с основной и резервной БД снаружи ломиться категорически запрещено, а кроме того, требовалась дополнительная обработка данных на лету, этот метод не годился.
В итоге имеем цепочку: SQL ==> выделенный внутренний сервер с Python ==> BigQuery.
Как вы понимаете, самое главное тут — сервак с питоном.
Да, поначалу был еще кривой план с заливкой результатов SELECT в csv-файлы на GoogleStorage с дальнейшим их чтением в BQ либо напрямую, либо с помощью инструмента Google Functions, который позволяет писать небольшие функции на Python (или некоторых других языках) и исполнять их под кроном. Проблема в том, что они могут выполняться не более 6 минут (по крайней мере в бесплатной версии) в каждый запуск.
Но зачем так делать, если можно пойти прямым путем?
Импорт
Итак, предположим, что у нас уже есть функция, которая подключается к зеркалу боевой SQL и сливает там нужный срез таблицы с данными. Для определенности будем считать, что это таблица под названием Orders, содержащая такие атрибуты, как дата создания заказа, дата оплаты, сумма в валюте, налог в валюте, курс валюты идентификатор валюты, клиент id, id магазина и т.д. В общем, то, что нужно для анализа оборотов и построения красивых графиков.
Чтобы не писать много кода, скажу, что скрипт подключается к БД и результат SELECT пишет в pandas dataframe. Выглядит это примерно так:
import pandas as pd
import MySQLdb
import MySQLdb.cursors
def sql_query(date0, date1):
connection = MySQLdb.connect(host=”server”, user=”user”, passwd=”pass”, db=”orders”, charset=”cp1251”, cursorclass = MySQLdb.cursors.SSCursor)
query = “SELECT * FROM orders WHERE date>=’”+date0+”’ and date<’”+date1+”’ ”
data = pd.read_sql_query(query, connection=connection)
# Здесь query - это запрос SELECT к вашей БД, который включает ограничивающие параметры date0 и date1, чтобы отсечь данные, скажем, только за 5 дней. Мы же не собираемся гонять гигабайты данных каждый час - это накладно по времени и по деньгам. Здесь же можно зашить исключение персональных данных и вообще всего, что не требуется тащить в BQ
# Ну, конечно, определение connection вообще лучше вынести в отдельную функцию, которая будет лежать в секурном каталоге и т.п. В вопросы безопасности мы сейчас не полезем.
# Далее мы можем сделать что-то с датафреймом. Например, я преобразовывал даты в нужный формат следующим споосбом:
data.payment_date = data.payment_date.apply(lambda x: x.tz_localize(‘Europe/Moscow’)).astype(str)
# Почему в конце тип str - отдельный вопрос. Практика показала, что лучше передать дату в тексте, если хочешь сохранить информацию о часовых поясах. А уже потом внутри BQ преобразовывать это поле в требуемый формат даты и времени.
# Вообще, лучше типы данных столбцов pandas задать перед экспортом в BQ, чтобы не возникло путаницы при автоматическом выборе типа в момент заливки, либо же явно задать схему таблицы.
return data
Выяснилось также, что на локальном серваке маловато памяти для передачи даже 5-дневного среза, поэтому пришлось нужный интервал дат (90 дней, т.к. изменения в заказы могут вноситься в течение квартала) итерировать циклом, и в цикле вызывать sql_query(date) со сдвигом дат, и сразу же отправлять в BQ.
Итак, мы забираем данные, делаем небольшую предобработку и отдаем pandas dataframe дальше.
Экспорт
Основная идея: залить временную таблицу с текущим блоком данных, а затем силами самого BQ при помощи запроса MERGE объединить временную таблицу с основной. В BQ такой запрос на таблицах с миллионами строк и десятками столбцов отрабатывает за 1-2 минуты. У нас базовая таблица содержит порядка 20 млн записей и около 70 столбцов, преимущественно типа STRING и FLOAT. А временная таблица содержит порядка 70000 записей.
Таким образом, у нас получается одна транзакция и один запрос в каждой итерации цикла по дням. А это очень мало для BQ, и для нас очень дешево.
from google.cloud import bigquery
from google.oauth2 import service_account
project_id = “MyBQproject” # идентификатор проекта в BQ
dataset_id = “MyDataSet” # название датасета в данном проекте
# далее нам нужно подцепить авторизацию в BQ. Лучше всего это сделать с помощью файла-ключа (опять же, лежащего в секурном месте). Сам файл можно создать непосредственно в BQ. Чтобы найти, где это делается, надо выполнить поиск ‘Service accounts’ в Google Cloud Platform.
credentials = service_account.Credentials.from_service_account_file(“acc.json”)
client = bigquery.Client(project_id, credentials=credentials)
def upload_temp_table(df, table_id, schema):
job_config = bigquery.LoadJobConfig()
job_config.schema = schema
# job_config.autodetect = True # если не нужно задавать схему
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE # перезаписать таблицу
job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result() # подождем результата
def merge_tables(temp_table, base_table, schema, head, merge_param):
merge_query = “MERGE into “+base_table+” as T USING “+temp_table+” as S \
ON T.”+merge_param+” = S.”+merge_param+ \
“ WHEN MATCHED then UPDATE SET “+ \
‘,’.join([‘T.’+item+’=S.’+item for item in head])+ \
“ WHEN NOT MATCHED then INSERT \
(“+’,’.join(head)+”) VALUES (“+’,’.join([‘S.’+item for item in head])+”);”
# То есть, мержим основную таблицу данных с временной по ключу merge_param (например, по id заказа, если у нас заказу соответствует одна строка таблицы)
# Схему все-таки лучше задавать
job_config = bigquery.LoadJobConfig()
job_config.schema = schema
job = client.query(merge_query, job_config=job_config)
job.result()
Ну, и теперь основной код скрипта — это цикл по датам с запуском трех функций.
import datetime
for period in range(18):
date0 = (datetime.datetime.now() - datetime.timedelta(days = period*5 + 5)).strftime(‘%Y%m%d’)
date1 = (datetime.datetime.now() - datetime.timedelta(days = period*5)).strftime(‘%Y%m%d’)
df = sql_query(date0,date1) # забираем данные из sql за период >=date0, <date1
upload_temp_table(df=df, table_id=”temp_table”, schema) # загружаем во временную таблицу
merge_tables(temp_table=’temp_table’, base_table = ‘orders’, schema=schema, head=[,,,], merge_param=’id’)
Здесь head — это заголовок используемой таблицы. Его лучше определить явно. Это поможет создать правильный запрос в SQL на начальном этапе, при импорте данных. И это поможет корректно создать MERGE-запрос с перечислением всех столбцов. Про формирование схемы для BQ я не буду вдаваться в подробности. Если у вас данные более-менее постоянные, и Python сможем сам разобраться с типом столбцов, то можно не заморачиваться раписыванием схемы таблицы для BQ.
Рекомендации
Логируйте выполнение скриптов. Для этого можно сделать следующее
import sys
log_file = open(“log.txt”, ‘a’)
old_stdout = sys.stdout
sys.stdout = log_file
Далее ваш код, где есть функции print
И в конце скрипта:
sys.stdout = old_stdout
log_file.close()
В итоге все выводы команды print пойдут в лог-файл. Очень удобно при отладке, да и в будущем при поиске сбоев.
Эпилог
Итак, мы научились прокачивать данные в таблицы BigQuery практически даром, остается подключить эти таблицы в отчет DataStudio (https://datastudio.google.com/) и настороить нужные метрики.
Дополнительно замечу, что хоть DataStudio и позволяет настраивать вычисляемые метрики, лучше этот функционал максимально реализовать средствами BigQuery или еще на этапе подготовки данных в Python. Например, если нужна сумма НДС по заказам, а нам известна только сумма и процент, то сумму НДС лучше вычислять прямо в заливочном скрипте через apply.
С другой стороны, при визуализации данных в DataStudio часто требуется агрегация на лету (например, вычислить средний чек), такие метрики, конечно, нигде, кроме самой Datastudio не определишь.
Далее, в Datastudio при работе с ресурсами можно создавать Blended Tables, которые по сути являются LEFT JOIN из таблиц-ресурсов по ключевым параметрам. Работает это не всегда корректно, и, опять же, лучше джойнить таблицы силами BigQuery. Например, при сборке большой таблицы, где соединяются таблица с заказами и клиентами и таблица с содержимым заказов, я использую SQL-запрос, повешенный в BigQuery под расписание. Я знаю, например, что у меня все данные прокачиваются до 6 утра, значит, в 6 часов могу запускать этот запрос и обновлять тотальную таблицу. На таблицах порядка 20 млн записей при 70 столбцах такие запросы работают 3-4 минуты в BigQuery. Вряд ли на локальном серваке вы получите такой же комфортный результат.
Наконец, есть выбор между запуском скриптов по расписанию или же их отправкой непосредственно из питоновского скрипта. Тут уже дело вкуса, как говорится. Что-то мы запускаем по расписанию в BigQuery, а что-то прямо в конце скрипта Python. Собственно, в приведенном выше примере функция merge_tables — это и есть отправка запроса в BQ и ожидание результата. В данном случае это удобно было делать в Питоне, т.к. намного легче подставлять список полей таблиц — джойном списка через запятую. К тому же, эта операция выполняется в цикле.
Напоследок замечу, что в BigQuery можно также использовать таблицы, заданные в Google Spreadsheets. Обычно это какие-то справочники вроде сопоставления id магазина и его названия. Их можно:
- импортировать скриптом на Python в pandas dataframe и далее джойнить силами скрипта с текущими данными
- импортировать прямо в таблицу в вашем датасете в BQ и далее запускать запрос JOIN непосредственно в BQ (опять-таки, по расписанию или из скрипта).
Короче говоря, мы открыли для себя кучу возможностей и притом практически даром. Если вам не нужно проливать терабайты данных по биллингу какого-то сотового оператора, а нужно лишь работать с таблицами объемом в дестяки и сотни мегабайт, то описанный мною вариант кажется вполне адекватным, надежным и незатратным.
И последнее. Хотелось красивый датастудиевский отчет вывесить на каком-то мониторе в конференц-зале, и чтобы он сам обновлялся. Оказалось, что в хроме есть плагин, который с заданным интервалом времени обновляет страницу с отчетом (грубо говоря, нажимает F5). Он так и называется: Data Studio Auto-Refresh. В итоге: вешаем монитор с подключенным миникомпьютером, открываем в хроме отчет в режиме View, запускаем этот плагин Data Studio Auto-Refresh, и дальше все зависит лишь от того, как часто вы прокачиваете данные SQL=>BQ.
А, ну и еще надо быть готовым к тому, что бизнес будет видеть правду гораздо чаще и подробнее, чем до этого :-)