Анализируй это. Mista.ru

odxdkwpc_7anzdc0vazcz6golwq.png


What, How, Why


Форум Mista.ru — один из самых старых и активных форумов посвященный 1С. Первое сообщение датировано 2000 годом и на текущий момент счетчик тем перевалил за 800000, а количество сообщений больше 16 000 000. Форум был настолько популярен, что его даже пытались «зеркалировать», так как содержал неплохую базу вопросов-ответов по 1С, из-за чего админы форума добавили «защиту от скачивания». В этой статье будет описано то, как можно скачать этот (а в наверно и любой другой) форум в относительно короткие сроки при помощи Google Cloud Platform.


Intro


После моего предыдущего мини-проекта по получению интересного (на мой взгляд) дата-сета, мне нужна была очередная интересная задача, при решении которой я бы мог потренировать свои скиллы как data engineer. В качестве цели выбрал форум адинеснегов Mista.ru и сделал это сразу по нескольким причинам. Во-первых, это довольно старый форум и за многие годы он накопил миллионы сообщений на разные тематики. Во-вторых, он один из самых популярных среди программистов 1С (по-крайней мере был лет 6–7 назад) и активность на нем довольно высокая. В-третьих, 99.99% пользователей форума — сторонники нынешнего президента, а концентрация «упоротых ура-патриотов» просто зашкаливает, а это в свою очередь дает надежду получить очень интересную аналитику, например частоту употребления знаменитого слова »###», «Путин» или «Путин ###». Ну и в-четвертых, благодаря «защите», задача скачать форум становилась чуть более интереснее.


Grabbing script


Первые попытки подойти к задаче с уже имеющимися инструментами, которые остались у меня от предыдущего проекта, провалились. После 20 запросов GET запросов форум переставал отвечать. В веб-бекэнде не силен, но подозреваю, что частые запросы с одного ИП отслеживались и на все, что было не похоже на запросы от обычного пользователя, ставился бан. Куча перебранных скачивалок и грабберов сайтов натыкались на те же грабли и шли в корзину. Нужна была свежая идея.


И она нашлась. В частично пройденном курсе по Ruby-on-Rails услышал, что есть инструменты для автоматического тестирования, которые позволяют закодировать поведение пользователя и использовать эти скрипты-эмуляторы для в тест-кейсах. Вспомнив слово Selenium и воспользовавшись Гуглом, довольно быстро нашел небольшие примеры, с единственным нюансом — они были на питоне, а в моем арсенале был только 1С и R. Однако, с учетом того, что питон считается одним из легких в освоении, через пару часов я уже мог залогиниться на форуме:


import selenium
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait

driver = webdriver.Chrome("/usr/local/bin/chromedriver")
base_url = 'http://www.forum.mista.ru/'

driver.get(base_url)
username = "Добрый хачик"
password = "11"
uname = driver.find_element_by_name("user_name")
uname.send_keys(username.decode('utf-8'))
passw = driver.find_element_by_name("user_password")
passw.send_keys(password)
submit_button = driver.find_element_by_class_name("sendbutton").click()


А еще спустя (довольно продолжительное должен сказать) время появился такой вот класс MistaDownloader, который позволял сразу после инициализации залогиниться на форуме и скачивать ветки форума по ее номеру:


#!/usr/bin/env python
# -*- coding: utf-8 -*- 

# export PYTHONIOENCODING=UTF-8

import base64
import selenium
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from bs4 import BeautifulSoup
from urllib import quote
import sys
import codecs
import binascii
import os
import datetime
import subprocess
import syslog

reload(sys)
sys.setdefaultencoding("utf-8")

syslog.syslog("MistaDownloader class loaded.")
print ("MistaDownloader class loaded.")

class MistaDownloader:

  def print_to_log(self, message):
    syslog.syslog(message)
    print (message)

  def __init__(self):
    self.print_to_log("MistaDownloader initializing...")
    self.driver = webdriver.Chrome("/usr/local/bin/chromedriver")
    self.base_url = 'http://www.forum.mista.ru/'
    self.folder = '//home/gomista/files'
    if not os.path.exists(self.folder):
      os.makedirs(self.folder)
    self.print_to_log("MistaDownloader initialized.")

  def authenticate(self):
    self.driver.get(self.base_url)
    username = "Добрый хачик"
    password = "11"
    uname = self.driver.find_element_by_name("user_name")
    uname.send_keys(username.decode('utf-8'))
    passw = self.driver.find_element_by_name("user_password")
    passw.send_keys(password)
    submit_button = self.driver.find_element_by_class_name("sendbutton")
                               .click()
    self.print_to_log("Authentication done.")

  def download_by_id(self, topic_id):

    self.print_to_log("ID: " + topic_id)
    def write_source_to_file(topic_id, page_number, page_url, page_source, 
                             folder):
      filename = folder + '/' + '{0:0>7}'.format(topic_id) + '_' 
                        + '{0:0>2}'.format(page_number) + '_' 
                        + binascii.hexlify(page_url) + '.txt'
      file = open(filename,'w')
      page_source_to_save = page_source.replace('\t', ' ')
                                       .replace('\n', ' ')
                                       .replace('\r', ' ')
      res = '%s\t%s\t%s\t%s' % (topic_id, page_number, page_url, 
                                page_source_to_save)
      file.write(res)
      file.close()

    page_number = 1
    current_url = '%s%s%s%s%s' % (self.base_url, 'topic.php?id=', topic_id,
                                  '&page=', page_number)
    self.print_to_log('getting page: ' + current_url)
    self.driver.set_page_load_timeout(240)
    try:
      self.driver.get(current_url)
      self.print_to_log('done')
      html = self.driver.page_source
      write_source_to_file(topic_id, page_number, current_url, html, 
                           self.folder)
      soup = BeautifulSoup(html, "lxml")
      pages_tag = soup.find('span', { 'class' : 'pages' })
      additional_pages = set()
      if pages_tag:
        pages_tag = pages_tag.findAll('a', attrs = {'data-page' : True})
        if pages_tag:
          for page_tag in pages_tag:
            additional_pages.add(page_tag['data-page'])
      additional_pages = list(sorted(additional_pages))
      for additional_page in additional_pages:
        current_url = '%s%s%s%s%s' % (self.base_url, 'topic.php?id=', 
                                      topic_id, '&page=', additional_page)
        self.print_to_log('getting page: '+current_url)
        self.driver.set_page_load_timeout(240)
        self.driver.get(current_url)
        self.print_to_log('done')
        html = self.driver.page_source
        write_source_to_file(topic_id, additional_page, current_url, html, 
                             self.folder)
    except Exception as e:
      write_source_to_file(topic_id, page_number, current_url, 
                 'ERRORERRORERROR', self.folder)


