Распределенные вычисления в Elixir — классический пример MapReduce

Распределенные вычисления в Elixir


Elixir и Erlang идеально подходят для создания распределенных приложений, выполняющих параллельно несколько, возможно схожих задач. Поддержка многих конкурентных процессов работающих в изоляции была одним из основных аспектов при разработке виртуальной машины языка Erlang.


Постараемся проверить эту возможность использовать потенциал многоядерного процессора на простом примере. Подсчитаем сколько раз встечается слово «лошадь» в рассказах писателя О. Генри размещенных в текстовых файлах в одной директории. Технически, мы будем считать количество вхождения последавательности символов «лошадь», а не слова, и тоьлко в нижнем регистре.



Подcчет вхождений подстроки в файлах


Начнем с функции подсчета количества вхождений подстроки в содержимом текстового файла.


word_count = fn(file, word) ->
  {:ok, content} = File.read(file)
  length(String.split(content, word)) - 1
end

Читаем содержимое файла и возвращаем количество упоминаний слова. Обработка ошибок опущена, для простоты.


Добавим в функцию задержку на 1 секунду, а также выведем результат подсчета в консоль, перед тем как его вернуть.


word_count = fn(file, word) ->
  :timer.sleep(1000)
  {:ok, content} = File.read(file)
  count = length(String.split(content, word)) - 1
  IO.puts "Found #{inspect count} occurrence(s) of the word in file #{inspect file}"
  count
end

Теперь посчитаем количество подстроки в каждом файле и выведем сумму.


Path.wildcard("/data/OGENRI/*.txt")
|> Enum.map(fn(file) -> word_count.(file, "лошадь") end)
|> Enum.reduce(fn(x, acc) -> acc + x end)
|> IO.puts

И заодно замерим время выолнения всей программы.


# sync_word_count.exs
start_time = :os.system_time(:milli_seconds)

word_count = fn(file, word) ->
  :timer.sleep(1000)
  {:ok, content} = File.read(file)
  count = length(String.split(content, word)) - 1
  IO.puts "Found #{inspect count} occurrence(s) of the word in file #{inspect file}"
  count
end

Path.wildcard("/data/OGENRI/*.txt")
|> Enum.map(fn(file) -> word_count.(file, "лошадь") end)
|> Enum.reduce(fn(x, acc) -> acc + x end)
|> IO.puts

end_time = :os.system_time(:milli_seconds)
IO.puts "Finished in #{(end_time - start_time) / 1000} seconds"

Всего у меня 12 файлов и ждать пришлось около 12-ти секунд, секунда за секундой созерцая как на мониторе появляется результат подсчета для каждого файла.


iex sync_word_count.exs
Erlang/OTP 18 [erts-7.3] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Found 0 occurrence(s) of the word in file "/data/OGENRI/businessmen.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/choose.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/four.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/light.txt"
Found 10 occurrence(s) of the word in file "/data/OGENRI/prevr.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/r_dl.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/r_linii.txt"
Found 10 occurrence(s) of the word in file "/data/OGENRI/r_sixes.txt"
Found 9 occurrence(s) of the word in file "/data/OGENRI/serdce.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/stihi.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/voice.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/ways.txt"
32
Finished in 12.053 seconds
Interactive Elixir (1.3.1) - press Ctrl+C to exit (type h() ENTER for help)

Асинхронное выполнение задач


Для подсчета количества вхождений подстроки асинхронно воспользуемся методом создания процесса spawn (порождать) и методами send and receive для отправки и получения сообщения, соответственно.


Для каждого файла будем создавать отдельный процесс


async_word_count = fn(file, word) ->
  caller = self
  spawn(fn ->
    send(caller, {:result, word_count.(file, word)})
  end)
end

self — это текущий процесс. Создаем переменную caller с тем же значением, что и self. Порожденный процесс вызывает функцию word_count/2 и посылает результат обратно родительскому процессу.


Чтобы получить значение обрано, в родительком процессе нужно использовать receive (столько же раз, сколько процессов будет создано). Созданим для этого метод get_result/0.


get_result = fn ->
  receive do
    {:result, result} -> result
  end
end

Обновим программу.


# async_word_count.exs
start_time = :os.system_time(:milli_seconds)

word_count = fn(file, word) ->
  :timer.sleep(1000)
  {:ok, content} = File.read(file)
  count = length(String.split(content, word)) - 1
  IO.puts "Found #{inspect count} occurrence(s) of the word in file #{inspect file}"
  count
end

