Паттерны многопоточности в Go

f9c6ec10bd7049730e6f39f44c6847a8.png

Привет! Ты читаешь эту статью, а значит интересуешься Go и хочешь прокачать свои навыки в многопоточности. Наверняка ты уже знаком и активно пользуешь примитивы многопоточного программирования (горутины, каналы, мьютексы). В этой статье я постараюсь рассказать о том как их компоновать в самые популярные паттерны для поддержки масштабируемости и удобства сопровождения будущих систем.

Почему же многопоточность так важна? Сегодняшние приложения должны быть быстрыми и отзывчивыми, а для этого нужно выполнять множество задач одновременно. 

Представь себе, что твоя программа обрабатывает запросы пользователей, одновременно загружает данные с сервера и проводит вычисления. И все это должно происходить без задержек и ошибок. Для всего этого у тебя есть горутинки! Однако если с ними обращаться неосторожно, то могут возникнуть такие ошибки, которые сложно будет сразу отловить. Вот здесь на сцену выходят паттерны многопоточности, которые помогают структурировать и оптимизировать параллельную работу.

Ты можешь сохранить эту статью и пользоваться ей как шпаргалкой. Вот список паттернов, которые разберем сегодня:

  • Feature/Promise — асинхронный запрос на выполнение задачи, не блокирующий основной поток.

  • Generator — простой и удобный способ создания потока данных. С его помощью можно запускать горутину, которая генерирует значения и передает их через канал.

  • Pipeline — мощный паттерн, позволяющий разбить задачу на несколько этапов обработки данных, организуя плавный поток через горутины.

  • Fan-in и Fan-out —  fan-out помогает распараллелить выполнение одной задачи на несколько горутин, а fan-in помогает собрать результаты этих горутин в один поток данных.

  • Semaphore — удобный инструмент для контроля числа одновременно выполняемых горутин, защищающий от перегрузок системы.

  • Worker Pool — организует набор горутин для параллельной обработки задач, что особенно полезно для оптимизации производительности.

  • Обработка ошибок в горутинах — паттерн, помогающий корректно и безопасно обрабатывать ошибки, возникающие в горутинах, через каналы.

51f67b263d9a912cd05c7ffcfee680f8.png

Feature/Promise

Начнем с самого сладкого. JS разработчики сейчас довольно трут ладошки. Да ребята, это что-то типа async/await в JavaScript. Этот паттерн позволяет выполнять задачи в фоне и получать результат по мере завершения этих задач, не блокируя основной поток выполнения прогарммы.

Детальки:

  • Promise — обещание предоставить результат операции в будущем. Оно используется для запуска задачи и получения объекта, который будет хранить результат выполнения задачи (он же async).

  • Future — это объект, который позволяет проверять готовность результата и извлекать его, когда он будет доступен (он же await).

Но кода конечно больше чем в JavaScript:

package main

import (
    "fmt"
    "time"
)

func Promise(task func() int) chan int {
    resultCh := make(chan int, 1) // создаем канал для результата

    go func() {
        result := task()          // выполняем задачу
        resultCh <- result        // отправляем результат в канал
        close(resultCh)           // закрываем канал после выполнения
    }()

    return resultCh
}

func main() {
    // Задача, которая занимает 2 секунды
    longRunningTask := func() int {
        time.Sleep(2 * time.Second)
        return 42
    }

    // Запускаем задачу через Promise
    future := Promise(longRunningTask)

    fmt.Println("Задача запущена, можно делать что-то еще...")

    // Ожидаем результат
    result := <-future
    fmt.Println("Результат:", result)
}

В этом примере задача запускается в фоне и ты можешь делать что-то еще, пока она не выполнится. Дальше ты получаешь результат через канал, который выступает в роли feature. Как только задача завершилась, результат становится доступен, и программа продолжает свое выполнение.

Этот подход удобен тем, что позволяет избежать блокировки основного потока выполнения, особенно если задача требует длительного времени. Ты просто запускаешь задачу, делаешь другие вещи, а затем, когда задача завершена, возвращаешься к ее результату.

