Автореконнекты к RabbitMQ в Go

92acf18aa6914935f85d9227984099cc.png

Брокер сообщений RabbitMQ уже давно активно используется в микросервисах. Он используется, когда требуется асинхронная обработка сообщений от клиентов или при аналогичном межсервисном взаимодействии.

Практически нет языка, под который не была бы сделана соответствующая клиентская библиотека. Для Go такой библиотекой является github.com/streadway/amqp (далее по тексту библиотека amqp). Она имеет широкий функционал, можно подключаться к RabbitMQ, создавать каналы, настраивать очереди и exchange. Не хватает только самой малости — реконектов. А именно автоматических реконнектов при потери связи.

Поиск в Google показывает, что есть много различных решений. На проекте, где я работаю, мы создали ещё парочку. Но не найденные в сети, не уже созданные не устраивали по ряду причин:

  • раздельное обслуживание консумера и продюсера — под каждого свой коннект, а документация на RabbitMQ настойчиво не рекомендует плодить подключения и вместо этого использовать каналы (каналы в amqp это легковестные соединения поверх TCP-подключения) поверх одного подключения;

  • сложные конструкции пула каналов, а то и вовсе их отсутствие — с точки зрения потокобезопасности, как минимум для консьюмера и продюсера, нужно разделять каналы;

  • отсутствие поддержки backoffPolicy;

  • отсутствие graceful shutdown.

Сформулируем требования к желаемому решению:

  • возможность создать общее подключение для консумера и продюсера;

  • простой и прозрачный пул каналов;

  • поддержка backoffPolicy;

  • автоматический реконнект при потере соединения;

  • поддержка graceful shutdown.

Требования появились, можно приступать к реализаци. Сразу оговорюсь, статья ориентирована на тех, кто уже хорошо знаком с библиотекой amqp и не ставит своей целью перевести документацию.

«Новый велосипед с треугольными колёсами»

Первый пункт из озвученных потребностей самый простой, просто создаём одно подключение и используем его для всех последующих манипуляций.

С пулом каналов тоже было решено пойти по простому пути и создать map с ключом в виде следующего объекта:

type ChannelPoolItemKey struct {
    Queue    string
    Consumer string
    Exchange string
    Key      string
}

Такой ключ можно использовать сразу и для консумеров и паблишеров. Как было сказано выше, каналы между ними не должны пересекаться для повышения потокобезопасности.

Реализовать backoffPolicy тоже не сложно:

for _, timeout := range c.backoffPolicy {
  if connErr := c.connect(ctx); connErr != nil {
    logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
    time.Sleep(timeout)
    continue
  }
  break
}

где backoffPolicy это массив типа time.Duration.

Остаётся самое интересное, реконнект и graceful shutdown. Здесь нам поможет пакет golang.org/x/sync/errgroup. Он специально предназначен для управления группами рутин.

При подключении создаётся TCP-подключение и служебный канал. Последний нужен для создания exchange, очередей и биндинга очереди к exchange. Больше он ни для чего не используется, но такая логика упрощает построение пула каналов. Например, при создании exchange не известен ключ роутинга, а при декларировании очереди не известно, с каким exchange она будет связана.

Публичный метод Connect будет по совместительству контролировать подключение. А приватный метод connect будет создавать само подключение и пул каналов. Ниже приведён код подключения.

func (c *Connection) connect(_ context.Context) error {
	var err error
	if c.conn, err = amqp.Dial(c.dsn); err != nil {
		return errors.Wrap(err, "connect to rabbitMQ")
	}

	if c.serviceChannel, err = c.conn.Channel(); err != nil {
		return errors.Wrap(err, "create service rabbitMQ channel")
	}

	c.channelPool = make(map[ChannelPoolItemKey]*amqp.Channel)

	return nil
}

