[Из песочницы] MongoDB и исследование рынка ИТ-вакансий

Вы когда-нибудь анализировали вакансии?

Задавались вопросом, в каких технологиях наиболее сильна потребность рынка труда на текущий момент? Месяц назад? Год назад?

Как часто открываются новые вакансии Java-разработчиков в определенном районе Вашего города и как активно они закрываются?

В этой статье я расскажу Вам, как можно достичь желаемого результата и построить отчетную систему по интересующей нас теме. Поехали!

z0f8y8sbxdtwpnhis-xvvclfodc.gif


Источник

Выбор пал на Headhunter.ru


Вероятно, многие из вас знакомы и даже пользовались таким ресурсом как Headhunter.ru. На этом сайте ежедневно размещаются тысячи новых вакансий в различных областях. Так же у HeadHunter существует API, позволяющий разработчику взаимодействовать с данными этого ресурса.

Инструментарий


На несложном примере рассмотрим построение процесса получения данных для отчетной системы, который базируется на работе с API сайта Headhunter.ru. В качестве промежуточного хранения информации будем использовать встраиваемую СУБД SQLite, обработанные данные будем хранить в NoSQL базе MongoDB, в качестве основного языка — Python версии 3.4.

HH API
Возможности HeadHunter API довольно обширны и хорошо описаны в официальной документации на GitHib. Прежде всего, это возможность отправки анонимных запросов, не требующих авторизации для получения информации о вакансиях в формате JSON. С недавних пор ряд методов стал платным (методы работодателя), но в данной задаче они рассматриваться не будут.


Каждая вакансия висит на сайте в течение 30 дней, после чего, если она не продлевается, то попадает в архив. Если вакансия попала в архив до истечения 30-ти дней, значит, она была закрыта работодателем.

HeadHunter API (далее — HH API) позволяет получать массив опубликованных вакансий за любую дату за последние 30 дней, чем и воспользуемся — будем на ежедневной основе собирать опубликованные вакансии за каждый день.