Само скачивание страниц можно было выполнять следующими строчками:


do = MistaDownloader()
do.authenticate()
topic_id = 1
do.download_by_id(topic_id)
...
topic_id = 99
do.download_by_id(topic_id)


Дальше я захотел, чтобы такой скрипт запускался не только на десктопе, но и на сервере без графического интерфейса. Для этого, при помощи гугла разродился следующей версией скрипта, который при помощи эмулятора мог запускаться где угодно:


from xvfbwrapper import Xvfb
from mista_downloader import MistaDownloader 

vdisplay = Xvfb()
vdisplay.start()
do = MistaDownloader()
...
vdisplay.stop()


И вот, казалось, достаточно запустить цикл от 1 до 1000000 (все новые ветки сейчас создаются с индексом чуть больше 811 тысяч). Однако… Предварительные замеры подсказывали, что курить придется примерно 2–3 недели, пока весь форум будет в виде файлов на жестком диске. Да, можно было пойти по проверенному пути и запустить в несколько потоков на собственном ноуте, но недавно просмотренные курсы Google Cloud Platform for Data Engineers подсказали новую идею.


Parallel Do


Google Cloud Platform, позволяет создать виртуальный сервер меньше чем за минуту и удалить его сразу же, после того, как он справится с поставленной задачей. Т.е. в соответствии с идеей мне не нужно было искать один мощный сервер и заставлять его работать неделю, мне проще было было арендовать 20 простых серверов например на 10 часов. А так как я это же время смотрел курсы по Hadoop, HDFS, MapReduce и прочему, я решил сделать собственный кластер с блекджеком и нодами.


Суть кластера — центральный сервер (мастер) содержит список ссылок (а именно ID тем форума) которые нужно скачать. Рабочие серверы (Ноды), подключаются к Мастеру, получают порцию IDs, скачивают их, складывают полученные файлы в хранилище GCS (Google Cloud Storage), уведомляют Мастера о том, что задание выполнено и получают новую порцию IDs.


В Google Cloud Platform есть довольно удобная вещь — Google Cloud Shell. Это небольшая бесплатная виртуальная машина, которая всегда доступна через браузер и с ее помощью можно управлять всем облаком. Практически для каждого действия через веб-интерфейс есть аналогичная команда, которую можно выполнить при помощи Google Cloud Shell. Все скрипты дальше в статье будут запускаться через нее.

При помощи следующего скрипта я создал виртуальную машину именованную cserver:


#! /bin/bash

gcloud compute --project "new-mista-project" instances create cserver \
  --zone "europe-west1-b" \
  --machine-type "n1-standard-2" \
  --subnet "default" \
  --metadata-from-file startup-script=startupscript_server.sh \
  --metadata "ssh-keys=gomista:ssh-rsa key-key-key gomista" \
  --maintenance-policy "MIGRATE" \
  --service-account "1007343160266-compute@developer.gserviceaccount.com" \
  --scopes "https://www.googleapis.com/auth/cloud-platform" \
  --min-cpu-platform "Automatic" \
  --image "image-mista-node" \
  --image-project "new-mista-project" \
  --boot-disk-size "10" \
  --boot-disk-type "pd-standard" \
  --boot-disk-device-name cserver

gcloud compute instances add-tags cserver --tags 'mysql-server' --zone \
  "europe-west1-b"


Это сервер типа n1-standard-2, что по факту означает 2CPUs 7.5Gb. В качестве жесткого диска — 10Gb Standard Disk. Кстати удовольствие стоит 48.95 per month estimated. Effective hourly rate $0.067 (730 hours per month), т.е. час пользования таким сервером вам обойдется в 4 рубля, а сутки чуть меньше 100р.

Сервер создавался на основе заранее подготовленного мной образа Ubuntu 16.04 LTS, с установленными библиотеками и приложениями. На самом деле сервер можно было создать и из голой Ubuntu, а команду на доустановку всех нужных программ передать в параметре --metadata-from-file, например типа sudo apt-get install xvfb x11-xkb-utils -y. Переданная команда или скрипт запустились бы сразу после создания ВМ и установили все необходимое. Но так как я знал что буду еще много раз создавать/удалять эту машину, решил потратить лишние 10 мин и подготовить образ (силами GCP). Также в момент создания сервера запускался мой скрит startupscript_server.sh, который копировал мое репо и добавлял в crontab скрипт, который раздавал задания Нодам.


Этот скрипт я использовал для создания Нодов:


#! /bin/bash

gcloud compute --project "new-mista-project" instances create $1 \
  --zone $2 \
  --machine-type "n1-standard-1" \
  --subnet "default" \
  --metadata-from-file startup-script=startupscript.sh \
  --metadata "ssh-keys=gomista:ssh-rsa key-keykey gomista" \
  --no-restart-on-failure \
  --maintenance-policy "TERMINATE" \
  --preemptible \
  --service-account "1007343160266-compute@developer.gserviceaccount.com" \
  --scopes "https://www.googleapis.com/auth/cloud-platform" \
  --min-cpu-platform "Automatic" \
  --image "image-mista-node" \
  --image-project "new-mista-project" \
  --boot-disk-size "10" \
  --boot-disk-type "pd-standard" \
  --boot-disk-device-name $1

gcloud compute instances add-tags $1 --tags 'mysql-client'  --zone $2


В качестве Нода был сервер n1-standard-1 (1CPUs 3.75Gb). Главным отличием Нода от Мастера можно выделить параметр --preemptible. Создание сервера с таким флагом означает, что данный сервер может быть в любое время принудительно отключен Гуглом и что максимальное время работы сервера является 24 часа. Если арендовать такой сервер и согласиться на подобные условия, то Гугл скинет цену на него с 24.67 per month до 7.70 per month (hourly $0.011), что чуть больше 15 рублей в сутки. А так как мне как раз нужно было пару десятков машин «for batch jobs and fault-tolerant workloads», то preemptible машины стали неплохим вариантом. Кстати, добавляя серверам теги, я таким образом добавлял им Firewall Rules (которые заранее привязал к этим тегам).

Создать 20 виртуальных машин одной командой и получить их готовыми к работе уже через 1 минуту? Пожалуйста:


#! /bin/bash

./create_node_west.sh node01 europe-west1-b &
./create_node_west.sh node02 europe-west1-b &
./create_node_west.sh node03 europe-west1-b &
./create_node_west.sh node04 europe-west1-b &
./create_node_west.sh node05 europe-west1-b &
./create_node_west.sh node06 europe-west1-b &
./create_node_west.sh node07 europe-west2-b &
./create_node_west.sh node08 europe-west2-b &
./create_node_west.sh node09 europe-west2-b &
./create_node_west.sh node10 europe-west2-b &
./create_node_west.sh node11 europe-west2-b &
./create_node_west.sh node12 europe-west2-b &
./create_node_west.sh node13 europe-west2-b &
./create_node_west.sh node14 europe-west3-b &
./create_node_west.sh node15 europe-west3-b &
./create_node_west.sh node16 europe-west3-b &
./create_node_west.sh node17 europe-west3-b &
./create_node_west.sh node18 europe-west3-b &
./create_node_west.sh node19 europe-west3-b &
./create_node_west.sh node20 europe-west3-b &


