Обработка временных рядов в TimescaleDB с интеграцией pandas и NumPy
Привет, Хабр!
Если вы когда‑либо занимались анализом данных, связанных со временем, то наверняка знаете, каким это иногда бывает нелегким занятием — особенно когда данных много, миллионы строк, и SQL начинает медленно кряхтеть под нагрузкой. Но для этого есть отличный инструмент: TimescaleDB на базе PostgreSQL.
PostgreSQL в принципе хорош, но когда речь заходит о временной шкале, хочется больше гибкости. TimescaleDB — это расширение к PostgreSQL, умеющее превращать обычные таблицы в «гипертаблицы», которые автоматически шардируются по времени (и по пространству, если надо). Это означает:
Оптимизация на уровне хранения: данные автоматически разбиваются на чанки по временным интервалам.
Специальные функции для временной агрегации.
Continuous aggregates — материализованные представления, которые сами обновляются.
Простая интеграция с экосистемой Postgres: SQL, транзакции, индексы.
Собственно, самое важное: все это можно писать на Python.
Итак, сегодняшний сетап: PostgreSQL + TimescaleDB + Python (pandas, NumPy). Предположим, есть поток временных данных — цены акций, метрики датчиков, логирование запросов, да что угодно. Мы хотим:
Завести все в TimescaleDB.
Быстро делать агрегации: среднее за 5 минут, 10 минут, по суткам.
Тянуть данные в Python для сложных анализов, применять NumPy‑вычисления, pandas‑агрегации.
При необходимости возвращать обработанные результаты обратно в базу.
Установка TimescaleDB
Обычно уже есть Postgres, поэтому достаточно поставить расширение TimescaleDB. Например, на Ubuntu:
sudo apt update
sudo apt install timescaledb-postgresql-15
После этого правим postgresql.conf
или исполняем ALTER SYSTEM SET shared_preload_libraries = 'timescaledb';
, а затем рестартим Postgres. Дальше в базе пишем:
CREATE EXTENSION IF NOT EXISTS timescaledb;
TimescaleDB готов.
Создание гипертаблицы и загрузка данных
Гипертаблица — это главная фича TimescaleDB: мы берем обычную таблицу, вызываем функцию create_hypertable()
, и она автоматом шардирует данные по времени (и по ключу, если надо). Благодаря этому запросы по временным диапазонам работают шустрее.
Предположим, есть таблица для цен акций:
CREATE TABLE stock_prices (
time TIMESTAMP WITH TIME ZONE NOT NULL,
symbol TEXT NOT NULL,
price DOUBLE PRECISION NOT NULL
);
Теперь делаем из нее гипертаблицу:
SELECT create_hypertable('stock_prices', 'time');
Теперь эта таблица стала гипертаблицей, которая хранит данные шардами по времени.
Импорт данных через Python
Представим, что есть CSV с историческими данными. Посмотрим, как это можно перенести в базу Python‑скриптом.
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
# Подключение к базе
conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass host=localhost")
conn.autocommit = True
df = pd.read_csv("historical_prices.csv")
# Ожидаем, что там колонки: time, symbol, price
# Пример: 2021-01-01T00:00:00Z, AAPL, 132.05
# Превращаем в список кортежей для быстрой вставки
records = df.to_records(index=False)
values = [tuple(record) for record in records]
with conn.cursor() as cur:
insert_query = """
INSERT INTO stock_prices (time, symbol, price)
VALUES %s
ON CONFLICT DO NOTHING;
"""
execute_values(cur, insert_query, values)
Так мы залили данные. Да, для больших объемов лучше использовать COPY
или их утилиты, но execute_values
тоже неплох, особенно для небольшой пачки данных.
time_bucket и непрерывные агрегаты
Когда у нас в базе уже лежат миллионы строк с временными рядами, возникает вопрос: «Как агрегировать и анализировать эти массивы данных?» TimescaleDB имеет мощные инструменты для этого, в частности функцию time_bucket()
и непрерывные агрегаты.
Функция time_bucket(interval, timestamp)
разбивает временную ось на равные интервалы. Например, если нужно сгруппировать цены по 10-минутным «корзинам» (бакетам) и получить среднюю цену для символа AAPL
за последние 24 часа, то простой SQL‑запрос будет выглядеть так:
SELECT
time_bucket('10 minutes', time) AS bucket,
AVG(price) AS avg_price
FROM stock_prices
WHERE symbol = 'AAPL'
AND time > NOW() - INTERVAL '1 day'
GROUP BY bucket
ORDER BY bucket;
Берем все точки данных по AAPL
за последние сутки, затем time_bucket
равномерно нарезает этот период на кусочки по 10 минут, а AVG(price)
дает среднее значение цены за каждый такой период. Результат — легко читаемая для анализа временная шкала, где каждый интервал имеет свое агрегированное значение.
Благодаря архитектуре TimescaleDB такие запросы работают очень шустро, особенно при наличии индексов по времени. Чтобы повысить производительность еще больше, стоит создавать индексы по (symbol, time)
или (time, symbol)
:
CREATE INDEX ON stock_prices(symbol, time DESC);
Теперь фильтры по символу и времени будут использовать этот индекс и работать быстрее.
Зачастую вы будете обращаться к агрегированным данным не один раз, а постоянно. Почему бы не сохранить результат заранее и обновлять его по мере поступления новых данных? Для этого подходят непрерывные агрегаты. Они позволяют создавать материализованное представление, которое TimescaleDB будет аккуратно рефрешить по мере необходимости.
Скажем, нужно непрерывно иметь под рукой среднюю 10-минутную цену для всех символов. Создадим непрерывный агрегат:
CREATE MATERIALIZED VIEW stock_prices_avg_10min
WITH (timescaledb.continuous) AS
SELECT time_bucket('10 minutes', time) AS bucket,
symbol,
AVG(price) AS avg_price
FROM stock_prices
GROUP BY bucket, symbol
WITH NO DATA;
Основные моменты тут:
WITH (timescaledb.continuous)
указывает на то, что этот материализованный вид — непрерывный агрегат, а не обычный.WITH NO DATA
значит, что мы пока не заполняем его историческими данными. Мы можем это сделать позже.
Чтобы обновить агрегат на актуальные данные (как исторические, так и вновь поступившие):
CALL refresh_continuous_aggregate('stock_prices_avg_10min', NULL, NULL);
Функция refresh_continuous_aggregate
пройдется по новым или измененным данным в исходной таблице stock_prices
и аккуратно внесёт изменения в материализованный вид. Таким образом, любые SELECT‑запросы к stock_prices_avg_10min
будут выполняться гораздо быстрее — фактически вы запрашиваете уже агрегированные и оптимизированные под чтение данные.
Прочие детали:
Можно настроить автоматическое обновление непрерывных агрегатов при помощи встроенных политик TimescaleDB или периодически дергать
CALL refresh_continuous_aggregate()
по расписанию (например, cron или встроенные background job«ы TimescaleDB).Никто не мешает использовать более мелкие или более крупные интервалы.
10 minutes
— это просто пример. Можно делатьtime_bucket('1 hour', time)
илиtime_bucket('5 seconds', time)
, в зависимости от частоты поступления данных и нужд бизнеса.Непрерывные агрегаты можно каскадировать. К примеру, сначала вы создаёте агрегат по 1-минутным интервалам, затем поверх него — агрегат по 10-минутным интервалам.
Сжатие и хранение: Дальнейшее уплотнение исторических данных (через встроенное сжатие) в сочетании с непрерывными агрегатами может драматически снизить хранение и ускорить аналитические запросы.
Переходим к аналитике с pandas и NumPy
Вот у нас в TimescaleDB полно данных. Скорее всего, хочется их как‑то хитро анализировать. В Python есть отличный инструментарий. Как вытащить данные в pandas DataFrame? Да легко:
import pandas as pd
import psycopg2
conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass host=localhost")
query = """
SELECT time, symbol, price
FROM stock_prices
WHERE symbol = 'AAPL'
AND time > NOW() - INTERVAL '7 days'
ORDER BY time;
"""
df = pd.read_sql(query, conn)
Получили DataFrame с колонками: time
, symbol
, price
. Допустим, нужно посчитать скользящее среднее цены за 10 периодов:
df.set_index('time', inplace=True)
df['rolling_mean'] = df['price'].rolling('10T').mean() # '10T' значит 10 минутное окно
А теперь представим, что нам нужны какие‑то хитрые NumPy‑приёмы, например, быстрое вычисление стандартного отклонения или корреляции между разными символами. Для этого мы можем выкачать несколько символов сразу:
symbols = ['AAPL', 'GOOG', 'MSFT']
query_multiple = f"""
SELECT time, symbol, price
FROM stock_prices
WHERE symbol = ANY('{{{",".join(symbols)}}}')
AND time > NOW() - INTERVAL '7 days'
ORDER BY time;
"""
df_multi = pd.read_sql(query_multiple, conn)
# Превращаем в pivot-таблицу, где индекс - время, колонки - символы, значения - цена
df_pivot = df_multi.pivot(index='time', columns='symbol', values='price')
Теперь у нас панорамный взгляд на несколько акций. С NumPy считаем корреляцию:
import numpy as np
corr_matrix = np.corrcoef(df_pivot.dropna().values.T) # Считаем корреляцию между рядами
print(corr_matrix)
Мы получили корреляционную матрицу между временными рядами акций. Можно подключить matplotlib и построить графики, но это уже другая история.
NumPy для более сложных вычислений
Представим, что нужно выполнить какой‑то нестандартный фильтр или преобразование сигнала, например, сгладить шумы или применить FFT. Делается это легко:
import pandas as pd
import psycopg2
conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass host=localhost")
query = """
SELECT time, symbol, price
FROM stock_prices
WHERE symbol = 'AAPL'
AND time > NOW() - INTERVAL '7 days'
ORDER BY time;
"""
df = pd.read_sql(query, conn)
Получаем сглаженный ряд. Все это можно потом запулить обратно в TimescaleDB, если вдруг нужно хранить результаты анализа для дальнейших запросов.
Запись обработанных данных обратно в TimescaleDB
Допустим, нужно сохранить результат аналитики в новую таблицу stock_prices_smoothed
. Если ее нет — создадим:
df.set_index('time', inplace=True)
df['rolling_mean'] = df['price'].rolling('10T').mean() # '10T' значит 10 минутное окно
Дальше в Python:
symbols = ['AAPL', 'GOOG', 'MSFT']
query_multiple = f"""
SELECT time, symbol, price
FROM stock_prices
WHERE symbol = ANY('{{{",".join(symbols)}}}')
AND time > NOW() - INTERVAL '7 days'
ORDER BY time;
"""
df_multi = pd.read_sql(query_multiple, conn)
# Превращаем в pivot-таблицу, где индекс - время, колонки - символы, значения - цена
df_pivot = df_multi.pivot(index='time', columns='symbol', values='price')
Вот и все, у нас есть обратная связь: мы получили данные из TimescaleDB, обработали их по‑своему и вернули обратно уже в сглаженном виде.
Итак, мы разобрали основы работы с TimescaleDB: создание гипертаблиц, использование time_bucket
, настройку непрерывных агрегатов и интеграцию с Python через pandas и NumPy. Это идеальный инструмент для обработки временных рядов, который сохраняет привычный SQL‑стек и легко интегрируется с современными аналитическими библиотеками. Все примеры были лишь началом, а возможности TimescaleDB гораздо шире.
Если вы хотите углубиться, рекомендую заглянуть в официальную документацию TimescaleDB. А если у вас есть свои находки, интересные кейсы или идеи, пишите в комментариях.
Все актуальные методы и инструменты DS и ML можно освоить на онлайн-курсах OTUS: список всех программ можно посмотреть в каталоге.
Также рекомендую посетить открытые уроки по машинному обучению:
23 декабря: «Машинное обучение на службе Data Science». Подробнее
25 декабря: «Технологии NLP сегодня: основные тренды в области и перспективы развития». Подробнее