Изучаем английский с Scala на Future и Actor
Решил тут я подтянуть свой английский язык. В частности, захотелось значительно расширить словарный запас. Я знаю, что существует масса программ, которые в игровой форме помогают это сделать. Загвоздка в том, что я не люблю геймфикацию. Предпочитаю по старинке. Листочек бумаги где таблица со словами, транскрипцией и переводом. И учим его учим. И проверяем свои знания, например, закрывая столбец с переводом. В общем, как я учил это в университете.
Прослышал я про то, что существует 3000 наиболее часто используемых слов, подобранных на OxfordDictionary сайте. Вот тут этот список слов: www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/Oxford3000_A-B Ну, а перевод на русский я решил брать отсюда: www.translate.ru/dictionary/en-ru Одна только проблема, все находиться на этих сайтах ну совсем не в том формате, который можно распечатать и учить. В итоге родилась идея это все запрограммировать. Но сделать это не как последовательный алгоритм, а все распаралелить. Что бы выкачивание и парсинг всех слов занял не (3000 слов * 2 сайта) / 60 секунд = 100 минут. Это если давать по 1 секунде на выкачивание и распарсивание страницы для извлечения перевода и транскрипции (в реальности думаю это в 3 раза дольше, пока соединение откроем, пока закроем и тд и тп).
Задачу я разбил сразу на два крупных блока. Первый блок, это операции ввода/вывода блокирующие — выкачивание страницы с сайта. Второй блок, это операции вычислительные, не блокирующие, но нагружающие CPU: парсинг страницы для извлечения перевода и транскрипции и добавление в словарь результатов парсинга.
Блокирующие операции я решил делать в пуле нитей, используя Future от Scala. Вычислительные задачи, решил раскидать на 3 актера Akka. Применяя методику TDD, cначала я написал тест к своим кирпичикам будущего приложения.
class Test extends FlatSpec with Matchers {
"Table Of Content extractor" should "download and extract content from Oxford Site" in {
val content:List[String] = OxfordSite.getTableOfContent
content.size should be (10)
content.find(_ == "A-B") should be (Some("A-B"))
content.find(_ == "U-Z") should be (Some("U-Z"))
}
"Words list extractor" should "download words from page" in {
val future: Future[Try[Option[List[String]]]] = OxfordSite.getWordsFromPage("A-B", 1)
val wordsTry:Try[Option[List[String]]] = Await.result(future,60 seconds)
wordsTry should be a 'success
val words = wordsTry.get
words.get.find(_ == "abandon") should be (Some("abandon"))
}
"Words list extractor" should "return None from empty page" in {
val future: Future[Try[Option[List[String]]]] = OxfordSite.getWordsFromPage("A-B", 999)
val wordsTry:Try[Option[List[String]]] = Await.result(future,60 seconds)
wordsTry should be a 'success
val words = wordsTry.get
words should be(None)
}
"Russian Translation" should "download translation and parse" in {
val page: Future[Try[String]] = LingvoSite.getPage("test")
val pageResultTry: Try[String]= Await.result(page,60 seconds)
pageResultTry should be a 'success
val pageResult = pageResultTry.get
pageResult.contains("тест") should be(true)
LingvoSite.parseTranslation(pageResult).get should be("тест")
}
"English Translation" should "download translation and parse" in {
val page: Future[Try[String]] = OxfordSite.getPage("test")
val pageResultTry: Try[String] = Await.result(page,60 seconds)
pageResultTry should be a 'success
val pageResult = pageResultTry.get
pageResult.contains("examination") should be(true)
OxfordSite.parseTranslation(pageResult).get should be(("test", "an examination of somebody’s knowledge or ability, consisting of questions for them to answer or activities for them to perform"))
}
}
Обратите внимание. Функции, которые могут вернуть результат вычислений имеют Try[…]. Те либо Success результат или Failure и эксепшен. Функции, которые будут часто вызывать и имеют блокирующие i/o операции имеют результат, как Future[Try[…]]. Те при вызове функции сразу возвращается Future в которой идут долгие i/o операции. Притом они идут внутри Try и могут завершится с ошибок (например соединение порвалось).
Само приложение инициализируется в Top3000WordsApp.scala. Поднимается система актеров. Создаются актеры. Запускается парсинг списка слов, который в параллель запускает выкачивание английских и русских страниц с транскрипцией и переводом. В случае успешной скачки страниц, срабатывает передача содержимое страниц актерам для парсинга, извлекающих перевод и транскрипцию. Результат перевода актеры передают конечному актеру-словарю, который акамулирует все результаты в одном месте. И по нажатию enter, система актеров идет в shutdown. И актер DictionaryActor, получая сигнал об этом, сохраняет собраный словарь в файл dictionaty.txt
object Top3000WordsApp extends App {
val system = ActorSystem("Top3000Words")
val dictionatyActor = system.actorOf(Props[DictionaryActor], "dictionatyActor")
val englishTranslationActor = system.actorOf(Props(classOf[EnglishTranslationActor], dictionatyActor), "englishTranslationActor")
val russianTranslationActor = system.actorOf(Props(classOf[RussianTranslationActor], dictionatyActor), "russianTranslationActor")
val mapGetPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
val mapGetWordsThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
start()
scala.io.StdIn.readLine()
system.terminate()
def start() = {
import concurrent.ExecutionContext.Implicits.global
Future {
OxfordSite.getTableOfContent.par.foreach(letterGroup => {
getWords(letterGroup, 1)
})
}
}
def getWords(letterGroup: String, pageNum: Int): Unit = {
implicit val executor = mapGetWordsThreadExecutionContext
OxfordSite.getWordsFromPage(letterGroup, pageNum).map(tryWords => {
tryWords match {
case Success(Some(words)) => words.par.foreach(word => {
parse(word,letterGroup,pageNum)
})
case Success(None) => Unit
case Failure(ex) => println(ex.getMessage)
}
})
}
def parse(word: String, letterGroup: String, pageNum: Int)= {
implicit val executor = mapGetPageThreadExecutionContext
OxfordSite.getPage(word).map(tryEnglishPage => {
tryEnglishPage match {
case Success(englishPage) => {
englishTranslationActor ! (word, englishPage)
getWords(letterGroup, pageNum + 1)
}
case Failure(ex) => println(ex.getMessage)
}
})
LingvoSite.getPage(word).map(_ match {
case Success(russianPage) => {
russianTranslationActor !(word, russianPage)
}
case Failure(ex) => println(ex.getMessage)
})
}
}
Обратите внимание, что алгоритм разбит на start, getWords, parse функции. Это сделано из-за того, что для каждой фазы задачи требуется свой пул нитей, который передается неявно, как ThreadExecutionContext. Сначала, у меня была всего одна функция getWords, для рекурсивного вызова. Но все работало очень медленно, так как на верхнем уровне алгоритма распаралеливание выжирало весь пул нитей и в самом низу были вечные ожидания, когда же мне дадут свободную нить, что бы поработать. А как раз в низу самое большое число операций.
Вот реализация скачивания и парсинга с сайтов.
object OxfordSite {
val getPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
def parseTranslation(content: String): Try[(String, String)] = {
Try {
val browser = new Browser
val doc = browser.parseString(content)
val spanElement: Element = doc >> element(".phon")
val str = Jsoup.parse(spanElement.toString).text()
val transcription = str.stripPrefix("BrE//").stripSuffix("//").trim
val translation = doc >> text(".def")
(transcription,translation)
}
}
def getPage(word: String): Future[Try[String]] = {
implicit val executor = getPageThreadExecutionContext
Future {
Try {
val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com/definition/english/" + (word.replace(' ','-')) + "_1")
html.mkString
}
}
}
def getWordsFromPage(letterGroup: String, pageNum: Int): Future[Try[Option[List[String]]]] = {
import ExecutionContext.Implicits.global
Future {
Try {
val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com" +
"/wordlist/english/oxford3000/Oxford3000_" + letterGroup + "/?page=" + pageNum)
val page = html.mkString
val browser = new Browser
val doc = browser.parseString(page)
val ulElement: Element = doc >> element(".wordlist-oxford3000")
val liElements: List[Element] = ulElement >> elementList("li")
if (liElements.size > 0) Some(liElements.map(_ >> text("a")))
else None
}
}
}
def getTableOfContent: List[String] = {
val html = Source.fromURL("http://www.oxfordlearnersdictionaries.com/wordlist/english/oxford3000/Oxford3000_A-B/")
val page = html.mkString
val browser = new Browser
val doc = browser.parseString(page)
val ulElement: Element = doc >> element(".hide_phone")
val liElements: List[Element] = ulElement >> elementList("li")
List(liElements.head >> text("span")) ++ liElements.tail.map(_ >> text("a"))
}
}
object LingvoSite {
val getPageThreadExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(16))
def parseTranslation(content: String): Try[String] = {
Try {
val browser = new Browser
val doc = browser.parseString(content)
val spanElement: Element = doc >> element(".r_rs")
spanElement >> text("a")
}
}
def getPage(word: String): Future[Try[String]] = {
implicit val executor = getPageThreadExecutionContext
Future {
Try {
val html = Source.fromURL("http://www.translate.ru/dictionary/en-ru/" + java.net.URLEncoder.encode(word,"UTF-8"))
html.mkString
}
}
}
}
Структуры данных с которыми работают актеры.
case class Word (word: String, transcription: Option[String] = None, russianTranslation:Option[String] = None, englishTranslation: Option[String] = None)
case class RussianTranslation(word:String, translation: String)
case class EnglishTranslation(word:String, translation: String)
case class Transcription(word:String, transcription: String)
Актеры, которые принимают на входе скачанные страницы для парсинга и пересылают перевод и транскрипцию актеру DictionaryActor
class EnglishTranslationActor (dictionaryActor: ActorRef) extends Actor {
println("EnglishTranslationActor")
def receive = {
case (word: String, englishPage: String) => {
OxfordSite.parseTranslation(englishPage) match {
case Success((transcription, translation)) => {
dictionaryActor ! EnglishTranslation(word,translation)
dictionaryActor ! Transcription(word,transcription)
}
case Failure(ex) => {
println(ex.getMessage)
}
}
}
}
}
class RussianTranslationActor (dictionaryActor: ActorRef) extends Actor {
println("RussianTranslationActor")
def receive = {
case (word: String, russianPage: String) => {
LingvoSite.parseTranslation(russianPage) match {
case Success(translation) => {
dictionaryActor ! RussianTranslation(word, translation)
}
case Failure(ex) => {
println(ex.getMessage)
}
}
}
}
}
Актер который накапливает в себе словарь с переводами и транскрипцией и после shutdown системы актеров записывает весь словарь в dictionary.txt
class DictionaryActor extends Actor {
println("DictionaryActor")
override def postStop(): Unit = {
println("DictionaryActor postStop")
val fileText = DictionaryActor.words.map{case (_, someWord)=> {
val transcription = someWord.transcription.getOrElse(" ")
val russianTranslation = someWord.russianTranslation.getOrElse(" ")
val englishTranslation = someWord.englishTranslation.getOrElse(" ")
List(someWord.word, transcription , russianTranslation , englishTranslation).mkString("|")
}}.mkString("\n")
scala.tools.nsc.io.File("dictionary.txt").writeAll(fileText)
println("dictionary.txt saved")
System.exit(0)
}
def receive = {
case Transcription(wordName, transcription) => {
val newElement = DictionaryActor.words.get(wordName) match {
case Some(word) => word.copy(transcription = Some(transcription))
case None => Word(wordName,transcription = Some(transcription))
}
DictionaryActor.words += wordName -> newElement
println(newElement)
}
case RussianTranslation(wordName, translation) => {
val newElement = DictionaryActor.words.get(wordName) match {
case Some(word) => word.copy(russianTranslation = Some(translation))
case None => Word(wordName,russianTranslation = Some(translation))
}
DictionaryActor.words += wordName -> newElement
println(newElement)
}
case EnglishTranslation(wordName, translation) => {
val newElement = DictionaryActor.words.get(wordName) match {
case Some(word) => word.copy(englishTranslation = Some(translation))
case None => Word(wordName,englishTranslation = Some(translation))
}
DictionaryActor.words += wordName -> newElement
println(newElement)
}
}
}
object DictionaryActor {
var words = scala.collection.mutable.Map[String, Word]()
}
Какие выводы? На моем Mac Book Pro этот скрипт работал в течение примерно 1 часа, пока я писал эту статью. Я его прервал нажав enter и вот какой результат:
bash-3.2$ cat ./dictionary.txt |wc -l
1809
Потом, я еще раз запустил скрипт и оставил на несколько часов. Когда вернулся, то у меня был процессор загружен 100% и были ошибки в консоле про гарбаж коллектор, по нажатию на enter моя программа не смогла сохранить результат своей работы в файл. Диагноз такой, писать на Future и par.map или par.foreach, конечно красиво и удобно, но реально очень тяжело понять как на самом деле на уровне нитей это все работает и где же узкое горлышко бутылки. В итоге я планирую все переписать на актеры. Притом, буду использовать пулы актеров. Что бы, например, 4 актера выкачивало и парсило страницы со списками слов, 18 актеров выкачивало страницы с переводами, 4 актера парсило страницы извлекая переводы и транскрипцию, ну и 1 актер складывал все в словарь.
Текущая реализация в бранче v0.1 github.com/evgenyigumnov/top3000words/tree/v0.1 Версия где все переписано на актеры с пулами будет в бранче v0.2, ну и в master, чуть позже. Может есть у кого соображения, что я делал не так, в текущей версии? Ну и может советы подкините по новой версии?
Проект на гитхабе доступен: github.com/evgenyigumnov/top3000words
Запустить тесты проекта: sbt test
Запустить приложение: sbt run
Ну и как надоест ждать, нажать enter и ознакомиться с содержимым файла dectionary.txt в текущей папке