Blackjack and nodes


А теперь как это все работало.


После старта Мастера, я запускал скрипт, который устанавливал MySQL и создавал необходимые таблицы (кстати опыт работы мускулом тоже первый):


#! /bin/bash

sudo apt-get install mysql-server -y
sudo sed -i -e "s/bind-address/#bind-address/g" \
  /etc/mysql/mysql.conf.d/mysqld.cnf
sudo service mysql restart

echo "Please enter root user MySQL password!"
read rootpasswd
mysql -uroot -p${rootpasswd} \
  -e "CREATE DATABASE mistadb DEFAULT CHARACTER SET utf8;"
mysql -uroot -p${rootpasswd} \
  -e "CREATE USER 'gomista'@'%' IDENTIFIED BY 'gomista';"
mysql -uroot -p${rootpasswd} \
  -e "GRANT ALL PRIVILEGES ON *.* TO 'gomista'@'%' WITH GRANT OPTION;"
mysql -uroot -p${rootpasswd} -e "FLUSH PRIVILEGES;"

mysql -ugomista -pgomista -e "CREATE TABLE server_statuses (
node_name VARCHAR(50) PRIMARY KEY,
node_ip VARCHAR(15) NOT NULL,
node_status TINYINT(50),
updated_at TIMESTAMP
);" mistadb

mysql -ugomista -pgomista -e "CREATE TABLE server_commands (
id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
node_name VARCHAR(50) NOT NULL,
node_ip VARCHAR(15) NOT NULL,
job_id VARCHAR(70),
n_links INT(50),
received TINYINT(50),
updated_at TIMESTAMP,
CONSTRAINT node_name_unique UNIQUE (node_name, job_id)
);" mistadb

mysql -ugomista -pgomista -e "CREATE TABLE links (
job_id VARCHAR(70),
link VARCHAR(200),
status TINYINT(50),
updated_at TIMESTAMP,
INDEX(job_id),
INDEX(link),
CONSTRAINT node_name_unique UNIQUE (job_id, link)
);" mistadb

python -c "exec(\"import sys\\nfor i in range(1,1000000): \
  print ('\t'+'{0:0>6}'.format(str(i))+'\t0')\")" > numbers
mysql -ugomista -pgomista -e "LOAD DATA LOCAL INFILE 'numbers' \
  INTO TABLE links;" mistadb
rm numbers


В таблице links хранились номера от 1 до 999999, информация о том, обработан номер или нет (статус 0 — не обработано, 1 — в работе, 2 — обработано), идентификатор джоба, который выполняет или выполнил обработку того или иного номера. В таблице server_commands — джобы, которые Мастер раздавал Нодам (received 0 — выдан, received 1 — получен, received 2 — завершен). В таблице server_statuses — статусы серверов (node_status 0 — свободен, node_status 1 — занят).

Каждые 5 секунд на Мастере запускался следующий скрипт:


#!/usr/bin/env python

import mysql.connector
import socket
import uuid
import datetime
import sys

if len(sys.argv) == 2:
  number_of_links = int(sys.argv[1])
else:
  number_of_links = 500

cnx = mysql.connector.connect(user='gomista', password='gomista', 
                              host='cserver', database='mistadb')
cursor = cnx.cursor()

select_servers =("SELECT server_statuses.node_name, server_statuses.node_status," 
        "server_commands.received "
        "FROM server_statuses "
        "LEFT JOIN server_commands "
        "ON server_statuses.node_name = server_commands.node_name and "
        "server_commands.received <> 2 "
        "WHERE server_statuses.node_status = %(node_status)s and "
        "server_commands.received IS NULL LIMIT 1")

server_status = {'node_status': 0}
cursor.execute(select_servers, server_status)
row = cursor.fetchone()

if row:
  node_name = row[0]
  print (node_name)
  cursor = cnx.cursor()
  new_job_id = node_name + "_" 
                         + datetime.datetime.now().strftime("%Y%m%d_%H%M%S") 
                         + "_" + str(uuid.uuid4()) 
  received = 0
  add_job_command = ("INSERT INTO server_commands "
           "(node_name, node_ip, job_id, n_links, received) "
           "VALUES (%(node_name)s, %(node_ip)s, %(job_id)s, %(n_links)s, %(received)s) "
           "ON DUPLICATE KEY UPDATE node_ip = %(node_ip)s, "
           "n_links = %(n_links)s, received = %(received)s, updated_at=now();"
           "UPDATE links "
           "SET job_id = %(job_id)s "
           "WHERE job_id = '' AND status = 0 LIMIT %(n_links)s;")
  server_job = {'node_name': str(node_name), 'node_ip': str(node_name), 
                'job_id': new_job_id, 'n_links': number_of_links, 
                'received': received}
  for result in cursor.execute(add_job_command, server_job, multi=True):
    pass
  cnx.commit()
  cursor.close()
  cnx.close()
else:
  print ("no available worker found")


Этот скрипт проверял таблицу server_statuses на предмет наличия доступных серверов, и в случае обнаружения такового, создавал для него джоб в таблице server_commands и привязывал к джобу определенное количество номеров из таблицы links.

Ноды же в свою очередь, во время старта добавляли себе в crontab следующий скрипт, который выполнялся каждую минуту:


#!/usr/bin/env python

from xvfbwrapper import Xvfb
from cserver_connector import CserverConnector
from mista_downloader import MistaDownloader
import sys
import subprocess
import syslog
import datetime

MAX_PROCESS_TIME = 40 # min

def print_to_log(message):
  syslog.syslog(message)
  print (message)

def get_process_time_in_min(connector):
  mod_time = datetime.datetime.fromtimestamp(connector.modified_time)
  current_time = datetime.datetime.now()
  delta = current_time - mod_time
  delta_min = delta.seconds / 60
  return delta_min

print_to_log("Creating connector...")
connector = CserverConnector()

print_to_log("Connector created. Working: "+str(connector.working)) 
if connector.working:
  process_time_in_min = get_process_time_in_min(connector)
  if process_time_in_min > MAX_PROCESS_TIME:
    print_to_log("Canceling job...")
    connector.cancel_job()
    connector.remove_file_flag()
    print_to_log("Updating 0 status on cserver...")
    connector.update_status_on_cserver(0)
    print_to_log("Exiting...")
    exit(0)
  else:
    connector.update_status_on_cserver(1)
    print_to_log("Exiting...")
    exit(0)