Реализация


  • Подключение БД SQLite
    import sqlite3
    conn_db = sqlite3.connect('hr.db', timeout=10)
    c = conn_db.cursor()    
    
  • Таблица для хранения изменения статуса вакансии
    Для удобства, будем сохранять историю изменения статуса вакансии (доступность на дату) в специальной таблице БД SQLite. Благодаря таблице vacancy_history нам будет известна на любую дату выгрузки доступность вакансии на сайте, т.е. в какие даты она была активна.
    c.execute('''             
                create table if not exists vacancy_history
                     (
                         id_vacancy integer, 
                         date_load text, 
                         date_from text,
                         date_to text
                     )''')
    
  • Фильтрация выборки вакансий
    Существует ограничение на то, что один запрос не может вернуть более 2000 коллекций, а так как в течение одного дня на сайте может быть опубликовано гораздо больше вакансий, поставим фильтр в теле запроса, например: вакансии только по Санкт-Петербургу (area = 2), по специализации IT (specialization = 1)
    path = ("/vacancies?area=2&specialization=1&page={}&per_page={}&date_from={}&date_to={}".format(page, per_page, date_from, date_to))
    
  • Дополнительные условия отбора
    Рынок труда активно растет и даже с учетом фильтра количество вакансий может превысить 2000, поэтому установим дополнительное ограничение в виде раздельного запуска за каждый день: вакансии за первую половину дня и вакансии за вторую половину дня
    def get_vacancy_history():
        ...
        
        count_days = 30
        hours = 0
            
        while count_days >= 0:
            
            while hours < 24:
                date_from = (cur_date.replace(hour=hours, minute=0, second=0) - 
                                 td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S')
    
                date_to = (cur_date.replace(hour=hours + 11, minute=59, second=59) - 
                               td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S')
    
                while count == per_page:
                    path = ("/vacancies?area=2&specialization=1&page={}
    &per_page={}&date_from={}&date_to={}"
                            .format(page, per_page, date_from, date_to))
    
                    conn.request("GET", path, headers=headers)
                    response = conn.getresponse()
                    vacancies = response.read()
                    conn.close()
    
                    count = len(json.loads(vacancies)['items'])
                    
                    ...
                       
                    # Вставка значений в БД
                    try:
                        c.executemany('INSERT INTO vacancy_history VALUES (?,?,?,?)', collection_for_ins)
                    except sqlite3.DatabaseError as err:       
                        print("Error: ", err)
                    else:
                        conn_db.commit()
    
                    if collection_for_ins:
                        page = page + 1
                        total = total + count
                        # обнуление массива
                        del(collection_for_ins[:])
    
                hours = hours + 12
            
            count_days = count_days - 1
            hours = 0
    
    


Первый пример использования
Предположим, что перед нами стоит задача определить вакансии, которые были закрыты за определенный интервал времени, например, за июль 2018 года. Это решается следующим образом: результат несложного SQL запроса к таблице vacancy_history возвратит нужные нам данные, которые можно передать в DataFrame для последующего анализа:
    c.execute("""
            select 
                    a.id_vacancy,
                    date(a.date_load) as date_last_load,
                    date(a.date_from) as date_publish,
                    ifnull(a.date_next, date(a.date_load, '+1 day')) as date_close
            from (
                select 
                    vh1.id_vacancy,
                    vh1.date_load,
                    vh1.date_from,
                    min(vh2.date_load) as date_next
                from vacancy_history vh1
                left join vacancy_history vh2
                    on vh1.id_vacancy = vh2.id_vacancy
                    and vh1.date_load < vh2.date_load
                where date(vh1.date_load) between :date_in and :date_out
                group by 
                    vh1.id_vacancy,
                    vh1.date_load,
                    vh1.date_from
                ) as a
            where a.date_next is null
            """, 
              {"date_in" : date_in, "date_out" : date_out})

date_in = dt.datetime(2018, 7, 1)
date_out = dt.datetime(2018, 7, 31)

closed_vacancies = get_closed_by_period(date_in, date_out)

df = pd.DataFrame(closed_vacancies, 
columns = ['id_vacancy', 'date_last_load', 'date_publish', 'date_close'])
df.head()

Получаем результат такого вида:
id_vacancy date_last_load date_publish date_close
0 18126697 2018–07–09 2018–07–09 2018–07–10
1 18155121 2018–07–09 2018–06–19 2018–07–10
2 18881605 2018–07–09 2018–07–02 2018–07–10
3 19620783 2018–07–09 2018–06–27 2018–07–10
4 19696188 2018–07–09 2018–06–15 2018–07–10
Если мы хотим провести анализ средствами Excel или сторонними BI-инструментами, то можно выгрузить таблицу vacancy_history в csv-файл для последующего анализа:
# Экспорт полной таблицы из БД в CSV
data = c.execute('select * from vacancy_history')

with open('vacancy_history.csv','w', newline='') as out_csv_file:
    csv_out = csv.writer(out_csv_file)                       
    csv_out.writerow(d[0] for d in data.description)
    csv_out.writerows(data.fetchall())

conn_db.close()


Тяжелая артиллерия


А что, если нам нужно провести более сложный анализ данных? Здесь на помощь приходит документоориентированная NoSQL база данных MongoDB, которая позволяет хранить данные в JSON-формате.

  • Демонстрационный экземпляр моей базы MongoDB развернут в облачном сервисе mLab, который позволяет бесплатно создавать базу данных до 500MB, чего вполне достаточно для разбора текущей задачи. В базе данных hr_db имеется коллекция Vacancy, к которой установим соединение:
    # Подключаем облачную базу Mongo
    from pymongo import MongoClient
    from pymongo import ASCENDING
    from pymongo import errors
    client = MongoClient('mongodb://:@ds115219.mlab.com:15219/hr_db')
    db = client.hr_db
    VacancyMongo = db.Vacancy
    
  • Стоит отметить, что не всегда уровень заработной платы указывается в рублях, поэтому для анализа необходимо привести все значения к рублевому эквиваленту. Для этого выкачиваем с помощью HH API коллекцию словарей, где содержится информация о курсе валют на текущую дату:
    # Получение справочника
    def get_dictionaries():
        conn = http.client.HTTPSConnection("api.hh.ru")
        conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers)
        response = conn.getresponse()
        if response.status != 200:
            conn.close()
            conn = http.client.HTTPSConnection("api.hh.ru")
            conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers)
            response = conn.getresponse()
        dictionaries = response.read()
        dictionaries_json = json.loads(dictionaries)
    
        return dictionaries_json
    
  • Заполнение словаря с валютами текущими курсами валют:
    hh_dictionary = get_dictionaries()
    currencies = hh_dictionary['currency']
    
    currency_rates = {}
    
    for currency in currencies:
        currency_rates[currency['code']] = currency['rate']  
    
    

