Redis на практических примерах
Redis — достаточно популярный инструмент, который из коробки поддерживает большое количество различных типов данных и методов работы с ними. Во многих проектах он используется в качестве кэшируещего слоя, но его возможности намного шире. Мы в ManyChat очень любим Redis и активно используем его в нашем продукте для решения огромного количества задач. Про некоторые интересные кейсы использования этой in-memory key-value базы данных я расскажу на примерах. Надеюсь, вам они будут полезны, и вы сможете применить что-то в своих проектах.
Рассмотрим следующие кейсы:
- Кэширование данных (да, банально и скучно, но это классный инструмент для кэширования и обойти стороной этот кейс, кажется будет не правильно)
- Работа с очередями на базе redis
- Организация блокировок (mutex)
- Делаем систему rate-limit
- Pubsub — делаем рассылки сообщений на клиенты
Буду работать с сырыми redis командами, чтобы не завязываться на какую-либо конкретную библиотеку, предоставляющую обертку над этими командами. Код буду писать на PHP с использованием ext-redis, но он здесь для наглядности, использовать представленные подходы можно в связке с любым другим языком программирования.
Кэширование данных
Давайте начнем с самого простого, один из самых популярных кейсов использования Redis — кэширование данных. Будет полезно для тех, кто не работал с Redis. Для тех, кто уже давно пользуется этим инструментом — можно смело переходить к следующему кейсу. Для того, чтобы снизить нагрузку на БД, иметь возможность запрашивать часто используемые данные максимально быстро, используется кэш. Redis — это in-memory хранилище, то есть данные хранятся в оперативной памяти. Ещё это key-value хранилище, где доступ к данным по их ключу имеет сложность O (1) — поэтому данные мы получаем очень быстро.
Получение данных из хранилища выглядит следующим образом:
public function getValueFromCache(string $key)
{
return $this->getRedis()->rawCommand('GET', $key);
}
Но для того, чтобы данные из кэша получить, их нужно сначала туда положить. Простой пример записи:
public function setValueToCache(string $key, $value)
{
$this->getRedis()->rawCommand('SET', $key, $value);
}
Таким образом, мы запишем данные в Redis и сможем их считать по тому же самому ключу в любой нужный нам момент. Но если мы будем все время писать в Redis, данные в нем будут занимать все больше и больше места в оперативной памяти. Нам нужно удалять нерелевантные данные, контролировать это вручную достаточно проблематично, поэтому пускай redis занимается этим самостоятельно. Добавим к нашему ключу TTL (время жизни ключа):
public function setValueToCache(string $key, $value, int $ttl = 3600)
{
$this->getRedis()->rawCommand('SET', $key, $value, 'EX', $ttl);
}
По истечении времени ttl (в секундах) данные по этому ключу будут автоматически удалены.
Как говорят, в программировании существует две самых сложных вещи: придумывание названий переменных и инвалидация кэша. Для того, чтобы принудительно удалить значение из Redis по ключу, достаточно выполнить следующую команду:
public function dropValueFromCache(string $key)
{
$this->getRedis()->rawCommand('DEL', $key);
}
Также редис позволяет получить массив значений по списку ключей:
public function getValuesFromCache(array $keys)
{
return $this->getRedis()->rawCommand('MGET', ...$keys);
}
И соответственно массовое удаление данных по массиву ключей:
public function dropValuesFromCache(array $keys)
{
$this->getRedis()->rawCommand('MDEL', ...$keys);
}
Очереди
Используя имеющиеся в Redis структуры данных, мы можем запросто реализовать стандартные очереди FIFO или LIFO. Для этого используем структуру List и методы по работе с ней. Работа с очередями состоит из двух основных действий: отправить задачу в очередь, и взять задачу из очереди. Отправлять задачи в очередь мы можем из любой части системы. Получением задачи из очереди и ее обработкой обычно занимается выделенный процесс, который называется консьюмером (consumer).
Итак, для того, чтобы отправить нашу задачу в очередь, нам достаточно использовать следующий метод:
public function pushToQueue(string $queueName, $payload)
{
$this->getRedis()->rawCommand('RPUSH', $queueName, serialize($payload));
}
Тем самым мы добавим в конец листа с названием $queueName некий $payload, который может представлять из себя JSON для инициализации нужной нам бизнес логики (например данные по денежной транзакции, данные для инициализации отправки письма пользователю, etc.). Если же в нашем хранилище не существует листа с именем $queueName, он будет автоматически создан, и туда попадет первый элемент $payload.
Со стороны консьюмера нам необходимо обеспечить получение задач из очереди, это реализуется простой командой чтения из листа. Для реализации FIFO очереди мы используем чтение с обратной записи стороны (в нашем случае мы писали через RPUSH), то есть читать будем через LPOP:
public function popFromQueue(string $queueName)
{
return $this->getRedis()->rawCommand('LPOP', $queueName);
}
Для реализации LIFO очереди, нам нужно будет читать лист с той же стороны, с которой мы в него пишем, то есть через RPOP.
Тем самым мы вычитываем по одному сообщению из очереди. В случае если листа не существует (он пустой), то мы получим NULL. Каркас консьюмера мог бы выглядеть так:
class Consumer {
private string $queueName;
public function __construct(string $queueName)
{
$this->queueName = $queueName;
}
public function run()
{
while (true) { //Вычитываем в бесконечном цикле нашу очередь
$payload = $this->popFromQueue();
if ($payload === null) { //Если мы получили NULL, значит очередь пустая, сделаем небольшую паузу в ожидании новых сообщений
sleep(1);
continue;
}
//Если очередь не пустая и мы получили $payload, то запускаем обработку этого $payload
$this->process($payload);
}
}
private function popFromQueue()
{
return $this->getRedis()->rawCommand('LPOP', $this->queueName);
}
}
Для того, чтобы получить информацию о глубине очереди (сколько значений хранится в нашем листе), можем воспользоваться следующей командой:
public function getQueueLength(string $queueName)
{
return $this->getRedis()->rawCommand('LLEN', $queueName);
}
Мы рассмотрели базовую реализацию простых очередей, но Redis позволяет строить более сложные очереди. Например, мы хотим знать о времени последней активности наших пользователей на сайте. Нам не важно знать это с точностью вплоть до секунды, приемлемая погрешность — 3 минуты. Мы можем обновлять поле last_visit пользователя при каждом запросе на наш бэкенд от этого пользователя. Но если этих пользователей большое количество в онлайне — 10,000 или 100,000? А если у нас еще и SPA, которое отправляет много асинхронных запросов? Если на каждый такой запрос обновлять поле в бд, мы получим большое количество тупых запросов к нашей БД. Эту задачу можно решать разными способами, один из вариантов — это сделать некую отложенную очередь, в рамках которой мы будем схлопывать одинаковые задачи в одну в определенном промежутке времени. Здесь на помощь нам придет такая структура, как Sorted SET. Это взвешенное множество, каждый элемент которого имеет свой вес (score). А что если в качестве score мы будем использовать timestamp добавления элемента в этот sorted set? Тогда мы сможем организовать очередь, в которой можно будет откладывать некоторые события на определенное время. Для этого используем следующую функцию:
public function pushToDelayedQueue(string $queueName, $payload, int $delay = 180)
{
$this->getRedis()->rawCommand('ZADD', $queueName, 'NX', time() + $delay, serialize($payload))
}
В такой схеме идентификатор пользователя, зашедшего на сайт, попадет в очередь $queueName и будет висеть там в течение 180 секунд. Все другие запросы в рамках этого времени будут также отправляться в эту очередь, но они не будут туда добавлены, так как идентификатор этого пользователя уже существует в этой очереди и продублирован он не будет (за это отвечает параметр 'NX'). Так мы отсекаем всю лишнюю нагрузку и каждый пользователь будет генерить не более одного запроса в 3 минуты на обновление поля last_visit.
Теперь возникает вопрос о том, как читать эту очередь. Если методы LPOP и RPOP для листа читают значение и удаляют его из листа атомарно (это значит, что одно и тоже значение не может быть взято несколькими консьюмерами), то sorted set такого метода из коробки не имеет. Мы можем сделать чтение и удаление элемента только двумя последовательными командами. Но мы можем выполнить эти команды атомарно, используя простой LUA скрипт!
public function popFromDelayedQueue(string $queueName)
{
$command = 'eval "
local val = redis.call(\'ZRANGEBYSCORE\', KEYS[1], 0, ARGV[1], \'LIMIT\', 0, 1)[1]
if val then
redis.call(\'ZREM\', KEYS[1], val)
end
return val"
';
return $this->getRedis()->rawCommand($command, 1, $queueName, time());
}
В этом LUA скрипте мы пытаемся получить первое значение с весом в диапазоне от 0 до текущего timestamp в переменную val с помощью команды ZRANGEBYSCORE, если нам удалось получить это значение, то удаляем его из sorted set командой ZREM и возвращаем само значение val. Все эти операции выполняются атомарно. Таким образом мы можем вычитывать нашу очередь в консьюмере, аналогично с примером очереди построенной на структуре LIST.
Я рассказал про несколько базовых паттернов очередей, реализованных в нашей системе. На текущий момент у нас в продакшене существуют более сложные механизмы построения очередей — линейных, составных, шардированных. При этом Redis позволяет все это делать при помощи смекалки и готовых круто работающих структур из коробки, без сложного программирования.
Блокировки (Mutex)
Mutex (блокировка) — это механизм синхронизации доступа к shared ресурсу нескольких процессов, тем самым гарантируя, что только один процесс будет взаимодействовать с этим ресурсом в единицу времени. Этот механизм часто применяется в биллинге и других системах, где важно соблюдать потоковую безопасность (thread safety).
Для реализации mutex на базе Redis прекрасно подойдет стандартный метод SET с дополнительными параметрами:
public function lock(string $key, string $hash, int $ttl = 10): bool
{
return (bool)$this->getRedis()->rawCommand('SET', $key, $hash, 'NX', 'EX', $ttl);
}
где параметрами для установки mutex являются:
- $key — ключ идентифицирующий mutex;
- $hash — генерируем некую подпись, которая идентифицирует того, кто поставил mutex. Мы же не хотим, чтобы кто-то в другом месте случайно снял блокировку и вся наша логика рассыпалась.
- $ttl — время в секундах, которое мы отводим на блокировку (на тот случай, если что-то пойдет не так, например процесс, поставивший блокировку, по какой-то причине умер и не снял ее, чтобы это блокировка не висела бесконечно).
Основное отличие от метода SET, используемого в механизме кэширования — это параметр NX, который говорит Redis о том, что значение, которое уже хранится в Redis по ключу $key, не будет записано повторно. В результате, если в Redis нет значения по ключу $key, туда произведется запись и в ответе мы получим 'OK', если значение по ключу уже есть в Redis, оно не будет туда добавлено (обновлено) и в ответе мы получим NULL. Результат метода lock (): bool, где true
— блокировка поставлена, false
— уже есть активная блокировка, создать новую невозможно.
Чаще всего, когда мы пишем код, который пытается работать с shared ресурсом, который заблокирован, мы хотим дождаться его разблокировки и продолжить работу с этим ресурсом. Для этого можем реализовать простой метод для ожидания освободившегося ресурса:
public function tryLock(string $key, string $hash, int $timeout, int $ttl = 10): bool
{
$startTime = microtime(true);
while (!this->lock($key, $hash, $ttl)) {
if ((microtime(true) - $startTime) > $timeout) {
return false; // не удалось взять shared ресурс под блокировку за указанный $timeout
}
usleep(500 * 1000) //ждем 500 миллисекунд до следующей попытки поставить блокировку
}
return true; //блокировка успешно поставлена
}
Мы разобрались как ставить блокировку, теперь нам нужно научиться ее снимать. Для того, чтобы гарантировать снятие блокировки тем процессом, который ее установил, нам понадобится перед удалением значения из хранилища Redis, сверить хранимый хэш по этому ключу. Для того, чтобы сделать это атомарно, воспользуемся LUA скриптом:
public function releaseLock(string $key, string $hash): bool
{
$command = 'eval "
if redis.call("GET",KEYS[1])==ARGV[1] then
return redis.call("DEL",KEYS[1])
else
return 0
end"
';
return (bool) $this->getRedis()->rawCommand($command, 1, $key, $hash);
}
Здесь мы пытаемся найти с помощью команды GET значение по ключу $key, если оно равно значению $hash, то удаляем его при помощи команды DEL, которая вернет нам количество удаленных ключей, если же значения по ключу $key не существует, или оно не равно значению $hash, то мы возвращаем 0, что значит блокировку снять не удалось. Базовый пример использования mutex:
class Billing {
public function charge(int $userId, int $amount)
{
$mutexName = sprintf('billing_%d', $userId);
$hash = sha1(sprintf('billing_%d_%d'), $userId, mt_rand()); //генерим некий хэш запущенного потока
if (!$this->tryLock($mutexName, $hash, 10)) { //пытаемся поставить блокировку в течение 10 секунд
throw new Exception('Не получилось поставить lock, shared ресурс занят');
}
//lock получен, процессим бизнес-логику
$this->doSomeLogick();
//освобождаем shared ресурс, снимаем блокировку
$this->releaseLock($mutexName, $hash);
}
}
Rate limiter
Достаточно частая задача, когда мы хотим ограничить количество запросов к нашему апи. Например на один API endpoint от одного аккаунта мы хотим принимать не более 100 запросов в минуту. Эта задача легко решается с помощью нашего любимого Redis:
public function isLimitReached(string $method, int $userId, int $limit): bool
{
$currentTime = time();
$timeWindow = $currentTime - ($currentTime % 60); //Так как наш rate limit имеет ограничение 100 запросов в минуту,
//то округляем текущий timestamp до начала минуты — это будет частью нашего ключа, //по которому мы будем считать количество запросов
$key = sprintf('api_%s_%d_%d', $method, $userId, $timeWindow); //генерируем ключ для счетчика, соответственно каждую минуту он будет меняться исходя из $timeWindow
$count = $this->getRedis()->rawCommand('INCR', $key); //метод INCR увеличивает значение по указанному ключу, и возвращает новое значение.
//Если ключа не существует, он будут инициализирован со значением 0 и после этого увеличен
if ($count > $limit) { //limit достигнут
return true;
}
return false;
}
Таким простым методом мы можем лимитировать количество запросов к нашему API, базовый каркас нашего контроллера мог бы выглядеть следующим образом:
class FooController {
public function actionBar()
{
if ($this->isLimitReached(__METHOD__, $this->getUserId(), 100)) {
throw new Exception('API method max limit reached');
}
$this->doSomeLogick();
}
}
Pub/sub
Pub/sub — интересный механизм, который позволяет, с одной стороны, подписаться на канал и получать сообщения из него, с другой стороны — отправлять в этот канал сообщение, которое будет получено всеми подписчиками. Наверное у многих, кто работал с вебсокетами, возникла аналогия с этим механизмом, они действительно очень похожи. Механизм pub/sub не гарантирует доставки сообщений, он не гарантирует консистентности, поэтому не стоит его использовать в системах, для которых важны эти критерии. Однако рассмотрим этот механизм на практическом примере. Предположим, что у нас есть большое количество демонизированных команд, которыми мы хотим централизованно управлять. При инициализации нашей команды мы подписываемся на канал, через который будем получать сообщения с инструкциями. С другой стороны у нас есть управляющий скрипт, который отправляет сообщения с инструкциям в указанный канал. К сожалению, стандартный PHP работает в одном блокирующем потоке; для того, чтобы реализовать задуманное, используем ReactPHP и реализованный под него клиент Redis.
Подписка на канал:
class FooDaemon {
private $throttleParam = 10;
public function run()
{
$loop = React\EventLoop\Factory::create(); //инициализируем event-loop ReactPHP
$redisClient = $this->getRedis($loop); //инициализируем клиента Redis для ReactPHP
$redisClient->subscribe(__CLASS__); // подписываемся на нужный нам канал в Redis, в нашем примере название канала соответствует названию класса
$redisClient->on('message', static function($channel, $payload) { //слушаем события message, при возникновении такого события, получаем channel и payload
switch (true) { // Здесь может быть любая логика обработки сообщений, в качестве примера пускай будет так:
case \is_int($payload): //Если к нам пришло число – обновим параметр $throttleParam на полученное значение
$this->throttleParam = $payload;
break;
case $payload === 'exit': //Если к нам пришла команда 'exit' – завершим выполнение скрипта
exit;
default: //Если пришло что-то другое, то просто залогируем это
$this->log($payload);
break;
}
});
$loop->addPeriodicTimer(0, function() {
$this->doSomeLogick(); // Здесь в бесконечном цикле может выполняться какая-то логика, например чтение задач из очереди и их процессинг
});
$loop->run(); //Запускаем наш event-loop
}
}
Отправка сообщения в канал — более простое действие, мы можем сделать это абсолютно из любого места системы одной командой:
public function publishMessage($channel, $message)
{
$this->getRedis()->publish($channel, $message);
}
В результате такой отправки сообщения в канал, все клиенты, которые подписаны на данный канал, получат это сообщение.
Итог
Мы рассмотрели 5 примеров использования Redis на практике, надеюсь что каждый найдет для себя что-то интересное. В нашем стэке технологий Redis занимает важное место, мы любим этот инструмент за его скорость и гибкость. Мы используем Redis в продакшене уже много лет, и он зарекомендовал себя как очень крутой и надежный инструмент, который лежит в основе многих частей нашего продукта. Наш небольшой кластер Redis серверов обрабатывает около 1 миллиона запросов в секунду. А как вы используете Redis в своем проекте? Делитесь опытом в комментариях!