else:
  print_to_log("Updating 0 status on cserver...")
  connector.update_status_on_cserver(0)
  print_to_log("Getting new job...")
  connector.get_new_job()
  if not connector.job_id:
    print_to_log("No assigned jobs found...")
    print_to_log("Exiting...")
    exit(0)
  print_to_log("Job found: "+connector.job_id)
  connector.create_file_flag()
  print_to_log("Updating 1 status on cserver...")
  connector.update_status_on_cserver(1)
  print_to_log("Getting list of links...")
  connector.get_links_list()

  print_to_log("Starting Xvfb...")
  vdisplay = Xvfb()
  vdisplay.start()

  print_to_log("Creating MistaDownloader...")
  try:
    do = MistaDownloader()
  except Exception as e:
    print_to_log(str(e))
    raise
  print_to_log("Do authenticate...")
  do.authenticate()
  folder = do.folder

  print_to_log("Downloading links in loop...")
  for link in connector.links_list:
    do.download_by_id(link[0].lstrip('0'))

  vdisplay.stop()

  print_to_log("Moving files to GS...")
  move_command = "gsutil -m mv " + folder + "/* gs://mistabucket-west/files"
  subprocess.call([move_command], shell=True)
  print_to_log("Updating links on finish...")
  connector.update_links_on_finish()
  connector.remove_file_flag()
  print_to_log("Updating 0 status on cserver...")
  connector.update_status_on_cserver(0)
  print_to_log("Exiting...")


Этот скрипт подключался к Мастеру и смотрел таблицу server_commands. Если находил задание, то по номеру джоба получал список номеров, начинал скачивание и уведомлял Мастер о том, что он взял джоб в работу и что теперь он занят. После завершения скачивания, Нод обновлял таблицы server_statuses (говорил что теперь свободен), server_commands (говорил что выполнил задание), links отмечал какие номера скачал. Мастер же в свою очередь, как только видел что Нод освободился, закидывал для него в задания новый джоб с еще необработанными номерами, чтобы Нод меньше чем через минуту подхватил это задание и принялся за работу…

Все общение Нода с Мастером вынес в отдельный класс CServerConnector:


#!/usr/bin/env python

import mysql.connector
import socket
import os
import syslog

class CserverConnector:

  def print_to_log(self, message):
    syslog.syslog(message)
    print (message)

  def __init__(self):
    self.hostname = socket.gethostname()
    self.file_flag = '/home/gomista/WORKING' 
    self.mysql_connector = mysql.connector.connect(user='gomista', 
                                                   password='gomista', 
                                                   host='cserver', 
                                                   database='mistadb', 
                                                   buffered=True)

    self.working = os.path.isfile(self.file_flag)

    if self.working:
      self.modified_time = os.path.getmtime(self.file_flag) 
      with open(self.file_flag, 'r') as f:
        self.job_id = f.readline().strip()
    else:
      self.modified_time = None
      self.job_id = None

    self.n_links = 0
    self.links_list = list()

  def create_file_flag(self):
    file_flag_to_write = open(self.file_flag, "w")
    file_flag_to_write.write(self.job_id)
    file_flag_to_write.close()

  def update_file_flag(self):
    self.create_file_flag()

  def remove_file_flag(self):
    if os.path.isfile(self.file_flag):
      os.remove(self.file_flag)

  def update_status_on_cserver(self, status):
    cursor = self.mysql_connector.cursor()
    add_server_status_query = ("INSERT INTO server_statuses "
              "(node_name, node_ip, node_status) "
              "VALUES (%(node_name)s, %(node_ip)s, %(node_status)s)"
              "ON DUPLICATE KEY UPDATE node_ip = %(node_ip)s, "
              "node_status = %(node_status)s, updated_at=now()")
    server_status = {'node_name': self.hostname, 'node_ip': self.hostname, 
                     'node_status': status}
    cursor.execute(add_server_status_query, server_status)
    self.mysql_connector.commit()
    cursor.close()

  def get_new_job(self):
    cursor = self.mysql_connector.cursor()
    get_command_query = ("SELECT node_name, job_id, n_links "
              "FROM server_commands "
              "WHERE node_name = %(node_name)s AND job_id <> '' AND "
              "received = %(node_name)s LIMIT 1")
    server_status = {'node_name': self.hostname, 'node_ip': self.hostname, 
                     'received': 0}
    cursor.execute(get_command_query, server_status)
    row = cursor.fetchone()
    if row:
      node_name = str(row[0])
      self.job_id = str(row[1])
      self.n_links  = str(row[2])
    else:
      self.job_id = None
      self.n_links  = 0
    cursor.close()

  def get_links_list(self):
    self.print_to_log("Getting coursor and prepare the query...")
    cursor = self.mysql_connector.cursor()
    update_job_command_query = ("UPDATE server_commands "
                   "SET received = 1 "
                   "WHERE job_id = %(job_id)s;")
    server_job = {'job_id': self.job_id}
    self.print_to_log("Executing query 1.")
    cursor.execute(update_job_command_query, server_job)
    self.print_to_log("Commiting query 1.")
    self.mysql_connector.commit()
    update_links_query = ("UPDATE links "
                   "SET status = 1 "
                   "WHERE job_id = %(job_id)s;")
    server_job = {'job_id': self.job_id}
    self.print_to_log("Executing query 2. with job_id: "+self.job_id)
    try:
      cursor.execute(update_links_query, server_job)
    except Exception as e:
      self.print_to_log(str(e))
      self.print_to_log(cursor.statement)
      raise
    self.print_to_log("Commiting query 2.")
    self.mysql_connector.commit()
    cursor.close()
    cursor = self.mysql_connector.cursor()
    get_links_query = ("SELECT link FROM links WHERE job_id = %(job_id)s")
    self.print_to_log("Executing query 3.")
    cursor.execute(get_links_query, {'job_id': self.job_id})
    for (link) in cursor:
      self.links_list.append(link)
    cursor.close()
    self.print_to_log("Finished.")

  def update_links_on_finish(self):
    cursor = self.mysql_connector.cursor()
    update_job_command_query = ("UPDATE server_commands "
                   "SET received = 2 "
                   "WHERE job_id = %(job_id)s;")
    server_job = {'job_id': self.job_id}
    cursor.execute(update_job_command_query, server_job)
    self.mysql_connector.commit()
    update_links_query = ("UPDATE links "
                   "SET status = 2 "
                   "WHERE job_id = %(job_id)s;")
    server_job = {'job_id': self.job_id}
    cursor.execute(update_links_query, server_job)
    self.mysql_connector.commit()
    cursor.close()

  def cancel_job(self):
    cursor = self.mysql_connector.cursor()
    update_job_command_query = ("DELETE FROM server_commands "
                   "WHERE job_id = %(job_id)s;")
    server_job = {'job_id': self.job_id}
    cursor.execute(update_job_command_query, server_job)
    self.mysql_connector.commit()
    update_links_query = ("UPDATE links "
                   "SET status = 0, job_id = '' "
                   "WHERE job_id = %(job_id)s;")
    server_job = {'job_id': self.job_id}
    cursor.execute(update_links_query, server_job)
    self.mysql_connector.commit()
    cursor.close()