Если же тебе нужно обрабатывать ошибки в такой асинхронной задаче, можно расширить пример, добавив передачу как результата, так и ошибки. Это делается с помощью структуры, которая будет содержать и результат выполнения, и ошибку. Такой подход позволяет не только получить результат задачи, но и обработать возможные сбои.

Гляди:

package main

import (
    "fmt"
    "errors"
    "time"
)

type Result struct {
    value int
    err   error
}

func Promise(task func() (int, error)) chan Result {
    resultCh := make(chan Result, 1) // создаем канал для результата

    go func() {
        value, err := task()          // выполняем задачу
        resultCh <- Result{value: value, err: err} // отправляем результат и ошибку в канал
        close(resultCh)               // закрываем канал
    }()

    return resultCh
}

func main() {
    // Задача, которая возвращает ошибку
    taskWithError := func() (int, error) {
        time.Sleep(2 * time.Second)
        return 0, errors.New("что-то пошло не так")
    }

    // Запускаем задачу через Promise
    future := Promise(taskWithError)

    fmt.Println("Задача запущена, можно делать что-то еще...")

    // Ожидаем результат
    result := <-future
    if result.err != nil {
        fmt.Println("Ошибка:", result.err)
    } else {
        fmt.Println("Результат:", result.value)
    }
}

Теперь, если задача завершится ошибкой, мы можем ее корректно обработать, а не просто игнорировать или выводить в лог. Этот подход делает код более надежным и позволяет лучше контролировать асинхронные операции.

Паттерн Future/Promise хорошо подходит для тех случаев, когда ты хочешь запустить задачу в фоне и получить ее результат, не блокируя выполнение программы. Это отличный способ обрабатывать долгие операции, такие как сетевые запросы или сложные вычисления, и одновременно делать другие полезные вещи, не теряя при этом контроль над результатами.

22e8cd76f2622993eb5f11ddac03a36f.png

Generator

Если мы пилим что-то в Go для работы с многопоточностью, то чаще всего это будет работать по схеме «производитель-потребитель». Производитель генерирует данные и отправляет их по каналу, а потребитель принимает и обрабатывает их. Это позволяет параллельно обрабатывать данные, не заполняя всю оперативную память, что особенно полезно при работе с большими объемами информации или даже бесконечными потоками данных. Для этого как раз мы и используем паттерн Generator.

Генератор создает значения и отправляет их потребителю через канал. При этом отправка и получение данных блокируются до тех пор, пока обе стороны не будут готовы. Такой подход позволяет обрабатывать данные по мере их поступления, минимизируя задержки.

Посмотри на пример:

package main

import "fmt"

func main() {
    // Данные, которые будут отправляться в канал
    items := []int{10, 20, 30, 40, 50}

    // Получаем канал с данными из генератора
    dataChannel := generator(items)

    // Потребитель обрабатывает данные из канала
    process(dataChannel)
}

// generator создает канал и запускает горутину для отправки данных
func generator(items []int) chan int {
    ch := make(chan int)

    go func() {
        // Закрываем канал после завершения отправки данных
        defer close(ch)

        // Перебираем элементы и отправляем их в канал
        for _, item := range items {
            ch <- item
        }
    }()

    return ch
}

// process получает данные из канала и выводит их
func process(ch chan int) {
    for item := range ch {
        fmt.Println(item)
    }
}

В примере генератор отправляет данные в канал через отдельную горутину, а затем потребитель их обрабатывает, читая по одному значению. Заметь, что генератор закрывает канал по завершению работы, что важно для сигнализации потребителю о конце передачи данных.

Этот паттерн удобен тем, что генератор и потребитель могут работать одновременно, не дожидаясь завершения всех вычислений. Это экономит память, поскольку данные обрабатываются по мере их поступления, а не накапливаются в памяти полностью. Особенно полезно это, когда работаешь с большими или бесконечными потоками данных.

Pipeline

Это классный паттерн, который поначалу покажется сложным и ненужным, однако в некоторых случаях он крайне необходим. Pipeline позволяет разделить сложную задачу на несколько простых. Входные данные для одной подзадачи становятся выходными для другой, и этот процесс продолжается до тех пор, пока вся цепочка не завершится. Такой подход помогает разбивать обработку данных на этапы и эффективно управлять потоками данных.

