Здоровая конкуренция в GO. Главное не перехитрить самого себя

Несколько лет назад я прочитал статью о параллелизации в GO и ничего не понял — я тогда только начинал программировать на этом языке. Но размышления автора мне очень понравились — они подкреплялись бэнчмарками, что было довольно убедительно. Автор игрался c параметром GOMAXPROCS и показал, что увеличение этого параметра не всегда приводит к увеличению производительности. Под конец статьи он подобрал такое значение, которое будет максимально эффективным для его функции, на мое удивление, это значение оказалось равно единице! Т.е. его код работал максимально эффективно, если работал всего на одном ядре процессора! Однако, в одном из комментариев под той статьей я прочел, что все эти изыскания нелепы, поскольку та же самая функция из статьи запущенная всего в один поток оказывается эффективнее любой ее параллельной реализации.

С тех пор я написал уже много кода на GO, и могу поделиться мыслями о шаблонах параллельной обработки с теми, кто находится в том же состоянии, что и я когда то.


lnchseeyj51mdpktycy-ocvewgi.png

Почему я вообще пишу эту статью? Дело в том, что некоторое время назад я встретил в продуктовом коде забавный ограничитель. У нас есть одна задача, которая работает в несколько потоков, каждый из которых запрашивает данные у удаленного сервера, а затем обрабатывает их. Так вот по какой то причине автор решил установить лимит максимального количества потоков по количеству ядер процессора (через GOMAXPROCS). Я некоторое время думал над этим и пришел к выводу, что автор просто не до конца понимает нужные механики, вот и выбрал неправильный шаблон.

Некоторое время спустя, я занимался менторством, и мой подопечный задал мне вопрос про то, как работает параллелизм. Совершенно спонтанно у меня получилась небольшая и очень интересная лекция про три паттерна конкурентного кода. Ее материал я и выкладываю в этой статье.

Небольшое замечание: в статье я постоянно упоминаю «потоки» и «потоки выполнения», я не имею в виду потоки операционной системы или что-то другое конкретное. Я имею в виду именно потоки исполнения вашего кода, что в контексте GO будет эквивалентно go-рутинам.

Представьте, что у нас есть запрос к нашему веб-сервису, который порождает еще три запроса к подчиненным сервисам. Я не имею в виду микросервисы, это может быть что угодно, даже база данных или внешний веб-сервер. Основной момент тут в том, что это какой то внешний процесс, который выполняется на другой машине, не на той, где работает наш код.

В качестве примера я приведу код, выполняющий три HTTP запроса.

type Info struct {
   Name    string
   Balance float64
   Status  bool
}

var info Info
resp, _ = http.Get("http://someurl.com/name?id=00001")
info.Name = getNameFromResp(resp)

resp, _ = http.Get("http://someurl.com/getBalance?id=00001")
info.Balance = getBalanceFromResp(resp)

resp, _ = http.Get("http://someurl.com/getStatus?id=00001")
info.Status = getStatusFromResp(resp)

Это очень простой пример и тут даже не обрабатываются возможные ошибки, но суть в том, что каждый следующий запрос выполняется после предыдущего. Запросов может оказаться не три, а больше, самое главное, что аргументы каждого следующего запроса не зависят от результатов предыдущего.

Если мы перепишем этот код так, чтобы удаленные запросы выполнялись параллельно, то выйдет что-то подобное:

type Info struct {
   Name    string
   Balance float64
   Status  bool
}

var info Info
var wg sync.WaitGroup
wg.Add(3)
go func() {
   defer wg.Done()
   resp, _ = http.Get("http://someurl.com/name?id=00001")
   info.Name = getNameFromResp(resp)
}()
go func() {
   defer wg.Done()
   resp, _ = http.Get("http://someurl.com/getBalance?id=00001")
   info.Balance = getBalanceFromResp(resp)
}()
go func() {
   defer wg.Done()
   resp, _ = http.Get("http://someurl.com/getStatus?id=00001")
   info.Status = getStatusFromResp(resp)
}()

wg.Wait()