// Connect auto reconnect to rabbitmq when we lost connection.
func (c *Connection) Connect(ctx context.Context, errorGroup *errgroup.Group) error {
	if !c.isClosed {
		if err := c.connect(ctx); err != nil {
			return errors.Wrap(err, "connect")
		}
	}

	c.errorGroup = errorGroup
	c.chanCtx = ctx

	c.errorGroup.Go(func() error {
		logger := zerolog.Ctx(ctx)
		logger.Info().Msg("starting connection watcher")

		for {
			select {
			case <-ctx.Done():
				logger.Info().Msg("connection watcher stopped")
				return ctx.Err()
			default:
				reason, ok := <-c.conn.NotifyClose(make(chan *amqp.Error))
				if !ok {
					if c.isClosed {
						return nil
					}
					logger.Err(reason).Msg("rabbitMQ connection unexpected closed")

					c.mu.Lock()
					for _, timeout := range c.backoffPolicy {
						if connErr := c.connect(ctx); connErr != nil {
							logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
							time.Sleep(timeout)
							continue
						}
						break
					}
					c.mu.Unlock()
				}
			}
		}
	})

	return nil
}

Как видно, основная идея была связана с мьютексом mu, который будет блокировать возможность получить доступ к оригинальному коннекту (из библиотеки amqp). Т.е. если происходит какая-то ошибка, косьюмер и продюсер должны попытаться переподключиться, они наткнуться на блокировку и будут ждать восстановления подключения. Как только блокировка будет снята, они смогут заново полностью инициализироваться.

Не забываем, что на стороне сервера может закрыться не только подключение, но и каналы. Для этого по аналогии с подключением используется метод NotifyClose, который регистрирует слушателя для событий о закрытии канала или подключения. Если канал закрывается, то он удаляется из пула и соотвественно ошибка, которая долетит до продюсера/консьюмера вызовет повторное создание канала.

func (c *Connection) GetChannelFromPool(exchange, key, queue, consumer string) (*amqp.Channel, error) {
	c.channelPoolMu.Lock()
	defer c.channelPoolMu.Unlock()
	var err error
	poolKey := ChannelPoolItemKey{
		Exchange: exchange,
		Key:      key,
		Queue:    queue,
		Consumer: consumer,
	}
	ch, ok := c.channelPool[poolKey]
	if !ok {
		ch, err = c.conn.Channel()
		if err != nil {
			return nil, errors.Wrap(err, "create channel")
		}
		c.channelPool[poolKey] = ch
		c.chanWatcher(poolKey)
	}

	return ch, nil
}

func (c *Connection) chanWatcher(poolKey ChannelPoolItemKey) {
	ch := c.channelPool[poolKey]

	c.errorGroup.Go(func() error {
		logger := zerolog.Ctx(c.chanCtx)
		logger.Info().Msg("starting channel watcher")

		for {
			select {
			case <-c.chanCtx.Done():
				logger.Info().Msg("channel watcher stopped")
				return c.chanCtx.Err()
			default:
				reason, ok := <-ch.NotifyClose(make(chan *amqp.Error))
				if !ok {
					if c.isClosed {
						return nil
					}
					logger.Err(reason).Msg("rabbitMQ channel unexpected closed")
					c.channelPoolMu.Lock()
					delete(c.channelPool, poolKey)
					c.channelPoolMu.Unlock()
					return nil
				}
			}
		}
	})
}

После создания подключения переходим к его закрытию и отображению состояния:

func (c *Connection) Close(_ context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.isClosed = true

	for _, ch := range c.channelPool {
		if err := ch.Close(); err != nil {
			return errors.Wrap(err, "close rabbitMQ channel")
		}
	}

	if err := c.conn.Close(); err != nil {
		return errors.Wrap(err, "close rabbitMQ connection")
	}

	return nil
}

func (c *Connection) IsClosed() bool {
	return c.isClosed
}

Сам Connection, который реализует всё вышеописанное, представлен ниже.

type Connection struct {
	dsn            string
	backoffPolicy  []time.Duration
	conn           *amqp.Connection
	serviceChannel *amqp.Channel
	mu             sync.RWMutex
	channelPool    map[ChannelPoolItemKey]*amqp.Channel
	channelPoolMu  sync.RWMutex
	isClosed       bool
	errorGroup     *errgroup.Group
	chanCtx        context.Context
}