В первом примере посмотри, как это работает без использования горутин:

package main

import "fmt"

func main() {
    value := 1

    // пример 1: сложение, затем умножение
    fmt.Println(multiply(add(value, 2), 3))

    // пример 2: поменяли местами этапы
    fmt.Println(add(multiply(value, 2), 3))
}

// add — функция сложения
func add(a, b int) int {
    return a + b
}

// multiply — функция умножения
func multiply(a, b int) int {
    return a * b
}

Как ты заметил, все операции выполняются последовательно. Один этап начинается только тогда, когда предыдущий закончил обработку всех данных. Однако, используя горутины, можно обработать данные параллельно, ускоряя выполнение.

Теперь я добавлю горутины и каналы, чтобы сделать наш конвейер более эффективным. Изменим функции add и multiply, чтобы они работали с каналами:

package main

import "fmt"

// add — добавляет 2 к каждому значению из inputCh и возвращает канал с результатами
func add(doneCh chan struct{}, inputCh chan int) chan int {
    resultCh := make(chan int)
    
    go func() {
        defer close(resultCh)

        for value := range inputCh {
            result := value + 2

            select {
            case <-doneCh: // если нужно завершить горутину
                return
            case resultCh <- result: // отправляем результат
            }
        }
    }()
    
    return resultCh
}

// multiply — умножает каждое значение на 3 и возвращает канал с результатами
func multiply(doneCh chan struct{}, inputCh chan int) chan int {
    resultCh := make(chan int)

    go func() {
        defer close(resultCh)

        for value := range inputCh {
            result := value * 3

            select {
            case <-doneCh:
                return
            case resultCh <- result:
            }
        }
    }()
    
    return resultCh
}

// generator — отправляет данные в канал
func generator(doneCh chan struct{}, numbers []int) chan int {
    outputCh := make(chan int)

    go func() {
        defer close(outputCh)

        for _, num := range numbers {
            select {
            case <-doneCh:
                return
            case outputCh <- num:
            }
        }
    }()

    return outputCh
}

func main() {
    // данные, которые будем обрабатывать
    numbers := []int{1, 2, 3, 4, 5}

    // канал для остановки работы горутин
    doneCh := make(chan struct{})
    defer close(doneCh)

    // запускаем генератор, который отправляет числа
    inputCh := generator(doneCh, numbers)

    // этапы конвейера: сначала add, потом multiply
    addCh := add(doneCh, inputCh)
    resultCh := multiply(doneCh, addCh)

    // выводим результаты
    for res := range resultCh {
        fmt.Println(res)
    }
}

Таким образом, когда ты передаешь числа 1, 2, 3 в пайплайн, данные проходят через несколько этапов параллельной обработки. Например, пока одно число умножается, другое уже может обрабатываться на этапе сложения. Это позволяет распараллеливать работу над большими данными без значительной нагрузки на 1 CP.

Паттерн Pipeline отлично подойдет для задачи, где обработка может быть разбита на несколько этапов. Как вариант можно организовать конвеер для поиска данных, фильтрации и сохранения результатов в БД.

e806c4c09394d9ff383fb27e6da6b0cb.png

Fan-In Fan-Out

Как говорится начнем с конца. Паттерн Fan-Out позволяет распределить вычисления на несколько горутин, для параллельной обработки данных. Таким образом ты можешь увеличить пропускную способность.

Предположим что у тебя есть функция add (), которая требует много ресурсов для выполнения. Чтобы оптимизировать ее работу, можно увеличить количество горутин.

Вот пример:

package main

import (
    "fmt"
    "time"
)

// generator — создает канал с данными
func generator(doneCh chan struct{}, numbers []int) chan int {
    dataStream := make(chan int)

    go func() {
        defer close(dataStream)
        for _, num := range numbers {
            select {
            case <-doneCh:
                return
            case dataStream <- num:
            }
        }
    }()

    return dataStream
}