О да, теперь вышло сложно. Кроме того тут остались открытые вопросы. Например, что будет, если один из запросов вернет ошибку? Или, а не сломал ли автор конкурентный доступ к полям структуры, не будет ли состояние гонки в памяти? В случае последовательного выполнения все ясно — мы просто вернем ошибку вызывающему коду назад по стеку вызова. Но в случае параллельного выполнения нам нужно будет решить несколько задач:


  • Как вернуть ошибку в основной поток выполнения, который ожидает завершения всех побочных.
  • Нужно ли прервать выполнение побочных потоков исполнения или дождаться их естественного завершения?
  • Что делать если два или больше побочных потока вернули ошибку, склеить или взять первую?
  • Как избежать состояния гонки и как убедиться, что мы их избежали?

Ладно, не переживайте, последний вопрос решается вдумчивым тестированием кода с ключом — race, а другие вопросы можно решить с помощью готовых пакетов GO, например errgroup.

Если использовать этот пакет и написать аккуратный код, то усложнения вы даже не заметите (в этом примере), а вот эффективность выполнения кода возрастет и давайте поговорим, о том, почему и как это происходит, даже несмотря на то, что это и так очевидно.

Дело в том, что когда ваш код запрашивает что-то из сети, происходит переключение в режим ядра, т.е. ваша программа через программные прерывания говорит операционной системе о том, что нужно отправить кое-какие данные в сеть, а затем (вашему коду возвращают управление после отправки этих данных), она точно так же через прерывания говорит, что нужно прочитать кое-что из сети (да, отдельно нужно сходить за ответом). В тот момент, когда это происходит поток выполнения вашей программы ставится на паузу до конца выполнения операции чтения/записи операционной системой. Во время этой паузы другие потоки вашего приложения функционировать могут.

Я не буду говорить тут о go-рутинах и о том, что они не являются потоками ОС и о том, что же происходит со всеми рутинам, когда вдруг возникает необходимость в прерывании. Важно понимать, что в то время, когда поток выполнения «замерз» на выполнении сетевой задачи, ресурсы вашего компьютера не задействованы — поток просто ждет когда ему вернут управление. Если вы в этот момент запустите пять или сто таких потоков — это не отразится существенно на процессоре или оперативной памяти (справедливо для GO, но не для всех языков), поэтому вы просто можете сделать это и ждать, чего же произойдет.

Но сколько можно запустить таких одновременных потоков? При работе с сетью естественным ограничителем для нас будет количество сокетов. Если взять хотя бы по 20 тысяч свободных портов для одного удаленного IP (с которым работает ваш сервис), то получится солидное количество. Учитывая то, что сокеты можно переиспользовать, считайте, что лимита нет.

Теперь давайте решим проблему, которая может возникнуть, если вы будете следовать моему предыдущему совету. И заключается она вот в чем.

Представим, что удаленный сервер с которым работает наш код, не обладает бесконечными ресурсами. И мы, отправляя ему по сто одновременных запросов, делаем только хуже, т.к. с каждым следующим параллельным запросом его эффективность падает. Возможно у нас есть какой-то искусственный лимит на количество одновременных запросов, а возможно это естественное ограничение связанное с оперативной памятью или дешевым процессором удаленного сервера.

Давайте я продемонстрирую это с помощью простейшего кода:

const taskCount = 100


func main() {
   defer func(start time.Time) {
      fmt.Printf("done with: \u001B[31m%v\033[0m\n", time.Since(start))
   }(time.Now())

   goSameTime()
}

func goSameTime() {
   var wg sync.WaitGroup
   wg.Add(taskCount)
   for n := 0; n < taskCount; n++ {
      go func() {
         defer wg.Done()
         toDo()
      }()
   }
   wg.Wait()
}

Этот код запускает 100 раз функцию toDo и каждый раз в отдельном потоке выполнения. А вот каким образом я добавляю зависимость времени выполнения от количества параллельных выполнений (т.е. симулирую влияние на вычислительные ресурсы):

var par int64

func toDo() {
   c := atomic.AddInt64(&par, 1)
   defer atomic.AddInt64(&par, -1)
   <-time.After(time.Duration(c) * time.Duration(c) * time.Millisecond)
   <-time.After(100 * time.Millisecond)
}

Т.е. время выполнения отдельно взятой функции будет равняться квадрату от количества потоков выполнения, которые в данный момент выполняют эту функцию, плюс сто миллисекунд. Конечно, это синтетический случай, но в общем и целом он соответствует проблеме.

Вот, что я увидел, когда запустил код:

done with: 10.100843541s

Давайте попробуем осуществить ограничение на количество одновременных запросов к этому серверу. И у меня тут есть два подхода.


Локальное ограничение с помощью каналов.

Как известно, GO предлагает каналы (chan) в качестве основного средства синхронизации.


Don’t communicate by sharing memory; share memory by communicating.
R. Pike

И вот как мы можем использовать буферизированный канал, чтобы ограничить количество одновременных проходов в участок кода:

func goParallelTime(parallels int) {
   var wg sync.WaitGroup
   var throttle = make(chan struct{}, parallels)
   wg.Add(taskCount)
   for n := 0; n < taskCount; n++ {
      go func() {
         throttle <- struct{}{}
         // вот сюда уже не пройти всем одновременно
         defer func() {
            <-throttle
            wg.Done()
         }()
         toDo()
      }()
   }
   wg.Wait()
}

и вот, что я увидел, когда запустил такой код: goParallelTime (5)

done with: 2.52518417s

Так что, на лицо ускорение в 4 раза. И давайте разберемся, что же произошло при добавлении буферизированного канала пустых структур.

Как известно, в буферизированный канал GO можно положить только N данных, где N — это размер буфера. Так что механика тут довольно простая — перед тем, как начать выполнять действие, мы кладем в канал что-то, если там уже нет места, значит лимит на кол-во одновременных выполнений уже достигнут и поток выполнения будет заблокирован до тех пор, пока в канале не освободится место. После того, как действие будет выполнено (я положил код под defer) функция должна вычитать из канала, таким образом сообщая нашей программе, что освободилось место для выполнения еще одной задачи.

Почему я поставил кусок кода throttle < — struct{}{} (где мы пишем в канал) под функцию go-рутины, а не перед ней? Да потому, что запуск новой go-рутины не оказывает существенного влияния на производительность, поэтому мы можем просто сразу поставить все задачи в очередь, а затем позволить каналам рулить этой очередью. Хотя, в случае огромного количества задач этот код может привести к серьезной борьбе за ресурс, тогда возможно имеет смысл притормозить перед созданием go-рутины:

for n := 0; n < taskCount; n++ {
   throttle <- struct{}{}
   go func() {
      defer func() {
         <-throttle
         wg.Done()
      }()
      toDo()
   }()
}


Глобальное ограничение с помощью пула воркеров.

Хорошо, да все еще не очень хорошо. Ведь если ваше приложение само является сервисом. то точка входа может оказаться гораздо выше и сам ваш код, который собирается конкурентно работать с удаленным сервером, является конкурентно выполняемым кодом. Я имею в виду, что если тот участок кода, который мы рассматриваем, сам запускается в параллельных потоках выполнения? Тогда, если запустить его тысячу раз, все наши ограничения становятся бесполезны. Конечно, вы можете сделать глобальным канал из предыдущего примера, но есть еще одно решение.

Предлагаю вашему вниманию второй подход к ограничению — создание пула воркеров. При этом подходе, создавать пул воркеров нужно в самом начале — при инициализации вашего приложения, а передавать им задачу с нужными параметрами через один единственный канал. Конечно, тут возникают вопрос: как вернуть результат выполнения обратно? Можно это сделать тоже через канал, но уже для каждого отдельного задания нужно создавать свой канал. Можно обойтись callback подходом, как в JS, это дело вкуса и принятых в команде традиций.

Принципиальная схема выглядит так:

func goWorkers(parallels int) {
   var wg sync.WaitGroup
   var pool = make(chan struct{})
   go func() {
      defer close(pool)
      // в этой go-рутине происходит отправка задач в канал
      for n := 0; n < taskCount; n++ {
         pool <- struct{}{}
      }
   }()
   for n := 0; n < parallels; n++ {
      wg.Add(1)
      // а тут мы запускаем нужное количество воркеров
      go func() {
         defer wg.Done()
         worker(pool)
      }()
   }
   wg.Wait()
}

Конечно, тут не прослеживается тот нюанс о котором я сказал ранее — создавать воркеры при инициализации. И я не показал, как вернуть результат выполнения. Но с этими нюансами программист может справиться и решений тут может быть множество.

Самое главное в этом подходе — предварительный запуск необходимого количества воркеров и передача параметров задания через один единственный канал. Превышение лимитов одновременно выполняемых задач тут невозможно просто потому, что некому будет выполнять эти задачи — кол-во воркеров ограничено.