Вышеописанные действия по сбору вакансий запускаются на ежедневной основе, поэтому нет необходимости каждый раз просматривать все вакансии и получать по каждой из них детальную информацию. Будем брать только те, что были получены за последние пять дней.

  • Получение массива вакансий за последние 5 дней из БД SQLite:
    def get_list_of_vacancies_sql():
        
        conn_db = sqlite3.connect('hr.db', timeout=10)
        conn_db.row_factory = lambda cursor, row: row[0]
        c = conn_db.cursor()
        items = c.execute("""
                    select 
                        distinct id_vacancy
                    from vacancy_history
                    where date(date_load) >= date('now', '-5 day')
                """).fetchall()
        
        conn_db.close()
        return items
    
  • Получение массива вакансий за последние пять дней из MongoDB:
    def get_list_of_vacancies_nosql():
        
        date_load = (dt.datetime.now() - td(days=5)).strftime('%Y-%m-%d')
        vacancies_from_mongo = []
    
        for item in VacancyMongo.find({"date_load" : {"$gte" : date_load}}, {"id" : 1, "_id" : 0}):
            vacancies_from_mongo.append(int(item['id']))
       
        return vacancies_from_mongo
    
    
  • Остается найти разницу между двумя массивами, по тем вакансиям, которых нет в MongoDB, получить детальную информацию и записать ее в базу данных:
    sql_list = get_list_of_vacancies_sql()
    mongo_list = get_list_of_vacancies_nosql()
    vac_for_proс = []
    
    s = set(mongo_list)
    vac_for_proс = [x for x in sql_list if x not in s]
    
    vac_id_chunks = [vac_for_proс[x: x + 500] for x in range(0, len(vac_for_proс), 500)]
    
    
  • Итак, у нас готов массив с новыми вакансиями, которых еще нет в MongoDB, по каждой из них мы получим детальную информацию с помощью запроса в HH API, перед непосредственной записью в MongoDB обработаем каждый документ:
    1. Приведем величину заработной платы к рублевому эквиваленту;
    2. Добавим к каждой вакансии градацию уровня специалиста (Junior/Middle/Senior etc)

    Все это реализуем в функции vacancies_processing:
    from nltk.stem.snowball import SnowballStemmer
    stemmer = SnowballStemmer("russian") 
    
    def vacancies_processing(vacancies_list):
        
        cur_date = dt.datetime.now().strftime('%Y-%m-%d')
    
        for vacancy_id in vacancies_list:
            conn = http.client.HTTPSConnection("api.hh.ru")
            conn.request("GET", "/vacancies/{}".format(vacancy_id), headers=headers)
            response = conn.getresponse()
            if response.status != 404:
                vacancy_txt = response.read()
                conn.close()
                vacancy = json.loads(vacancy_txt)
    
                # salary
                salary = None
                if 'salary' in vacancy:
                    if vacancy['salary'] != None:
                        ...
    
                    max_salary = 500000
                    if salary is not None:
                        salary = int(salary)
                        if salary >= max_salary:
                            salary = max_salary
    
                # grade
                grade = None
                if 'name' in vacancy:
                    p_grade = ''
                    title = re.sub(u'[^a-zа-я]+', ' ', vacancy['name'].lower(), re.UNICODE)
                    words = re.split(r'\s{1,}', title.strip())
                    for title_word in words:
                        title_word = stemmer.stem(title_word)
                        if len(title_word.strip()) > 1:
                            p_grade = p_grade + " " + title_word.strip()
    
                    if re.search('(главн)|(princip)', p_grade):
                        grade = 'principal'    
                    elif re.search('(ведущ)|(senior)|([f|F]ull)', p_grade):
                        grade = 'senior'
                    ...
                    else:
                        grade = 'not specify'
    
                vacancy['salary_processed'] = salary
                vacancy['date_load'] = cur_date
                vacancy['grade'] = grade
                vacancy.pop('branded_description', None)
    
                try:
                    post_id = VacancyMongo.insert_one(vacancy)
                except errors.DuplicateKeyError:
                    print ('Cant insert the duplicate vacancy_id:', vacancy['id'])
    
    
  • Получение детальной информации путем обращения к HH API, предобработку полученных
    данных и вставку их в MongoDB будет проводить в несколько потоков, по 500 вакансий в каждом:
    t_num = 1
    threads = []
    
    for vac_id_chunk in vac_id_chunks:
        print('starting', t_num)
        t_num = t_num + 1
        t = threading.Thread(target=vacancies_processing, kwargs={'vacancies_list': vac_id_chunk})
        threads.append(t)
        t.start()
        
    for t in threads:
        t.join()
    