// add — добавляет 1 к каждому значению
func add(doneCh chan struct{}, inputCh chan int) chan int {
    resultStream := make(chan int)

    go func() {
        defer close(resultStream)
        for num := range inputCh {
            // Имитация более затратной работы
            time.Sleep(time.Second)
            result := num + 1

            select {
            case <-doneCh:
                return
            case resultStream <- result:
            }
        }
    }()

    return resultStream
}

// multiply — умножает каждое значение на 2
func multiply(doneCh chan struct{}, inputCh chan int) chan int {
    resultStream := make(chan int)

    go func() {
        defer close(resultStream)
        for num := range inputCh {
            result := num * 2

            select {
            case <-doneCh:
                return
            case resultStream <- result:
            }
        }
    }()

    return resultStream
}

// fanOut — создает несколько горутин add для параллельной обработки данных
func fanOut(doneCh chan struct{}, inputCh chan int, workers int) []chan int {
    resultChannels := make([]chan int, workers)
    
    for i := 0; i < workers; i++ {
        resultChannels[i] = add(doneCh, inputCh)
    }

    return resultChannels
}

Функция fanOut () создает несколько горутин для выполнения add (). Данные начинают обрабатываться параллельно в разных горутинах. Представим что эта функция становится медленной из-за каких-то вычислений, мы просто делим ее на 10 частей и выполняем отдельно друг от друга.

А что если потребуется потребуется слить эти каналы обратно? Для этого есть паттерн Fan-In. Он объединяет несколько каналов в один.

Соберем результаты:

// fanIn — объединяет результаты нескольких каналов в один
func fanIn(doneCh chan struct{}, channels ...chan int) chan int {
    finalStream := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        chCopy := ch
        wg.Add(1)
        
        go func() {
            defer wg.Done()
            for value := range chCopy {
                select {
                case <-doneCh:
                    return
                case finalStream <- value:
                }
            }
        }()
    }

    go func() {
        wg.Wait()
        close(finalStream)
    }()

    return finalStream
}

Как ты уже наверное заметил, функция fanIn () принимает несколько каналов и запускает отдельные горутины, чтобы извлечь данные из каждого канала. Эти данные объединяются в один конечный канал, который возвращается основному поток.

Давай попробуем использовать обе функции вместе:

package main

import (
    "fmt"
    "sync"
)

func main() {
    numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    doneCh := make(chan struct{})
    defer close(doneCh)

    inputCh := generator(doneCh, numbers)

    // создаем 10 горутин add с помощью fanOut
    channels := fanOut(doneCh, inputCh, 10)

    // объединяем результаты из всех каналов
    addResultCh := fanIn(doneCh, channels...)

    // передаем результат в следующий этап multiply
    resultCh := multiply(doneCh, addResultCh)

    // выводим результаты
    for result := range resultCh {
        fmt.Println(result)
    }
}

Что происходит:

  1. Fan-Out: создаем 10 горутин с помощью fanOut (), каждая из которых выполняет функцию add (). Это увеличивает скорость обработки данных на этапе add ().

  2. Fan-In: с помощью fanIn () объединяем результаты всех горутин add () в один поток.

  3. Pipeline: передаем результат в следующий этап обработки, где данные умножаются на 2 с помощью функции multiply ()

В общем, Fan-Out и Fan-In помогают параллельно обрабатывать большие объемы данных, увеличивая пропускную способность программы. Однако при этом порядок выходных данных может отличаться от порядка входных, так как разные горутины могут завершаться в разное время. Этот подход полезен для оптимизации времени выполнения ресурсоемких этапов программы, особенно когда важно быстро обрабатывать данные.

Semaphore

Наверное мой любимый паттерн, часто использую. Нужен он, когда нужно ограничить количество горутин, которые одновременно работают с определенным ресурсом. Это помогает избежать перегрузки ресурса, например, базы данных, и позволяет управлять количеством параллельно выполняемых задач.

Если работал с реляционными БД, то наверняка сейчас увидишь знакомый синтаксис. Используя буферизованный канал, можно реализовать семафор. Вот пример кода:

package main

import (
    "log"
    "sync"
    "time"
)