Конечно не хорошо передавать контекст в структуру. Но это было сделано сознательно, чтобы обёртки над стандартными методами библиотеки amqp были взаимозаменяемы с ними.

Ниже код обёрток над стандартными методами библиотеки amqp:

func (c *Connection) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.serviceChannel.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}

func (c *Connection) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.serviceChannel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}

func (c *Connection) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.serviceChannel.QueueBind(name, key, exchange, noWait, args)
}

func (c *Connection) Consume(
	queue, consumer string,
	autoAck, exclusive, noLocal, noWait bool,
	args amqp.Table) (<-chan amqp.Delivery, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	ch, err := c.GetChannelFromPool("", "", queue, consumer)
	if err != nil {
		return nil, errors.Wrap(err, "get channel from pool")
	}

	return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}

// nolint:gocritic // pass msg without pointer as in original func in amqp
func (c *Connection) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	ch, err := c.GetChannelFromPool(exchange, key, "", "")
	if err != nil {
		return errors.Wrap(err, "get channel from pool")
	}

	return ch.Publish(exchange, key, mandatory, immediate, msg)
}

Consumer

В конструктор консьюмера передаётся созданное подключение, а далее запускается подписка на события из очереди. Подписка запускается в отдельной рутине, если происходит ошибка, текущая рутина закрывается и создаётся новая.

func (c *Consumer) subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
	logger := zerolog.Ctx(ctx)
	var msg <-chan amqp.Delivery
	var err error

	for {
		if msg, err = c.connect(ctx); err != nil {
			logger.Err(err).Msg("connect consumer to rabbitMQ")
			time.Sleep(10 * time.Second)
			continue
		}
		break
	}

	logger.Info().Msg("consumer connected")

	for {
		select {
		case <-ctx.Done():
			logger.Info().Msg("connection watcher stopped")
			if err := subscriber.Shutdown(ctx); err != nil {
				logger.Err(err).Msg("shutdown handler")
			}
			return ctx.Err()
		case d, ok := <-msg:
			if ok {
				logger.Debug().Msgf("got new event %+v", string(d.Body))
				if errConsume := subscriber.Consume(ctx, d.Body); errConsume != nil {
					logger.Err(errConsume).Msg("consume message")
				}
				if err := d.Ack(true); err != nil {
					logger.Err(err).Msg("ack")
				}
			} else {
				if c.conn.IsClosed() {
					return nil
				}

				logger.Info().Msg("try to reconnect consumer")
				errorGroup.Go(func() error {
					return c.subscribe(ctx, errorGroup, subscriber)
				})
				return nil
			}
		}
	}
}

// Subscribe to channel for receiving message
func (c *Consumer) Subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
	errorGroup.Go(func() error {
		return c.subscribe(ctx, errorGroup, subscriber)
	})

	return nil
}

Обработка полученного сообщения выполняется в методе Consume переданного в консьюмер подписчика, реализующего интерфейс Subscriber.

type Subscriber interface {
	Consume(ctx context.Context, data []byte) error
	Shutdown(ctx context.Context) error
}

В этом интерфейсе также есть метод Shutdown для действий при штатной остановки консьюмера.

В приватном методе connect выполняется создание exchange, очереди, биндиг очереди к exchange и создание канала на прослушивание событий.