При таких подходах с ограничением кол-ва одновременно выполняемых запросов к удаленному серверу я рекомендую уточнять этот лимит опытным путем или ознакомиться с ним в договоре оказания услуг или в описании к выбранному тарифному плану (зависит от того, что у вас там). И конечно установленный лимит следует конфигурировать так, чтобы изменение этого параметра не требовало нового развертывания. А в идеале чтобы параметр применялся даже без перезапуска вашего сервиса.

Теперь переходим к самому интересному из этой статьи — параллельные вычисления в пределах одной машины (на ядрах процессора). Для того, чтобы симулировать длительные и сложные вычисления, я написал вот такой код:

func getHashSync(in string) string {
   var salt int
   for {
      if h, ok := getHashIf(in, salt); ok {
         return h
      }
      salt++
   }
}

func getHashIf(in string, salt int) (string, bool) {
   hash := md5.New()
   hash.Write([]byte(fmt.Sprintf("%d.%s", salt, in)))
   var buf [md5.Size]byte
   if x := hash.Sum(buf[:0]); isHashAcceptable(x) {
      return fmt.Sprintf("%x", x), true
   }
   return "", false
}

func isHashAcceptable(x []byte) bool {
   return x[15] == 0 && x[14] == 0 && x[13] == 0 && x[12] == 0
}

Боже, что за абсурд, скажете вы. Ну что я могу поделать, что есть под рукой, то и взял. Эта функция пытается подобрать такой хеш исходной строки плюс минимальное целое число в хвосте, чтобы последние четыре байта в конце хеша были равны нулю.

Т.е. мы берем какую то фразу, которую нужно посчитать, ставим в конце точку, а после точки ноль, пробуем вычислить MD5 и если в конце нет четырех нулей, берем ту же фразу, ставим в конце точку и число один… И так далее, пока не получим хэш с нулями в конце.

Если вам это знакомо, ставьте плюс статье. Я лично понятия не имею, зачем такая штука нужна, главное, что подобрать его не так то просто — нужно некоторое время греть процессор до 100%.

Итак, что же будет, если я запущу вот эту программу?

const phrase = `Как ваши дела?`

func main() {
   defer func(t time.Time) {
      fmt.Printf("done with %v\n", time.Since(t))
   }(time.Now())

   fmt.Println(
      getHashSync(phrase),
   )
}

Ровно одно ядро процессора на моей машине показало 100% нагрузку на 30 секунд и затем я увидел в консоли:

310b82171eedf93479d052ff00000000
done with 31.505403401s

Неплохо, но почему только одно ядро? Да все потому, что я программист написал такую программу, которая не параллелит вычисления внутри процессора. У меня ведь не стояла задача выполнить вычисления как можно быстрее. Так что в то время, как одно ядро было загружено, остальные семь занимались своей работой, предоставляя другим приложениям в моем компьютере свой ресурс.

Но что будет, если я захочу выполнить вычисления как можно скорее? Могу я захватить весь (или почти весь) ресурс процессора? Давайте попробуем.

Есть несколько способов распараллелить математические вычисления. Все зависит от их рода. Но в общей сложности параллелить можно только независимые части вычисления, которые в какой то момент агрегируются и превращаются в общий результат. В нашем случае можно взять и положить на разные ядра каждое конкретное вычисление хэша MD5 с конкретным числом в конце, тогда итоговая агрегирующая формула просто возьмет наименьший из правильных результатов. Да, подходящих хэшей может оказаться несколько и нам нужен тот результат, в котором подставляемое в конце число оказалось наименьшим.

Приступим к написанию наглядного кода. Сначала я запущу в отдельных go-рутинах столько воркеров, сколько у меня ядер в процессоре. Кол-во ядер процессора, доступных для запущенного приложения, я определяю с помощью runtime.NumCPU (). Затем мне нужно пара go-рутин: одна чтобы отправлять задания, одна чтобы закрыть канал с результатами, когда все воркеры будут «потушены». В основном потоке выполнения код будет ожидать возврата результата, а когда результат вернется, то сообщит о завершении работы через закрывающийся пустой канал done. Далее go-рутина, которая отправляет задания, получит этот сигнал и в свою очередь закроет канал с заданиями.