// Semaphore — структура для управления количеством параллельных горутин
type Semaphore struct {
    semaCh chan struct{}
}

// NewSemaphore — создает новый семафор с заданной максимальной емкостью
func NewSemaphore(maxReq int) *Semaphore {
    return &Semaphore{
        semaCh: make(chan struct{}, maxReq),
    }
}

// Acquire — резервирует место в семафоре
func (s *Semaphore) Acquire() {
    s.semaCh <- struct{}{}
}

// Release — освобождает место в семафоре
func (s *Semaphore) Release() {
    <-s.semaCh
}

Принцип работы Semaphore прост:

  • Счетчик семафора определяет, сколько горутин могут одновременно работать с ресурсом.

  • Когда горутина хочет начать работу с ресурсом, она увеличивает счетчик.

  • Если счетчик достиг максимального значения горутина блокируется и ждет, пока одна из работающих горутин не освободит место, уменьшив счетчик.

  • Следующего пункта не будет. На этом все:-)

Теперь давай создадим семафор с емкостью 2, который будет пропускать только 2 горутины одновременно. Остальные горутины будут ожидать своей очереди:

package main

import (
    "log"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    // создаем семафор, который позволит работать только двум горутинам одновременно
    semaphore := NewSemaphore(2)

    // запускаем 10 горутин
    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func(taskID int) {
            // резервируем место в семафоре перед началом работы
            semaphore.Acquire()

            // когда горутина завершает работу, освобождаем место и уменьшаем счетчик WaitGroup
            defer wg.Done()
            defer semaphore.Release()

            // симулируем работу горутины
            log.Printf("Запущен рабочий %d", taskID)
            time.Sleep(1 * time.Second)
        }(i)
    }

    // ждем завершения всех горутин
    wg.Wait()
}

Что происходит в коде:

  • Мы создаем семафор, который позволяет только двум горутинам одновременно выполнять свою работу.

  • В цикле запускается 10 горутин. Каждая из них перед началом работы «захватывает» семафор с помощью метода Acquire (). Если все места заняты, горутина ждет.

  • Когда горутина завершает выполнение, она вызывает Release (), освобождая место для следующей горутины.

Результат выполнения программы:

❯ go run main.go
2024/10/22 20:32:44 Запущен рабочий 0
2024/10/22 20:32:44 Запущен рабочий 1
2024/10/22 20:32:45 Запущен рабочий 2
2024/10/22 20:32:45 Запущен рабочий 3
2024/10/22 20:32:46 Запущен рабочий 4
2024/10/22 20:32:46 Запущен рабочий 5
2024/10/22 20:32:47 Запущен рабочий 6
2024/10/22 20:32:47 Запущен рабочий 7
2024/10/22 20:32:48 Запущен рабочий 8
2024/10/22 20:32:48 Запущен рабочий 9

Как ты уже заметил, семафор пропускает только 2 горутины одновременно, а остальные горутины начинают выполняться только после завершения предыдущих. Он подходит для ситуаций, когда ресурсы, такие как база данных или API, могут обрабатывать ограниченное количество запросов одновременно.

3f48e8345d6890d5d3eead93a35122fd.png

WorkerPool

Мастхэв. Самый хайповый паттерн. Используется часто. В нем используется «пул воркеров» для паралельной обработки задач из общей очереди. 

Основные элементы:

  • Очередь задач, через которую воркеры получают задачи на выполнение.

  • Набор воркеров, которые берут задачи из очереди и обрабатывают их.

Worker Pool еще удобен тем, что абстрагирует тебя от необходимости вручную управлять созданием и завершением горутин для каждой задачи из очереди. Воркеры получают их автоматически, по мере появления.

Давай посмотрим на код:

package main

import (
    "fmt"
    "time"
)

// worker — функция, представляющая нашего рабочего процесса
// Принимает id рабочего, канал задач и канал для отправки результатов
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Рабочий %d начал выполнение задачи %d\n", id, job)
        time.Sleep(time.Second) // симулируем выполнение задачи
        fmt.Printf("Рабочий %d завершил выполнение задачи %d\n", id, job)
        results <- job * 2 // отправляем результат
    }
}