Вот так выглядят таблицы в процессе работы:


gomista@cserver:~/mista$ ./get_mysql_status.sh 
NODES:
+-----------+---------+-------------+---------------------+
| node_name | node_ip | node_status | updated_at          |
+-----------+---------+-------------+---------------------+
| node01    | node01  |           1 | 2018-01-05 08:07:02 |
| node02    | node02  |           1 | 2018-01-05 08:07:02 |
| node03    | node03  |           1 | 2018-01-05 08:07:02 |
| node04    | node04  |           1 | 2018-01-05 08:07:02 |
| node05    | node05  |           1 | 2018-01-05 08:07:02 |
| node06    | node06  |           1 | 2018-01-05 08:07:02 |
| node07    | node07  |           1 | 2018-01-05 08:07:03 |
| node08    | node08  |           1 | 2018-01-05 08:07:03 |
| node09    | node09  |           1 | 2018-01-05 08:07:03 |
| node10    | node10  |           1 | 2018-01-05 08:07:03 |
| node11    | node11  |           0 | 2018-01-05 08:07:17 |
| node12    | node12  |           1 | 2018-01-05 08:07:03 |
| node13    | node13  |           1 | 2018-01-05 08:07:03 |
| node14    | node14  |           1 | 2018-01-05 08:07:03 |
| node15    | node15  |           1 | 2018-01-05 08:07:03 |
| node16    | node16  |           1 | 2018-01-05 08:07:03 |
| node17    | node17  |           0 | 2018-01-05 08:07:15 |
| node18    | node18  |           1 | 2018-01-05 08:07:03 |
| node19    | node19  |           1 | 2018-01-05 08:07:03 |
| node20    | node20  |           1 | 2018-01-05 08:07:04 |
| node21    | node21  |           1 | 2018-01-05 08:07:03 |
| node22    | node22  |           1 | 2018-01-05 08:07:03 |
+-----------+---------+-------------+---------------------+
JOBS DONE:
+-----------+-----------+
| node_name | jobs_done |
+-----------+-----------+
| node08    |         5 |
| node10    |         5 |
| node11    |         6 |
| node07    |         6 |
| node09    |         7 |
| node22    |         8 |
| node13    |         9 |
| node06    |         9 |
| node20    |        10 |
| node17    |        10 |
| node14    |        10 |
| node21    |        10 |
| node12    |        10 |
| node18    |        11 |
| node15    |        11 |
| node02    |        12 |
| node19    |        12 |
| node04    |        12 |
| node01    |        12 |
| node05    |        13 |
| node03    |        13 |
| node16    |        15 |
+-----------+-----------+
COMMANDS TO BE DONE:
+-----+-----------+---------+-------+---------+----------+---------------------+
| id  | node_name | node_ip | job_id| n_links | received | updated_at          |
+-----+-----------+---------+-------+---------+----------+---------------------+
| 266 | node17    | node17  | ad062 |     500 |        0 | 2018-01-05 08:07:17 |
| 267 | node11    | node11  | ab531 |     500 |        0 | 2018-01-05 08:07:22 |
+-----+-----------+---------+-------+---------+----------+---------------------+
COMMANDS IN PROGRESS:
+-----+-----------+---------+-------+---------+----------+---------------------+
| id  | node_name | node_ip | job_id| n_links | received | updated_at          |
+-----+-----------+---------+-------+---------+----------+---------------------+
| 244 | node14    | node14  | 5e1b6 |     500 |        1 | 2018-01-05 07:55:04 |
| 245 | node06    | node06  | d0235 |     500 |        1 | 2018-01-05 07:56:02 |
| 246 | node13    | node13  | c82fd |     500 |        1 | 2018-01-05 07:56:04 |
| 249 | node09    | node09  | 1d553 |     500 |        1 | 2018-01-05 07:59:04 |
| 250 | node12    | node12  | 9176f |     500 |        1 | 2018-01-05 07:59:03 |
| 251 | node22    | node22  | 1c8ae |     500 |        1 | 2018-01-05 08:00:03 |
| 252 | node15    | node15  | 3ca50 |     500 |        1 | 2018-01-05 08:01:04 |
| 253 | node18    | node18  | 8836c |     500 |        1 | 2018-01-05 08:01:03 |
| 254 | node21    | node21  | 091f7 |     500 |        1 | 2018-01-05 08:01:03 |
| 255 | node16    | node16  | 6475d |     500 |        1 | 2018-01-05 08:02:04 |
| 256 | node19    | node19  | 489b3 |     500 |        1 | 2018-01-05 08:03:04 |
| 257 | node04    | node04  | 9fd5a |     500 |        1 | 2018-01-05 08:03:02 |
| 258 | node05    | node05  | 49b9f |     500 |        1 | 2018-01-05 08:03:02 |
| 259 | node01    | node01  | 579a7 |     500 |        1 | 2018-01-05 08:04:02 |
| 260 | node20    | node20  | d3fe8 |     500 |        1 | 2018-01-05 08:05:03 |
| 261 | node08    | node08  | ff1c7 |     500 |        1 | 2018-01-05 08:05:04 |
| 262 | node03    | node03  | b1165 |     500 |        1 | 2018-01-05 08:06:02 |
| 263 | node02    | node02  | 7d138 |     500 |        1 | 2018-01-05 08:06:02 |
| 264 | node10    | node10  | 4ad65 |     500 |        1 | 2018-01-05 08:06:04 |
| 265 | node07    | node07  | e6501 |     500 |        1 | 2018-01-05 08:06:04 |
+-----+-----------+---------+-------+---------+----------+---------------------+
LINKS TO BE PROCESSED:
+-------------+
| count(link) |
+-------------+
|      881999 |
+-------------+
LINKS IN PROCESS:
+-------------+
| count(link) |
+-------------+
|       10000 |
+-------------+
LINKS DONE
+-------------+
| count(link) |
+-------------+
|      108000 |
+-------------+
gomista@cserver:~/mista$ 


Весь процесс занял примерно 20 часов. Кому интересно, в деньгах это: $0.067 20 часов + $0.011 20 часов * 20 машин или 5.74 в валюте врага или около 350 в наших родных рублях. Затраченные 20 часов и 350 рублей принесли плоды в виде овер миллиона файлов весом 30Гб.