Ну да, а кто говорил, что будет легко?


Довольно объемный код под спойлером
type resSalt struct {
   salt   int
   result string
}

func getHashAsyncBad(in string) string {
   var task = make(chan int)
   var res = makeWorkers(in, task)
   var done = make(chan struct{})
   go func() {
      var salt int
      for {
         select {
         case <-done:
            close(task)
            return
         case task <- salt:
            salt++
            continue
         }
      }
   }()
   var o sync.Once
   var minResult resSalt
   for r := range res {
      o.Do(func() {
         close(done)
      })
      if minResult.result == "" {
         minResult = r
         continue
      }
      if minResult.salt > r.salt {
         minResult = r
         continue
      }
   }
   return minResult.result
}

func makeWorkers(in string, task <-chan int) chan resSalt {
   var res = make(chan resSalt)
   var wg sync.WaitGroup
   numCPU := runtime.NumCPU()
   wg.Add(numCPU)
   for n := 0; n < numCPU; n++ {
      go func() {
         defer wg.Done()
         for salt := range task {
            if h, ok := getHashIf(in, salt); ok {
               res <- resSalt{
                  salt:   salt,
                  result: h,
               }
               break
            }
         }
      }()
   }
   go func() {
      wg.Wait()
      close(res)
   }()
   return res
}

Отлично, запускаем код! И вот результат.

310b82171eedf93479d052ff00000000
done with 1m33.889545818s

Погодите, какая еще минута и 33 секунды? Что-то не то. Я ведь распараллелил все вычисления на восемь ядер. Вот и системный монитор подтверждает, что во время выполнения… Погодите минуту, всего 200% процессора было занято — это же только два ядра. Почему не задействованы остальные?

Ответ прост.

Да, я распараллелил вычисления, но посмотрите, сколько ресурсов расходуется на синхронизацию. Допустим, что для того, чтобы найти нужный хэш, который по счастливой случайности попадется на 100 итерации (на самом деле на 105956004), нам придется перебрать числа от 0 до 99, а на каждое из этих чисел необходимо бросить задание в канал, которое воркеры должны прочитать, затем они посчитают хэш и определят, подходит ли он по условию. То, что выполняет воркер — это очень маленькое действие для параллелизации, и когда оно заканчивается, может так оказаться, что отправляющая сторона еще не прислала других заданий. Так что тот же самый воркер может успеть вычитать из канала следующее задание.

На самом деле все сложнее конечно, но по факту расходы, для генерации задания и перемещения данных между go-рутиной, которая создала задание, и go-рутиной, которая его получила, соизмеримы с той работой, которую выполняет воркер. Так параллелизация не делается.

Давайте сокращать. Я предлагаю сделать батч: я бросаю в канал с заданиями гранулированные задания — не уверен, что правильно подобрал термин, но я имею в виду, что буду отправлять не 0, 1, 2…, а сразу 0, 1000, 2000. А воркер в свою очередь будет считать хэши от того salt, который получит из канала и до salt + 999. Выглядит, как будто будут выполняться лишние вычисления: судите сами, когда один из воркеров завершит работу, все остальные должны будут досчитать свои гранулы, а это может быть 1000 * (кол-во воркеров — 1). Вот как то так. Но с учетом того, что мы экономим на синхронизации (а это одна из самых дорогих операций) все должно быть не так уж плохо.

Что ж, попробуем.


Еще объемный код под спойлером
type resSalt struct {
   salt   int
   result string
}

func getHashAsync(in string) string {
   const batchSize = 1000
   var task = make(chan int)
   var res = makeWorkersBatched(in, task, batchSize)
   var done = make(chan struct{})
   go func() {
      var salt int
      for {
         select {
         case <-done:
            close(task)
            return
         case task <- salt:
            salt += batchSize
         }
      }
   }()
   var o sync.Once
   var minResult resSalt
   for r := range res {
      o.Do(func() {
         close(done)
      })
      if minResult.result == "" {
         minResult = r
         continue
      }
      if minResult.salt > r.salt {
         minResult = r
         continue
      }
   }
   return minResult.result
}