Заполненная коллекция в MongoDB выглядит примерно следуюшим образом:

ervqsyhywfqykkbh2stmqgumc9q.jpeg

Еще немного примеров


Имея в распоряжении собранную базу данных, можем выполнять различные аналитические выборки. Итак, выведу Топ-10 самых высокооплачиваемых вакансий Python-разработчиков в Санкт-Петербурге:

cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[pP]ython*"}})
 
df_mongo = pd.DataFrame(list(cursor_mongo))
del df_mongo['_id']
 
pd.concat([df_mongo.drop(['employer'], axis=1), 
           df_mongo['employer'].apply(pd.Series)['name']], axis=1)[['grade',
                                                                    'name', 
                                                                    'salary_processed'
                                                                   ]].sort_values('salary_processed',
                                                                                  ascending=False)[:10]


Топ-10 самых высокооплачиваемых вакансий Python
grade name name salary_processed
senior Web Team Lead / Архитектор (Python/Django/React) Investex Ltd 293901.0
senior Senior Python разработчик в Черногорию Betmaster 277141.0
senior Senior Python разработчик в Черногорию Betmaster 275289.0
middle Back-End Web Developer (Python) Soshace 250000.0
middle Back-End Web Developer (Python) Soshace 250000.0
senior Lead Python Engineer for a Swiss Startup Assaia International AG 250000.0
middle Back-End Web Developer (Python) Soshace 250000.0
middle Back-End Web Developer (Python) Soshace 250000.0
senior Python teamlead DigitalHR 230000.0
senior Ведущий разработчик (Python, PHP, Javascript) IK GROUP 220231.0

А теперь выведем, возле какой станции метро наивысшая концентрация вакантных должностей для Java-разработчиков. С помощью регулярного выражения фильтрую по названиям вакансии «Java», а так же отбираю только те вакансии, где указан адрес:

cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[jJ]ava[^sS]"}, "address" : {"$ne" : None}})
df_mongo = pd.DataFrame(list(cursor_mongo))

df_mongo['metro'] = df_mongo.apply(lambda x: x['address']['metro']['station_name']
                                   if x['address']['metro'] is not None 
                                   else None, axis = 1)
 
df_mongo.groupby('metro')['_id'] \
                                .count() \
                                .reset_index(name='count') \
                                .sort_values(['count'], ascending=False) \
                                [:10]


Вакансии Java-разработчиков по станциям метро
metro count
Василеостровская 87
Петроградская 68
Выборгская 46
Площадь Ленина 45
Горьковская 45
Чкаловская 43
Нарвская 32
Площадь Восстания 29
Старая Деревня 29
Елизаровская 27


Итоги


Итак, аналитические возможности разработанной системы поистине широкие и могут использоваться для планирования стартапа или открытия нового направления деятельности.

Замечу, что представлен пока лишь базовый функционал системы, в дальнейшем планируется развитие в сторону анализа по географическим координатам и предсказания появления вакансий в том или ином районе города.

Полный исходный код к этой статье Вы можете найти по ссылке на мой GitHub.

P.S. Комментарии к статье приветствуются, буду рад ответить на все Ваши вопросы и узнать Ваше мнение. Спасибо!

© Habrahabr.ru