Список пользователей был скачан этим скриптом. По факту это просто перебор страниц пользователей от 1 до 200000. На самом деле пользователей меньше, однако по каким-то причинам были обнаружены пользователи с ID выше 100000, поэтому было решено скачать с запасом:


#!/usr/bin/env python
# -*- coding: utf-8 -*- 

# export PYTHONIOENCODING=UTF-8

import base64
import selenium
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from bs4 import BeautifulSoup
from urllib import quote
from xvfbwrapper import Xvfb
import sys
import codecs
import binascii
import os
import datetime

print(sys.getdefaultencoding())
reload(sys)
sys.setdefaultencoding("utf-8")
print(sys.getdefaultencoding())

if len(sys.argv) == 3:
  N_START = int(sys.argv[1])
  N_END = int(sys.argv[2])
else:
  N_START = 1
  N_END = 2

URL_TO_SAVE = 'users'
if not os.path.exists(URL_TO_SAVE):
  os.makedirs(URL_TO_SAVE)

vdisplay = Xvfb()
vdisplay.start()

def _convert(param):
    if isinstance(param, str):
        return param.decode('utf-8')
    else:
        return param

def get_driver():
  url = "http://www.forum.mista.ru/index.php"
  driver = webdriver.Chrome()
  return driver

def authenticate(url, driver):
  driver.get(url)
  username = "Добрый хачик"
  password = "11"
  uname = driver.find_element_by_name("user_name")
  uname.send_keys(_convert(username))
  passw = driver.find_element_by_name("user_password")
  passw.send_keys(password)
  submit_button = driver.find_element_by_class_name("sendbutton").click()
  url_edit = "http://www.forum.mista.ru/users.php?action=edit"
  driver.get(url_edit)
  a = driver.find_element_by_xpath("//a[@href='#tab3']")
  a.click()
  topics = driver.find_element_by_name("topics_per_page")
  topics.clear()
  topics.send_keys(99)
  section = driver.find_element_by_name("column_forum")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_replies")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_section")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("show_topic_section")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_author")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_updated")
  if not section.is_selected():
    section.click()       
  submit_button = driver.find_element_by_name("Submit").click()

base_url = 'http://www.forum.mista.ru/'
print("getting driver")
driver = get_driver()
print("logging")
authenticate(base_url, driver)

def save_user_page(page, driver, n=0):
  links_list = list()
  print('getting page: '+page)
  driver.get(page)
  print('done')
  html = driver.page_source.replace('\t', ' ').replace('\n', ' ')
               .replace('\r', ' ')
  filename = URL_TO_SAVE + '/' + '{0:0>10}'.format(n) + '_' 
                         + binascii.hexlify(page) + '.txt'
  file = open(filename,'w')
  file.write(html + '\n')
  file.close()

limit = 100
for i in range(N_START, N_END):
  current_url = 'http://www.forum.mista.ru/users.php?id=' + str(i)
  n = i
  save_user_page(current_url, driver, n)

vdisplay.stop() 


Скачивание (и одновременный парсинг) списка тем и таблицы банов пользователей сделал этим скриптами.


#!/usr/bin/env python
# -*- coding: utf-8 -*- 

# export PYTHONIOENCODING=UTF-8

import base64
import selenium
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from bs4 import BeautifulSoup
from urllib import quote
from xvfbwrapper import Xvfb
import sys
import codecs
import binascii
import os
import datetime

print(sys.getdefaultencoding())
reload(sys)
sys.setdefaultencoding("utf-8")
print(sys.getdefaultencoding())

URL_TO_SAVE = 'links_backward'
if not os.path.exists(URL_TO_SAVE):
  os.makedirs(URL_TO_SAVE)

vdisplay = Xvfb()
vdisplay.start()

def _convert(param):
    if isinstance(param, str):
        return param.decode('utf-8')
    else:
        return param

def get_driver():
  url = "http://www.forum.mista.ru/index.php"
  driver = webdriver.Chrome()
  return driver

def authenticate(url, driver):
  driver.get(url)
  username = "Добрый хачик"
  password = "11"
  uname = driver.find_element_by_name("user_name")
  uname.send_keys(_convert(username))
  passw = driver.find_element_by_name("user_password")
  passw.send_keys(password)
  submit_button = driver.find_element_by_class_name("sendbutton").click()
  url_edit = "http://www.forum.mista.ru/users.php?action=edit"
  driver.get(url_edit)
  a = driver.find_element_by_xpath("//a[@href='#tab3']")
  a.click()
  topics = driver.find_element_by_name("topics_per_page")
  topics.clear()
  topics.send_keys(99)
  section = driver.find_element_by_name("column_forum")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_replies")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_section")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("show_topic_section")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_author")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_updated")
  if not section.is_selected():
    section.click()       
  submit_button = driver.find_element_by_name("Submit").click()

base_url = 'http://www.forum.mista.ru/'
print("getting driver")
driver = get_driver()
print("logging")
authenticate(base_url, driver)

def save_links_list_on_page(page, driver, n=0):
  links_list = list()
  print('getting page: '+page)
  driver.get(page)
  print('done')
  html = driver.page_source
  soup = BeautifulSoup(html, "lxml")
  tr_list = soup.findAll('tr', attrs = {'data-topic_id' : True})
  for tr_element in tr_list:
    topic_id = tr_element['data-topic_id']
    tr_element_1 = tr_element.findAll('td', { 'class' : 'cc' })
    section = tr_element_1[0].getText().replace('\t', '')
    length = tr_element_1[1].getText().replace('\t', '')
    tr_element_2 = tr_element.find('a', { 'class' : 'agb' })
    title = tr_element_2.getText().replace('\t', '')
    tr_element_2_1 = tr_element.find('a', { 'class' : 'userlink' })
    user_link = tr_element_2_1['href']
    user_name = tr_element_2_1.getText().replace('\t', '')
    tr_element_2_2 = tr_element.find('a', { 'class' : 'sectionlink' })
    subsection = ''
    if tr_element_2_2:
      subsection = tr_element_2_2.getText().replace('\t', '')
    classes = ' '.join(tr_element_2['class']).replace('\t', '')
    link = tr_element_2['href'].replace('\t', '')
    tr_element_3 = tr_element.find('a', { 'class' : 'sectionlink-gray' })
    link_attributes = '%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s' 
                      % (topic_id, link, section, subsection, length, title, 
                        user_link, user_name, classes)  
    links_list.append(link_attributes)
  next_link_tag = soup.find('a', text='<<')
  if next_link_tag:
    nex_page_link = base_url+next_link_tag['href']
  else:
    nex_page_link = ''
  filename = URL_TO_SAVE + '/' + '{0:0>10}'.format(n) + '_' 
                         + binascii.hexlify(page) + '.txt'
  file = open(filename,'w')
  for link in links_list:
    file.write(link + '\n')
  file.close()
  return nex_page_link

