Создание кастомного балансировщика нагрузки на Go для gRPC с приоритизацией адресов

В процессе разработки микросервисных приложений часто необходимо наладить эффективную и быструю коммуникацию между сервисами. Разработанный Google gRPC предоставляет высокопроизводительный фреймворк для организации такого взаимодействия. Однако стандартные балансировщики нагрузки в gRPC не всегда удовлетворяют специфическим требованиям, особенно когда требуется приоритизация адресов для минимизации сетевых задержек и обеспечения отказоустойчивости.

В этой статье я поделюсь опытом создания кастомного балансировщика нагрузки на Go для gRPC, который использует приоритеты адресов для выбора наилучшего соединения. Это решение позволяет гибко управлять распределением клиентских запросов между серверами с разными уровнями доступности и обеспечивает подключение к оптимальному ЦОД с минимальными задержками.

Постановка задачи

При разработке одного из проектов VK Tech мне потребовалось реализовать балансировщик, который выбирает первый доступный адрес из приоритетного списка. Приоритеты адресов определяются порядком в конфигурационном файле: чем выше адрес в списке, тем выше его приоритет. В случае недоступности адреса с наивысшим приоритетом балансировщик должен автоматически переключаться на следующий доступный адрес по приоритету.

Требования к балансировщику:

  • Приоритизация адресов: выбор адреса с наивысшим приоритетом из списка.

  • Отказоустойчивость: автоматическое переключение на следующий адрес при недоступности текущего.

  • Минимизация задержек: подключение к ближайшему или наиболее оптимальному ЦОД.

Почему стандартные балансировщики не подходят

Стандартные балансировщики в gRPC, такие как round-robin (циклический) и pick-first («первый доступный»), не учитывают приоритизацию адресов в списке.

Round-robin равномерно распределяет запросы между всеми доступными серверами, что может привести к увеличению сетевых задержек, если некоторые серверы географически удалены или менее производительны.

Pick-first всегда выбирает первый доступный адрес, но не переключается на адреса с более высоким приоритетом, если они становятся доступными после первоначального подключения.

Таким образом, для решения задачи минимизации задержек и обеспечения гибкости подключения к различным ЦОДам стандартные балансировщики не подходят.

Основная идея кастомного балансировщика

Наш кастомный балансировщик использует приоритизацию адресов, заданную в конфигурационном файле, для выбора наилучшего соединения.

  • Порядок адресов: адреса упорядочены по приоритету; индекс 0 — наивысший приоритет.

  • Выбор соединения: всегда выбирается первое доступное соединение с наивысшим приоритетом.

  • Автоматическое переключение: при недоступности текущего соединения балансировщик переключается на следующий по приоритету.

Преимущества такого подхода:

  • Минимизация сетевых задержек.

  • Повышенная отказоустойчивость.

  • Гибкость настройки.

Обзор архитектуры решения

Перед тем как перейти к реализации, рассмотрим основные компоненты нашего балансировщика и их взаимодействие.

BalancerBuilder

Балансировщик в gRPC создаётся с помощью билдера. Наш BalancerBuilder регистрирует балансировщик с определённым именем и схемой, чтобы gRPC-клиент мог его использовать.

type BalancerBuilder struct{}

 func (b BalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
 	return &Balancer{
     	cc:   	cc,
     	subConns: resolver.NewAddressMap(),
     	scStates: make(map[balancer.SubConn]connectivity.State),
     	csEvltr:  &balancer.ConnectivityStateEvaluator{},
     	state:	connectivity.Connecting,
 	}
 }

 func (b BalancerBuilder) Name() string { return balancerName }

 func init() {
 	balancer.Register(&BalancerBuilder{})
 }

Основные задачи билдера:

  • Создание и инициализация балансировщика.

  • Настройка взаимодействия с ClientConn.

  • Регистрация балансировщика для использования клиентом.

Resolver

Резолвер предоставляет балансировщику список адресов с их приоритетами. Он преобразует адреса из конфигурационного файла в resolver.Address, присваивая каждому адресу атрибут index, соответствующий его приоритету.