async_word_count = fn(file, word) ->
  caller = self
  spawn(fn ->
    send(caller, {:result, word_count.(file, word)})
  end)
end

get_result = fn ->
  receive do
    {:result, result} -> result
  end
end

Path.wildcard("/data/OGENRI/*.txt")
|> Enum.map(fn(file) -> async_word_count.(file, "лошадь") end)
|> Enum.map(fn(_) -> get_result.() end)
|> Enum.reduce(fn(x, acc) -> acc + x end)
|> IO.puts

end_time = :os.system_time(:milli_seconds)
IO.puts "Finished in #{(end_time - start_time) / 1000} seconds"

Проверим.


iex async_word_count.exs
Erlang/OTP 18 [erts-7.3] [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Found 9 occurrence(s) of the word in file "/data/OGENRI/serdce.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/businessmen.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/four.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/choose.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/light.txt"
Found 10 occurrence(s) of the word in file "/data/OGENRI/prevr.txt"
Found 1 occurrence(s) of the word in file "/data/OGENRI/r_linii.txt"
Found 10 occurrence(s) of the word in file "/data/OGENRI/r_sixes.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/stihi.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/voice.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/ways.txt"
Found 0 occurrence(s) of the word in file "/data/OGENRI/r_dl.txt"
32
Finished in 1.014 seconds
Interactive Elixir (1.3.1) - press Ctrl+C to exit (type h() ENTER for help)

Вывод


Приключения и лошади раньше были неотделимы друг от друга, сейчас уже, возможно, это не совсем так.


Ссылки


http://elixir-lang.org/getting-started/processes.html
http://culttt.com/2016/07/27/understanding-concurrency-parallelism-elixir/
https://elixirschool.com/lessons/advanced/concurrency/
код и текстовые файлы (папка OGENRI) — https://github.com/shhavel/elixir_concurrent_word_count

Комментарии (3)

  • 25 сентября 2016 в 22:32

    0

    Спасибо за интересное исследование. Поэтому возник вопрос, т.к. неинтересные публикации вопросов не вызывают:
    Добавим в функцию задержку на 1 секунду, а также выведем результат подсчета в консоль, перед тем как его вернуть.
    Этот момент вызвал сомнение. Значительное время тест не работает, а спит. Т.о. тестируем сон, а не только работу.
    • 25 сентября 2016 в 22:53

      0

      Очевидно считать лошадей и спать секунду это модель тяжелого вычисления.
  • 25 сентября 2016 в 22:45 (комментарий был изменён)

    +1

    Тема действительно интересная, я сам недавно сталкивался с распараллеливанием вычислений в Elixir, поэтому сделал небольшой рефакторинг и добавил основную основную фичу, значительно облегчающую распараллеливание — Task.


    Читать по ссылке, а пока предлагаю взглянуть на код:


    Весь длинный код работающего скрипта
    defmodule Wordcount do
      @main_word "лошадь"
    
      defp count(text, word) do
        length(String.split(text, word)) - 1
      end
    
      defp do_proceed(file_path) do
        File.read!(file_path)
        |> count(@main_word)
        |> (fn (count) -> IO.puts "Found #{count} occurrence(s) of the word in file #{file_path}" end).()
      end
    
      def proceed(:async) do
        Path.wildcard("./OGENRI/*.txt")
        |> Enum.map(&Task.async(fn -> do_proceed(&1) end))
        |> Enum.map(&Task.await/1)
      end
    
      def proceed(:sync) do
        Path.wildcard("./OGENRI/*.txt")
        |> Enum.map(&do_proceed/1)
      end
    
      ### Example:
      #   benchmark
      #   benchmark(:sync)
      def benchmark(type \\ :async) do
        start_time = :os.system_time(:milli_seconds)
        proceed(type)
        end_time = :os.system_time(:milli_seconds)
        IO.puts "Finished in #{(end_time - start_time)} miliseconds"
      end
    end

    Вот конкретные различия между синхронной и асинхронной версией


    def proceed(:sync) do
      Path.wildcard("./OGENRI/*.txt")
      |> Enum.map(&do_proceed/1)
    end
    
    def proceed(:async) do
      Path.wildcard("./OGENRI/*.txt")
      |> Enum.map(&Task.async(fn -> do_proceed(&1) end))
      |> Enum.map(&Task.await/1)
    end

    Пишем синхронный код — потом вуаля! — и он щелчком пальцев превращается в асинхронный. Добро пожаловать в functional programming!)

© Habrahabr.ru