current_url = 'http://www.forum.mista.ru/index.php?id=30309&after=2000/07/06_14:17:00'
n = 1
while current_url != '':
  current_url = save_links_list_on_page(current_url, driver, n)
  n += 1
  print('next page to process: '+current_url)

vdisplay.stop()


#!/usr/bin/env python
# -*- coding: utf-8 -*- 

# export PYTHONIOENCODING=UTF-8

import base64
import selenium
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from bs4 import BeautifulSoup
from urllib import quote
from xvfbwrapper import Xvfb
import sys
import codecs
import binascii
import os
import datetime
import re
from dateutil import parser
from pytz import timezone

reload(sys)
sys.setdefaultencoding("utf-8")

URL_TO_SAVE = 'users_bans'
if not os.path.exists(URL_TO_SAVE):
  os.makedirs(URL_TO_SAVE)

vdisplay = Xvfb()
vdisplay.start()

def get_empty_ban():
  empty_ban = {}
  empty_ban['ban_user_id'] = ''
  empty_ban['ban_type'] = ''
  empty_ban['ban_user_ip'] = ''
  empty_ban['ban_section'] = ''
  empty_ban['ban_subsection'] = ''
  empty_ban['ban_date'] = ''
  empty_ban['ban_end_date'] = ''
  empty_ban['ban_moderator_id'] = ''
  empty_ban['ban_reason'] = ''
  empty_ban['ban_topic_id'] = ''
  return empty_ban

def get_list_of_users_on_page(source):

  tz = timezone('Europe/Moscow')
  fmt = '%Y-%m-%d %H:%M:%S %Z%z'

  def parse_message_time(str_message_time):
    message_time = parser.parse(str_message_time)
    message_time = tz.localize(message_time)
    return message_time.strftime(fmt)

  soup = BeautifulSoup(source, "html.parser")
  table = soup.find('table', { 'bgcolor' : '#CCCCCC' })
  tr_elements = table.findAll('tr', { 'class' : 'active' })
  ban_list = list()
  for user_element in tr_elements:
    ban = get_empty_ban()
    tds = user_element.findAll('td')
    ban['ban_user_id'] = tds[0].find('a')['data-user_id']
    ban['ban_type'] = tds[1].getText()
    ban['ban_user_ip'] = tds[2].getText()
    ban['ban_section'] = tds[3].getText()
    ban['ban_subsection'] = tds[4].getText()
    ban['ban_date'] = parse_message_time(tds[8].getText())
    ban['ban_end_date'] = parse_message_time(tds[5].getText())
    ban['ban_moderator_id'] = tds[7].find('a')['href'].split('=')[1]
    ban['ban_reason'] = tds[9].getText()
    ban['ban_topic_id'] = tds[10].getText()
    print ('%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s' % 
      ( ban['ban_user_id'].encode('utf-8')
      , ban['ban_type'].encode('utf-8')
      , ban['ban_user_ip'].encode('utf-8')
      , ban['ban_section'].encode('utf-8')
      , ban['ban_subsection'].encode('utf-8')
      , ban['ban_date'].encode('utf-8')
      , ban['ban_end_date'].encode('utf-8')
      , ban['ban_moderator_id'].encode('utf-8')
      , ban['ban_reason'].encode('utf-8')
      , ban['ban_topic_id']))
    ban_list.append(ban)

  return ban_list

def _convert(param):
    if isinstance(param, str):
        return param.decode('utf-8')
    else:
        return param

def get_driver():
  url = "http://www.forum.mista.ru/index.php"
  driver = webdriver.Chrome()
  return driver

def authenticate(url, driver):
  driver.get(url)
  username = "Добрый хачик"
  password = "11"
  uname = driver.find_element_by_name("user_name")
  uname.send_keys(_convert(username))
  passw = driver.find_element_by_name("user_password")
  passw.send_keys(password)
  submit_button = driver.find_element_by_class_name("sendbutton").click()
  url_edit = "http://www.forum.mista.ru/users.php?action=edit"
  driver.get(url_edit)
  a = driver.find_element_by_xpath("//a[@href='#tab3']")
  a.click()
  topics = driver.find_element_by_name("topics_per_page")
  topics.clear()
  topics.send_keys(99)
  section = driver.find_element_by_name("column_forum")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_replies")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_section")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("show_topic_section")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_author")
  if not section.is_selected():
    section.click()
  section = driver.find_element_by_name("column_updated")
  if not section.is_selected():
    section.click()       
  submit_button = driver.find_element_by_name("Submit").click()

base_url = 'http://www.forum.mista.ru/'
driver = get_driver()
authenticate(base_url, driver)

def save_links_list_on_page(page, driver, n=0):
  driver.get(page)
  ban_list = get_list_of_users_on_page(driver.page_source)
  html = driver.page_source.replace('\t', ' ').replace('\n', ' ')
                           .replace('\r', ' ').encode('utf-8')
  soup = BeautifulSoup(html, "html.parser")
  noindex_tag = soup.find('noindex')
  a_tag =noindex_tag.findAll('a')
  if len(a_tag) == 1:
    link_to_return = ''
  else:
    link_to_return = base_url+a_tag[1]['href']
  return link_to_return

current_url = 'http://www.forum.mista.ru/ban_list.php'
n = 1
while current_url != '':
  current_url = save_links_list_on_page(current_url, driver, n)
  n += 1
  if current_url == '' or 
     current_url == 'http://www.forum.mista.ru/ban_list.php?next=':
    break

vdisplay.stop()


В итоге у меня получилось 4 набора данных — скачанные вебстраницы тем, скачанные веб-страницы профилей, список тем и список банов.


В этом месте я понял что допустил серьезную ошибку и нарвался на проблему «маленьких файлов», про которую слышал на курсе про Хадуп и ХДФС. Миллион файлов весом от 1 до 20 килобайт не самая удачная data для обработки. А так как файлы хранились на Google Cloud Storage и количество операций над ними ограничено, то предстояло сначала их куда-то поместить, а уже потом объединять. Для решения этой внезапной проблемы я создал сервер и скопировал все файлы на него, что заняло тоже приличное время, несмотря на то, что файлы между серверами и GCS просто летают.


Вот команда, которая которой я воспользовался для объединения файлов:


cat * > combo


И получил следующую ошибку:


-bash: /bin/cat: Argument list too long


Попробовал эту же команду, но уже в цикле:


for f in *.txt; do (cat "${f}") >> ~/combo; done;