type resolverBuilder struct {
 	addresses []resolver.Address
 }

 func (b *resolverBuilder) Build(
 	target resolver.Target,
 	clientConn resolver.ClientConn,
 	_ resolver.BuildOptions,
 ) (resolver.Resolver, error) {
 	ctx, cancel := context.WithCancel(context.Background())

 	res := &fiResolver{
     	ctx:        	ctx,
     	cancel:     	cancel,
     	target:     	target,
     	cc:         	clientConn,
     	addressesStore: b.addresses,
 	}

 	if len(b.addresses) > 1 {
     	res.serviceConfig = clientConn.ParseServiceConfig(defaultConfig)
 	}

 	go res.start()

 	return res, nil
 }

 func (*resolverBuilder) Scheme() string {
 	return scheme
 }

 func initResolver(addresses []string) {
 	addressesStore := make([]resolver.Address, len(addresses))
 	for i, addr := range addresses {
     	addressesStore[i] = resolver.Address{
         	Addr:   	addr,
         	Attributes: attributes.New("index", i),
     	}
 	}

     resolver.Register(&resolverBuilder{addresses: addressesStore})
 }

Функции резолвера:

  • Динамическое обновление адресов.

  • Предоставление адресов с приоритетами балансировщику.

  • Сообщение об ошибках в случае недоступности адресов.

Picker

Picker выбирает соединение с наименьшим индексом (наивысшим приоритетом) из доступных. Если соединение с более высоким приоритетом становится доступным, балансировщик автоматически переключается на него.

type firstIdxPicker struct {
 	result balancer.PickResult
 	err    error
 }

 func (p *firstIdxPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
 	return p.result, p.err
 }

 func NewFIPicker(info base.PickerBuildInfo) balancer.Picker {
 	if len(info.ReadySCs) == 0 {
     	return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}
 	}

 	minIdx := math.MaxInt
 	var selectedConn balancer.SubConn

 	for sc, scInfo := range info.ReadySCs {
     	idx, ok := scInfo.Address.Attributes.Value("index").(int) // <- наш простенький алгоритм определения оптимального соединения
     	if ok && idx < minIdx {
         	minIdx = idx
         	selectedConn = sc
    	 }
 	}

 	if selectedConn != nil {
     	return &firstIdxPicker{result: balancer.PickResult{SubConn: selectedConn}}
 	}

 	return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}
 }

Алгоритм выбора:

  1. Проходит по всем готовым соединениям.

  2. Выбирает соединение с наименьшим index.

  3. Возвращает выбранное соединение для обработки запроса.

Balancer

Балансировщик отслеживает состояния соединений и регенерирует Picker при их изменении.

type Balancer struct {
 	cc   	balancer.ClientConn
 	csEvltr  *balancer.ConnectivityStateEvaluator
 	state	connectivity.State

 	subConns *resolver.AddressMap
 	scStates map[balancer.SubConn]connectivity.State
 	picker   balancer.Picker

 	resolverErr error
 	connErr 	error
 }

 func (b *Balancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
 	b.resolverErr = nil

 	addressMap := b.createNewSubConnections(ccs)

 	for _, addr := range b.subConns.Keys() {
     	if _, ok := addressMap.Get(addr); !ok {
         	sci, _ := b.subConns.Get(addr)
         	sc := sci.(balancer.SubConn)
         	sc.Shutdown()
         	b.subConns.Delete(addr)
     	}
 	}

 	if len(ccs.ResolverState.Addresses) == 0 {
         b.ResolverError(errZeroAddresses)
     	return balancer.ErrBadResolverState
 	}

 	b.regeneratePicker()
     b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})

 	return nil
 }

 func (b *Balancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
     oldState, ok := b.scStates[subConn]
 	if !ok {
     	return
 	}

 	b.scStates[subConn] = state.ConnectivityState

 	switch state.ConnectivityState {
 	case connectivity.Idle:
     	subConn.Connect()
 	case connectivity.Shutdown:
     	delete(b.scStates, subConn)
 	case connectivity.TransientFailure:
     	b.connErr = state.ConnectionError
 	}

 	b.state = b.csEvltr.RecordTransition(oldState, state.ConnectivityState)

 	if (state.ConnectivityState == connectivity.Ready) != (oldState == connectivity.Ready) || b.state == connectivity.TransientFailure {
     	b.regeneratePicker()
 	}

     b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
 }

 func (b *Balancer) regeneratePicker() {
 	if b.state == connectivity.TransientFailure {
     	b.picker = &firstIdxPicker{err: errors.Join(b.resolverErr, b.connErr)}
     	return
 	}

 	readySCs := make(map[balancer.SubConn]base.SubConnInfo)

 	for _, addr := range b.subConns.Keys() {
     	sci, _ := b.subConns.Get(addr)
     	sc := sci.(balancer.SubConn)
     	if state, ok := b.scStates[sc]; ok && state == connectivity.Ready {
         	readySCs[sc] = base.SubConnInfo{Address: addr}
     	}
 	}

 	b.picker = NewFIPicker(base.PickerBuildInfo{ReadySCs: readySCs})
 }