func main() {
    const numJobs = 5 // количество задач для выполнения
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // создаем пул из 3 рабочих
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // отправляем задачи в канал jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    // закрываем канал задач, чтобы рабочие поняли, что больше задач не будет
    close(jobs)

    // получаем результаты от воркеров
    for r := 1; r <= numJobs; r++ {
        res := <-results
        fmt.Printf("Результат: %d\n", res)
    }
}

Как работает этот код:

  • Очередь задач (jobs) содержит задачи для обработки. Она наполняется заданиями в основном потоке программы.

  • Пул воркеров — создаются три горутины, каждая из которых представляет рабочего. Они получают задачи из канала jobs, обрабатывают их и отправляют результаты в канал results.

  • Когда все задачи отправлены в очередь, канал задач закрывается с помощью close (jobs), что сигнализирует воркерам о завершении работы.

  • Результаты обрабатываются по мере поступления и выводятся в консоль.

Пример вывода:

Рабочий 1 начал выполнение задачи 1
Рабочий 2 начал выполнение задачи 2
Рабочий 3 начал выполнение задачи 3
Рабочий 1 завершил выполнение задачи 1
Рабочий 1 начал выполнение задачи 4
Рабочий 2 завершил выполнение задачи 2
Рабочий 2 начал выполнение задачи 5
Рабочий 3 завершил выполнение задачи 3
Рабочий 1 завершил выполнение задачи 4
Рабочий 2 завершил выполнение задачи 5
Результат: 2
Результат: 4
Результат: 6
Результат: 8
Результат: 10

Worker Pool отлично подходит для ситуаций, когда нужно обрабатывать большое количество однотипных задач, например, входящие запросы к API, работа с файлами, запросы к базе данных и другие задачи, требующие параллельной обработки. Паттерн позволяет эффективно распределять нагрузку, не создавая излишнего количества горутин.

Обработка ошибок в горутинах

Когда ты работаешь с горутинами, каждая из них выполняется независимо от своего родителя. Это создает определенные трудности в обработке ошибок: ошибки в параллельных горутинах могут оставаться незамеченными, если их не вернуть в основной поток программы. Дальше я покажу тебе, как можно обрабатывать ошибки, возникающие в горутинах, и передавать их обратно для дальнейшей обработки.

Пример без обработки ошибок:

package main

import (
    "errors"
    "log"
    "time"
)

func main() {
    input := []int{1, 2, 3, 4}

    // генератор возвращает канал с данными
    inputCh := generator(input)

    // потребитель, обрабатывающий данные
    go consumer(inputCh)

    // добавим время сна, чтобы ошибки успели вывести на экран
    time.Sleep(time.Second)
}

// generator отправляет данные в канал и закрывает его
func generator(input []int) chan int {
    inputCh := make(chan int)

    go func() {
        defer close(inputCh)

        for _, data := range input {
            inputCh <- data
        }
    }()
    return inputCh
}

// consumer принимает данные и вызывает функцию, которая возвращает ошибку
func consumer(ch chan int) {
    for data := range ch {
        err := callDatabase(data)
        if err != nil {
            log.Println(err) // простой вывод ошибки в лог
        }
    }
}

// callDatabase симулирует вызов к базе данных и всегда возвращает ошибку
func callDatabase(data int) error {
    return errors.New("ошибка запроса к базе данных")
}

Этот код просто выводит ошибки, возникающие в горутинах, но не передает их в основной поток программы. Это не самое эффективное решение, так как ошибки остаются не обработанными в одном месте. Давай исправим это.

Теперь мы будем передавать ошибки из горутин через канал с исполдьзованием структуры:

package main

import (
    "log"
    "errors"
)

// Result — структура для хранения результата и ошибки
type Result struct {
    data int
    err  error
}

func main() {
    input := []int{1, 2, 3, 4}

    resultCh := make(chan Result)

    // запускаем потребителя, который будет отправлять результаты и ошибки
    go consumer(generator(input), resultCh)

    // читаем результаты
    for res := range resultCh {
        if res.err != nil {
            log.Println("Ошибка:", res.err)
        } else {
            log.Println("Результат:", res.data)
        }
    }
}