func makeWorkersBatched(in string, task <-chan int, batchSize int) chan resSalt {
   var res = make(chan resSalt)
   var wg sync.WaitGroup
   numCPU := runtime.NumCPU()
   wg.Add(numCPU)
   for n := 0; n < numCPU; n++ {
      go func() {
         defer wg.Done()
         for saltStart := range task {
            for salt := saltStart; salt < saltStart+batchSize; salt++ {
               if h, ok := getHashIf(in, salt); ok {
                  res <- resSalt{
                     salt:   salt,
                     result: h,
                  }
                  break
               }
            }
         }
      }()
   }
   go func() {
      wg.Wait()
      close(res)
   }()
   return res
}

Теперь это выглядит так, запускаем код…

310b82171eedf93479d052ff00000000
done with 5.536218303s

Вот таким ты мне нравишься больше! Системный монитор показал 700% загрузки процессора моим кодом, что означает, что выполнение заняло почти все ресурсы процессора, а это значит, что вычисление распараллелилось на все ядра. По итогу я выиграл в 6 раз. 5 секунд параллельной работы против 30 секунд однопоточного кода. Ну, а нужно ли это кому-нибудь вообще — это совсем другая история.

Теперь маленькие ремарки по поводу такого кода.

Обратите внимание на сбор результатов в конце:

   for r := range res {
      o.Do(func() {
         close(done)
      })
      if minResult.result == "" {
         minResult = r
         continue
      }
      if minResult.salt > r.salt {
         minResult = r
         continue
      }
   }

Я не могу точно гарантировать, что конкурентно выполняющиеся потоки не опережают друг друга и правильный хэш был посчитан раньше, чем следующий за ним, подходящий по условиям, но с большим числом salt. Поэтому необходимо проверить все результаты и выбрать наименьший. По этой же причине нельзя останавливать подсчет хэшей в воркерах сразу же при получении подходящего результата — все воркеры должны завершить работу и вернуть свои результаты, если они есть.

Параллельные потоки выполняются не последовательно, постоянно опережая друг друга, и вам всегда необходимо опираться на тот участок кода, который выполняется последовательно, чтобы синхронизировать результаты, а не взять что-то кажущееся подходящим, что выпало как можно быстрее.

Так что, вы сами видите, что конкурентный код увеличивает сложность как плату за свою производительность. Каждый раз нам необходимо продумать, как получать результаты выполнения, нужно ли остановить все побочные потоки и как это сделать. Стоит ли ограничивать кол-во параллельных выполнений и от чего будет зависеть этот лимит.

Сложность не только в том, что нам нужно решить все это, а еще в том, как написать код так, чтобы через какое то время не было мучительно больно его читать. Но, если заранее известно, что дело того стоит, не ошибитесь в выборе подхода.

Хотелось бы резюмировать. Я начал работу над материалами этой статьи, когда встретил логику ограничения потоков выполнения, основанную на результате выполнения функции GOMAXPROCS (0), которая по умолчанию будет равна runtime.NumCPU. Это подходит под паттерн с параллельными вычислениями на одном процессоре, но использовалось для ограничения кол-ва одновременных запросов к удаленному серверу, который наверняка даже работал на другой машине. Прошу читателя по возможности не делать так, потому что производительность вашего приложения после этого будет зависеть от кол-ва ядер процессора, которые ваше приложение использовать все равно не будет.

Поэтому, я хотел бы лишний раз подчеркнуть:


  • Если у вас фиксированное число удаленных запросов, параметры которых не зависят друг от друга и заранее известны, то используйте первый паттерн из этой статьи — выполнение параллельно без ограничений с применением errgroup. Так вы заставите ваше приложение работать в тот момент, пока оно ожидает ответ от удаленного сервиса.
  • Если при масштабировании возможен бесконтрольный рост одновременно выполняемых удаленных запросов — ограничивайте одновременное выполнение, а лимит подбирается опытным путем и его можно вынести в настройки. Так вы не дадите удаленному сервису нагрузку большую, чем он сможет обработать.
  • Если у вас есть сложное вычисление, которое необходимо ускорить и вы знаете, как разложить его на разные потоки, то используйте в качестве лимита runtime.NumCPU. Так вы заставите работать все ядра процессора.

Ну, а вам желаю удачи в параллелизации. Появится интересная мысль — напишу еще.



Возможно, захочется почитать и это:


b5pjofdoxth14ro-rjsrn7sbmiy.png

© Habrahabr.ru