Отслеживание состояний соединений:

  • UpdateClientConnState: создание новых и удаление неактуальных соединений.

  • UpdateSubConnState: обновление состояний существующих соединений.

  • regeneratePicker: обновление пикера при изменении состояний для выбора оптимального соединения.

Настройка и конфигурация

Для использования кастомного балансировщика необходимо определить его имя и схему, а также настроить подключение.

const (
 	scheme    	= "scheme-name"
 	balancerName  = "pick_idx_first"
 	defaultConfig = `{"loadBalancingConfig": [{"pick_idx_first": {}}]}`
 	retryTimeout  = time.Millisecond * 100
 	maxRetries	= 10
 )

 type ConnOptions struct {
 	Addrs []string
 	Opts  []grpc.DialOption
 }

 func NewConn(ctx context.Context, connOptions ConnOptions) (*grpc.ClientConn, error) {
 	conn, err := dialContext(ctx, connOptions.Addrs, connOptions.Opts...)
 	if err != nil {
     	return nil, fmt.Errorf("unable to initialize conn: %w", err)
 	}

 	return conn, nil
 }

 func dialContext(ctx context.Context, addresses []string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {

 	...
 	opts = append(opts,
         grpc.WithDefaultServiceConfig(defaultConfig),
     	grpc.WithStreamInterceptor(
         	retry.StreamClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),
     	),
     	grpc.WithUnaryInterceptor(
             retry.UnaryClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),
     	),
 	)
 	...

 	initResolver(addresses)

 	return grpc.DialContext(ctx, fmt.Sprintf("%s:///", scheme), opts...)

Параметры подключения:

  • scheme и balancerName: определяют кастомный балансировщик.

  • defaultConfig: задаёт конфигурацию балансировки.

  • Интерсепторы: добавлены для повторных подключений при кратковременных сбоях. Я использовал пакет github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry.

Тестирование и результаты

В процессе тестирования балансировщик показал стабильную работу при переключении между адресами в случае недоступности сервера с более высоким приоритетом. Задержки были минимизированы благодаря приоритетному подключению к ближайшему ЦОДу.

Заключение

Создание кастомного gRPC-балансировщика с приоритизацией адресов позволяет более точно контролировать распределение клиентских запросов и улучшить производительность приложения. Такое решение обеспечивает гибкость настройки, минимизацию сетевых задержек и повышенную отказоустойчивость, что особенно важно в современных микросервисных архитектурах.

Преимущества кастомного решения:

  • Гибкость: настройка приоритетов адресов.

  • Эффективность: минимизация задержек за счёт выбора оптимального соединения.

  • Отказоустойчивость: автоматическое переключение при недоступности сервера.

Перспективы развития:

  • Динамическое обновление приоритетов.

  • Интеграция с сервисами обнаружения.

  • Расширение логики выбора на основе метрик производительности.

Надеюсь, эта статья поможет вам в создании кастомных решений для ваших gRPC-приложений. 

Ссылки и дополнительные материалы

© Habrahabr.ru