[recovery mode] Еще раз о многопоточности в одну строку
Давеча понадобилось мне в моем проекте на Flask ускорить ответ сервера. Из-за того, что во view последовательно вызывается запрос к трём удаленным веб-сервисам, время загрузки страницы с данными не из кеша доходило до 10 сек. Да, возможно, Flask не тот фреймворк, который стоило использовать, но что имеем, то имеем.Итак, приступим. Поскольку реальный код я публиковать не могу, рассмотрю на академических примерах.Задача 1 Имеются три функции a, b, c, которые необходимо вызвать в отдельных потоках, дождаться результата их выполнения и выдать ответ.Для решения задачи 1 я воспользовался этим переводом, ибо был очарован простотой использования библиотеки. import multiprocessing.dummy as multiprocessing import time
def a (): time.sleep (2) return 'a' def b (): time.sleep (2) return 'b' def c (): time.sleep (1) return 'c'
p = multiprocessing.Pool ()
results = p.map (lambda f: f (),[a, b, c]) print (results) p.close () p.join ()
Результат выполнения кода:
['a', 'b', 'c'] Замечательно, но есть существенный минус. Время выполнения кода не ограничено, он будет ждать результата выполнения всех процедур. Изменяем формулировку задачи.Задача 2 Имеются три функции a, b, c, которые необходимо вызвать в отдельных потоках, и спустя интервал времени проверить, завершились они или нет, выдать результат.
Для решения используем ту же библиотеку, но уже функцию map_async. Ее отличие в том, что она возвращает объект AsyncResult.
import multiprocessing.dummy as multiprocessing import time
def a (): time.sleep (2) return 'a'
def b (): time.sleep (2) return 'b'
def c (): time.sleep (1) return 'c'
p = multiprocessing.Pool ()
result = p.map_async (lambda f: f (),[a, b, c])
TIMEOUT =3 print (results.get (TIMEOUT))
p.close () p.join ()
Результат выполнения при TIMEOUT>=3 такой же, как и в предыдущем случае, но если хоть одна из процедур не успевает завершится, выдается исключение TimeoutError. Однако и этот результат меня устроил не вполне. Дело в том, что в моем случае мне существенно было, чтобы успевала отработать одна функция, остальные могли и отсутствовать при выдаче.
Задача 3 Имеются три функции a, b, c, которые необходимо вызвать в отдельных потоках, дождаться результата функции a.
import multiprocessing.dummy as multiprocessing import time
def a (): time.sleep (2) print (1) return 'a'
def b (): time.sleep (3) print (2) return 'b'
def c (): time.sleep (1) print (3) return 'c'
p = multiprocessing.Pool ()
results=[] for r in p.imap (lambda f: f (),[a, b, c]): results.append® break
print (results) p.close () p.join ()
Результат выполнения:
3 1 ['a'] 2
Как видно, хотя отработали 2 функции из 3, результат мы получили только для приоритетной. Чтобы получить результат второй, следует использовать imap_unordered: results=[] for r in p.imap_unordered (lambda f: f (),[a, b, c]): results.append® if r =='a': break Результат: 3 1 ['c', 'a'] 2 Что, если нам в основном потоке нужен результат только одного потока, наиболее быстрого? Достаточно убрать вызов p.join () из предыдущего примера и выйти из цикла по первому результату.
Теперь еще такой момент. При попытке использовать модуль multiprocessing, который работает с процессами, вместо multiprocessing.dummy, работающего с тредами будет выдана ошибка сериализации cPickle.PicklingError, поскольку при межпроцессном взаимодействии не удается сериализовать функцию. Для того, чтобы код работал, нужно ввести функцию-псевдоним, код будет не настолько красив, но:
import multiprocessing import time
def a (): time.sleep (2) return 'a' def b (): time.sleep (2) return 'b' def c (): time.sleep (1) return 'c'
def func (param): if param == 'a': return a () elif param == 'b': return b () elif param == 'c': return c ()
p = multiprocessing.Pool ()
results = p.map (func,['a','b','c']) print (results) p.close () p.join ()