func (c *Consumer) connect(_ context.Context) (<-chan amqp.Delivery, error) {
	if err := c.conn.ExchangeDeclare(c.config.ExchangeName, "direct", true,
		false, false,
		false, nil); err != nil {
		return nil, errors.Wrap(err, "declare a exchange")
	}

	if _, err := c.conn.QueueDeclare(
		c.config.RabbitQueue, // name
		true,                 // durable
		false,                // delete when unused
		false,                // exclusive
		false,                // no-wait
		nil,                  // arguments
	); err != nil {
		return nil, errors.Wrap(err, "declare a queue")
	}

	if err := c.conn.QueueBind(
		c.config.RabbitQueue,  // queue name
		c.config.RoutingKey,   // routing key
		c.config.ExchangeName, // exchange
		false,
		nil,
	); err != nil {
		return nil, errors.Wrap(err, "bind to queue")
	}

	msg, err := c.conn.Consume(
		c.config.RabbitQueue,   // queue
		c.config.RabbitConsume, // consume
		false,                  // auto-ack
		false,                  // exclusive
		false,                  // no-local
		false,                  // no-wait
		nil,                    // args
	)
	if err != nil {
		return nil, errors.Wrap(err, "consume message")
	}

	return msg, nil
}

Publisher

Также как и при создании консьюмера в конструктор паблишера передаётся созданное подключение. При первой попытке опубликовать создаётся exchange для публикаций. Если при публикации возникает ошибка, то пытаемся ещё раз. Если вторая попытка не удалась, то тогда возвращаем ошибку вызвавшему методу.

func (p *Publisher) connect(_ context.Context) error {
	p.muConn.Lock()
	defer p.muConn.Unlock()
	if p.isConnected {
		return nil
	}

	if err := p.conn.ExchangeDeclare(p.config.ExchangeName, "direct", true,
		false, false,
		false, nil); err != nil {
		return errors.Wrap(err, "declare a exchange")
	}

	p.isConnected = true

	return nil
}

// SendMessage publish message to exchange
func (p *Publisher) SendMessage(ctx context.Context, message interface{}) error {
	logger := zerolog.Ctx(ctx)

	body, err := json.Marshal(message)
	if err != nil {
		return errors.Wrap(err, "marshal message")
	}

	ampqMsg := buildMessage(body)

	logger.Debug().Msgf("send message: %s", string(body))

	if !p.isConnected {
		if err := p.connect(ctx); err != nil {
			logger.Err(err).Msg("connect publisher to rabbitMQ")
		}
	}

	// We try to send message twice. Between attempts we try to reconnect.
	if err := p.sendMessage(ctx, ampqMsg); err != nil {
		if errRetryPub := p.sendMessage(ctx, ampqMsg); err != nil {
			if errBadMsg := p.badMessages(ctx); errBadMsg != nil {
				return errors.Wrap(errBadMsg, "count bad messages")
			}
			return errors.Wrap(errRetryPub, "retry publish a message")
		}
	}

	if err := p.okMessages(ctx); err != nil {
		return errors.Wrap(err, "count ok messages")
	}

	return nil
}

func (p *Publisher) sendMessage(ctx context.Context, ampqMsg *amqp.Publishing) error {
	logger := zerolog.Ctx(ctx)
	if !p.isConnected {
		if err := p.connect(ctx); err != nil {
			logger.Err(err).Msg("connect publisher to rabbitMQ")
		}
	}

	if err := p.conn.Publish(
		p.config.ExchangeName,
		p.config.RoutingKey,
		false,
		false,
		*ampqMsg,
	); err != nil {
		p.muConn.Lock()
		p.isConnected = false
		p.muConn.Unlock()
		return errors.Wrap(err, "publish a message")
	}
	return nil
}

Методы badMessages и okMessages используеются для подсчёта статистки успеха отправки сообщений. buildMessage небольшой хелпер для подготовки сообщения для отправки.

Заключение

Написанный код ещё плохо покрыт тестами. Планируется использовать тесты с использованием докера, чтобы проверять функционал на реальном RabbitMQ. Но есть тестовый микросервис, который использует данный функционал. При его запуске можно отправить в очередь косьюмера событие, которое будет обработано сервисом и приведёт к отправке сообщения паблишером. При перезапуске RabbitMQ микросервис автоматически переподключается. Остановка тестового микросервиса также выполняется штатно.

Ссылки

Исходники go-garage/providers/rabbitmq at main · soldatov-s/go-garage (github.com)

Пример микросервиса soldatov-s/go-garage-example (github.com)

© Habrahabr.ru