// generator отправляет данные в канал
func generator(input []int) chan int {
    inputCh := make(chan int)

    go func() {
        defer close(inputCh)
        for _, data := range input {
            inputCh <- data
        }
    }()

    return inputCh
}

// consumer вызывает функцию, которая может возвращать ошибку
func consumer(inputCh chan int, resultCh chan Result) {
    defer close(resultCh)

    for data := range inputCh {
        resp, err := callDatabase(data)
        resultCh <- Result{data: resp, err: err}
    }
}

// callDatabase возвращает ошибку
func callDatabase(data int) (int, error) {
    return data, errors.New("ошибка запроса к базе данных")
}

Теперь ошибки передаются через канал resultCh в основную горутину, где можно с ними работать. Это удобный способ обработки ошибок в многопоточных задачах.

Однако, есть еще errgroup из пакета golang.org/x/sync/errgroup который предоставляет синхронизацию, распространение ошибок и отмену контекста для групп горутин. Это будет полезно, когда ты захочешь выполнить несколько параллельных задач и убедиться, что все они завершились успешно. Если одна из горутин вернет ошибку, errgroup передаст ее в основную горутину, которая сможет обработать ее.

Вот код:

package main

import (
    "context"
    "errors"
    "log"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, _ := errgroup.WithContext(context.Background())
    input := []int{1, 2, 3, 4}

    inputCh := generator(input)

    for data := range inputCh {
        data := data // создание новой переменной для каждого запуска горутины
        g.Go(func() error {
            err := callDatabase(data)
            if err != nil {
                return err
            }
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        log.Println("Ошибка:", err)
    }
}

// generator отправляет данные в канал
func generator(input []int) chan int {
    inputCh := make(chan int)

    go func() {
        defer close(inputCh)
        for _, data := range input {
            inputCh <- data
        }
    }()

    return inputCh
}

// callDatabase возвращает ошибку, если data равно 3
func callDatabase(data int) error {
    if data == 3 {
        return errors.New("ошибка запроса к базе данных")
    }
    return nil
}

Если в одной из горутин возникнет ошибка, она вернется в основной поток программы, где можно ее обработать. Например, если значение data равно 3, кастанется ошибка, и вывод будет таким:

2024/10/22 21:17:40 ошибка запроса к базе данных

Пакет errgroup удобен, когда тебе нужно выполнить несколько параллельных задач и убедиться, что они завершены. В случае ошибки тебе не нужно беспокоиться о том, сколько таких ошибок произошло — достаточно обработать первую возникшую ошибку.

1bbd77f66e31fd787877753e9d2cc73c.png

Разобранные паттерны — это безусловно мощный инструмент в твоих руках, который поможет эффективно распределять задачи между разными потоками, улучшая производительность программы. Они позволяют обрабатывать большие объемы данных, работать с параллельными задачами и ускорять сложные вычисления. Однако, их использование требует разумного подхода.

Применение многопоточности без необходимости может не только усложнить код, но и сделать его менее читаемым, замедлить выполнение и привести к ошибкам. Слепое использование паттернов, там где можно обойтись простым синхронным выполнением, часто создает больше проблем, чем решает.

Поэтому всегда важно взвешивать, стоит ли разбивать задачу на несколько потоков, или ее лучше выполнить последовательно и синхронно. Хороший разработчик не только знает паттерны, но и умеет выбирать подходящий инструмент для конкретной задачи.

Под моей прошлой статьей про функциональные опции было какое-то количество негативных комментариев, типа «Да зачем это нужно?» и «Смотри, можно лучше». Наверняка под этой статьей такие комментаторы тоже будут. Вы все безусловно правы! Вы молодцы, вы самоутвердились и я с вами спорить не собираюсь :-) Моя цель, в том чтобы в одном месте, максимально подробно и главное на Русском языке описать эти паттерны, чтобы те кто хочет развиваться мог их понять и в дальнейшем пользоваться.

Спасибо за уделенное время, надеюсь материал был полезен :-)

Па

© Habrahabr.ru