Но замер показал, что пока объединяется миллион файлов можно успеть посмотреть сезон какого-нибудь сериала. Поэтому, после очередной консультации с гуглом, получил скрипт:


echo * | xargs paste -s -d \n > ~/foo.txt


Для тех кто не знал — работает это ОЧЕНЬ БЫСТРО. Объединение завершилось за несколько минут.


Итого результат граббинга:


  1. файл 30 гб и миллион страниц тем
  2. файл 500 мб и 200 тысяч страниц профилей (большая часть пустых)
  3. файл 100 мб со список тем
  4. файл 500 кб со списком банов


Пора приступать к разбору полученного.


Parsing stage


Для парсинга страниц написал следующий скрипт:


#!/usr/bin/env python
# -*- coding: utf-8 -*- 

# export PYTHONIOENCODING=UTF-8

from bs4 import BeautifulSoup
from dateutil import parser
from pytz import timezone
import datetime
import re
import sys

reload(sys)
sys.setdefaultencoding("utf-8")

def get_empty_message():
  message = {}
  message['message_id'] = ''
  message['message_date_time'] = ''
  message['message_user_id'] = ''
  message['message_user_name'] = ''
  message['message_user_id'] = ''
  message['message_user_class'] = ''
  message['message_text'] = ''
  message['message_reply_to'] = ''
  message['message_external_links'] = ''
  return message

def get_list_of_messages_on_page(source):

  tz = timezone('Europe/Moscow')
  fmt = '%Y-%m-%d %H:%M:%S %Z%z'

  def parse_message_time(str_message_time):
    str_message_time = '-'.join(str_message_time.split('-')[-2:])
    message_time = parser.parse(str_message_time)
    message_time = tz.localize(message_time)
    return message_time.strftime(fmt)

  soup = BeautifulSoup(source, "html.parser")
  message_elements = soup.findAll('tr', id=re.compile("^message_\d+"))
  message_list = list()
  for message_element in message_elements:
    message = {}
    # message info
    message['message_id'] = int(re.sub("message_", "", message_element['id']))
    message_date_time = message_element
                          .find('div', { 'class' : 'message-info' })
                          .getText().replace(u'\xa0', u'')
    message['message_date_time'] = parse_message_time(message_date_time)
    # message user info
    user_info_element = message_element
                          .find('td', id=re.compile("^tduser\d+"))
                          .find('a', attrs = {'data-user_id' : True})
    if user_info_element:
      message['message_user_id'] = user_info_element['data-user_id']
      message['message_user_name'] = user_info_element['data-user_name']
                                      .replace('\t','')
      message['message_user_id'] = user_info_element['href']
                                    .replace('users.php?id=','')
      message['message_user_class'] = ' '.join(user_info_element['class'])
                                         .replace('\t','')
    else:
      user_info_element = message_element
                            .find('span', { 'class' : 'anonym-user' })
      message['message_user_id'] = ''
      message['message_user_name'] = user_info_element.getText()
                                      .replace('\t','')
      message['message_user_id'] = ''
      message['message_user_class'] = 'anonym-user'
    # message content
    message_text_element = message_element
                            .find('div', { 'class' : 'message-text' })
    if not message_text_element:
      message['message_text'] = ''
      message['message_reply_to'] = ''
      message['message_external_links'] = ''
    else:
      answer_link_elements = message_text_element
                              .findAll('a', { 'class' : 'answer-link' })
      for answer_link_element in answer_link_elements:
        answer_link_element.decompose()
      inner_link_elements = message_text_element
                             .findAll('a', { 'class' : 'interlink', 
                                             'data-rel' : re.compile("^#\d+") })
      inner_link_list = list()
      for inner_link_element in inner_link_elements:
        inner_link_list.append(inner_link_element.extract().getText()
                                .encode('utf-8'))
      other_link_elements = message_text_element.findAll('a')
      other_link_list = list()
      for other_link_element in other_link_elements:
        other_link = other_link_element.extract()['href']
        other_link_list.append(other_link.encode('utf-8'))
      message_text = message_text_element.getText()
      message_text = message_text.replace('\t',' ').replace('\n',' ')
                      .replace('()','').strip()
      message['message_text'] = message_text
      message['message_reply_to'] = ' '.join(inner_link_list)
      message['message_external_links'] = ' '.join(other_link_list)
    message_list.append(message)
  return message_list

if __name__ == "__main__":
  for line in sys.stdin:
    try:
      topic_id, page_number, link, source = line.strip().split('\t')
    except Exception as e:
      continue

    if source == 'Тема не существует.










':
      message_list = list()
      message = get_empty_message()
      message_list.append(message)
    else:
      try:
        message_list = get_list_of_messages_on_page(source)
      except Exception as e:
        message_list = list()
        message = get_empty_message()
        message['message_user_name'] = 'ERRORERRORERROR'
        message['message_text'] = str(e)
        message_list.append(message)

    for message in message_list:
      # 12 columns
      print ('%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s' % 
        ( topic_id
        , page_number
        , link
        , message['message_id']
        , message['message_date_time'].encode('utf-8')
        , message['message_user_name'].encode('utf-8')
        , message['message_user_id'].encode('utf-8')
        , message['message_user_class'].encode('utf-8')
        , message['message_text'].encode('utf-8')
        , message['message_reply_to'].encode('utf-8')
        , message['message_external_links'].encode('utf-8')))


Скрипт получал строку, содержащую код страницы ветки с сообщениями, разбирал по тегам и складывал все сообщения в tsv. Для непосредственного парсинга использовал довольно удобную BeautifulSoup. Однако, как оказалось потом, это было не самым оптимальным решением. Суть проблемы и ее решение будет ниже.


Схожий подход использовал для парсинга страниц с профилями пользователей:


#!/usr/bin/env python
# -*- coding: utf-8 -*- 

# export PYTHONIOENCODING=UTF-8

from bs4 import BeautifulSoup
from dateutil import parser
from pytz import timezone
import datetime
import re
import sys

reload(sys)
sys.setdefaultencoding("utf-8")

def get_empty_user():
  empty_user = {}
  empty_user['user_id'] = ''
  empty_user['user_name'] = ''
  empty_user['user_full_name'] = ''
  empty_user['user_email'] = ''
  empty_user['user_contacts'] = ''
  empty_user['user_url'] = ''
  empty_user['user_city_country'] = ''
  empty_user['user_dob'] = ''
  empty_user['user_timezone'] = ''
  empty_user['user_gender'] = ''
  empty_user['user_position'] = ''
  empty_user['user_achievement'] = ''
  empty_user['user_interests'] = ''
  empty_user['user_forum_role'] = ''
  empty_user['user_registration'] = ''
  empty_user['user_messages'] = ''
  empty_user['user_topics'] = ''
  return empty_user

month
    
            

© Habrahabr.ru