[Перевод] Поисковый движок в 80 строках Python

374438b7a30ad7aad20ebff7ca293e41.png

В сентябре я устроился на должность поискового дата-саентиста и с тех пор часть моих обязанностей заключается в работе с Solr — опенсорсным поисковым движком на основе Lucene. Я знал основы работы поискового движка, но мне хотелось понять его ещё лучше. Поэтому я закатал рукава и решил создать его с нуля.

Давайте поговорим о целях. Слышали когда-нибудь о «кризисе сложности обнаружения маленьких веб-сайтов»? Проблема в том. что маленькие веб-сайты наподобие моего невозможно найти при помощи Google или любого другого поискового движка. Какова же моя миссия? Сделать эти крошечные веб-сайты снова великими. Я верю в возвращение славы этих малышей вдали от SEO-безумия Google.

В этом посте я подробно расскажу о процессе создания поискового движка с нуля на Python. Как обычно, весь написанный мной код можно найти в моём GitHub (репозиторий microsearch). Эта реализация не будет притворяться готовым к продакшену поисковым движком, это лишь полезный пример, демонстрирующий внутреннюю работу поискового движка.

Кроме того, мне стоит признаться, что в заголовке поста я слегка преувеличил. Да, поисковый движок действительно реализован примерно в 80 строках Python, но я ещё и писал вспомогательный код (краулер данных, API, HTML-шаблоны и так далее), из-за которого весь проект становится немного больше. Однако я считаю, что интересная часть проекта находится в поисковом движке, который состоит из менее чем 80 строк.

P.S. Написав этот пост и microsearch, я осознал, что пару лет назад нечто похожее написал Барт де Гёде. Моя реализация очень похожа на работу Барта, но я считаю что кое-что улучшил, в частности: (1) мой краулер асинхронный, что сильно ускоряет работу, (2) я реализовал пользовательский интерфейс, позволяющий взаимодействовать с поисковым движком.

microsearch

Давайте изучим компоненты, из которых состоит microsearch, и поговорим о том, как я изготовил каждый элемент: (1) краулер, (2) инвертированный индекс, (3) алгоритм ранжирования, (4) интерфейс. В последующих разделах я представлю теоретические описания и практические подробности того, как в моём проекте реализована каждая из концепций.

Краулер

Первый этап создания поискового движка — это получение данных для поиска. Можно выполнять краулинг или готовых данных (как это делает Google), или использовать собственные данные (как это делают Wallapop и все другие торговые площадки/маркетплейсы).

Так как одной из моих целей было создание «локального Google», для сборки поискового движка я решил использовать данные из блогов, на которые подписан. В этом случае краулинг состоит из скачивания и очистки всех постов определённого списка блогов. Чтобы упростить задачу, я выполнял краулинг только постов блогов с RSS. [Если у вас есть блог, но нет RSS-фида, то прервите чтение и добавьте его. Я с радостью подпишусь на ваш блог и буду читать новый контент без необходимости зависеть от Google или какой-то социальной сети.] А чтобы ускорить его, я воспользовался библиотекой Python asyncio. Использование асинхронного кода позволило сократить время краулинга с 20 минут до 20 секунд.

В моём случае был взят список из 642 RSS-фидов. Из этих фидов примерно сто я читал (блоги о машинном обучении, data science, математике и так далее), а остальные пятьсот я соскрейпил из проекта surprisetalk blogs.hn.

Код краулера

import argparse
import aiohttp
import asyncio
import feedparser
import pandas as pd
from bs4 import BeautifulSoup
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def parse_feed(feed_url):
    try:
        feed = feedparser.parse(feed_url)
        return [entry.link for entry in feed.entries]
    except Exception as e:
        print(f"Error parsing feed {feed_url}: {e}")
        return []


async def fetch_content(session, url):
    async with session.get(url) as response:
        return await response.text()


async def process_feed(feed_url, session, loop):
    try:
        post_urls = await loop.run_in_executor(None, parse_feed, feed_url)
        tasks = [fetch_content(session, post_url) for post_url in post_urls]
        post_contents = await asyncio.gather(*tasks)
        cleaned_contents = [clean_content(content) for content in post_contents]
        return list(zip(post_urls, cleaned_contents))
    except Exception as e:
        print(f"Error processing feed {feed_url}: {e}")
        return []


def clean_content(html_content):
    soup = BeautifulSoup(html_content, "html.parser")
    for script in soup(["script", "style"]):
        script.extract()
    text = soup.get_text()
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    cleaned_text = " ".join(chunk for chunk in chunks if chunk)
    return cleaned_text


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--feed-path")
    return parser.parse_args()


async def main(feed_file):
    async with aiohttp.ClientSession() as session:
        loop = asyncio.get_event_loop()
        with open(feed_file, "r") as file:
            feed_urls = [line.strip() for line in file]

        tasks = [process_feed(feed_url, session, loop) for feed_url in feed_urls]
        results = await asyncio.gather(*tasks)

    flattened_results = [item for sublist in results for item in sublist]
    df = pd.DataFrame(flattened_results, columns=["URL", "content"])
    df.to_parquet("output.parquet", index=False)


if __name__ == "__main__":
    args = parse_args()
    asyncio.run(main(args.feed_path))

Инвертированный индекс

Инвертированный индекс — это структура данных, сопоставляющая ключевые слова с документами. Благодаря этой структуре данных очень легко находить документы, в которых встречается определённое слово. Когда пользователь отправляет поисковый запрос, инвертированный индекс используется для получения всех документов, соответствующих ключевым словам в запросе.

Для реализации инвертированного индекса я использовал defaultdict с сигнатурой dict[str, dict[str, int]]. То есть, это отображение, возвращающее для слова (str) ещё одно отображение URL (str) на количество вхождений этого слова в URL (int). Значение по умолчанию для отображения — это отображение из URL в 0, поэтому если мы попробуем получить значение ключевого слова, не существующего в URL, получим ноль.

Логика инвертированного индекса определена в классе SearchEngine. Мы инициализируем его двумя приватными словарями.

class SearchEngine:
    def __init__(self):
        self._index: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
        self._documents: dict[str, str] = {}

Затем мы реализуем метод index, который получает URL и его содержимое, нормализует контент (то есть удаляет пунктуацию, преобразует всё в нижний регистр и так далее), а затем добавляет его в индекс.

def index(self, url: str, content: str) -> None:
    self._documents[url] = content
    words = normalize_string(content).split(" ")
    for word in words:
        self._index[word][url] += 1

Чтобы сделать процесс индексирования более удобным, мы можем реализовать опцию общего индекса, которая получает список URL и документов, а затем индексирует их.

def bulk_index(self, documents: list[tuple[str, str]]):
    for url, content in documents:
        self.index(url, content)

Далее мы можем считать индекс при помощи метода get_url, получающего ключевое слово и возвращающего все URL, которые содержат это ключевое слово.

def get_urls(self, keyword: str) -> dict[str, int]:
    keyword = normalize_string(keyword)
    return self._index[keyword]

Например, для индексации документа Foo с текстом Hello, World! My name is Foo! и документа Bar с текстом Hello, World! My name is Bar, I'm not Foo! с дальнейшим поиском слова Foo мы можем сделать следующее:

>>> from microsearch.engine import engine
>>> engine.index("Foo", "Hello, World! My name is Foo!")
>>> engine.index("Bar", "Hello, World! My name is Bar, I'm not Foo!")
>>> engine.get_urls("foo")
defaultdict(, {'Foo': 1, 'Bar': 1})
>>> engine.get_urls("Foo")
defaultdict(, {'Foo': 1, 'Bar': 1})

Алгоритм ранжирования

Получив множество соответствующих запросу документов, их нужно как-то отсортировать. Самый известный алгоритм ранжирования — это PageRank Google, ранжирующий документы на основании ссылок. Однако существуют и другие варианты ранжирования документов, например, BM25, ранжирующий документы на основании контента. Я решил воспользоваться стандартным BM25. Величина оценки между запросом Q и документом D вычисляется следующим образом:

\sum_{i=1}^n \text{IDF}(q_i) \frac{f(q_i, D)\times(k_1 + 1)}{f(q_i, D) + k_1 \left(1 - b + b \frac{|D|}{\text{avgdl}}\right)}

где запрос Q содержит ключевые слова q1,  q2, …,  qn, документ D имеет длину |D|, средняя длина документа обозначена как avgdl,  k1 и b — свободные параметры,  f (qi, D) — количество вхождений ключевого слова qi в документе D и, наконец, IDF (qi) — обратная частота документа, вычисляемая так:

\text{IDF}(q_i) = \ln \left(1 + \frac{N - n(q_i) + 0.5}{ n(q_i) + 0.5}\right)

где N — количество документов, а n (qi) — количество документов, содержащих qi. Существуют и другие способы вычисления IDF, но этот вариант можно оправдать теоретически.

Благодаря всей этой математике мы готовы реализовать недостающую часть класса SearchEngine. Во-первых, нам нужно добавить константы k1 и d как параметры нашего класса:

class SearchEngine:
    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self._index: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
        self._documents: dict[str, str] = {}
        self.k1 = k1
        self.b = b

Теперь мы раскроем полезные свойства, которыми воспользуемся позже:

@property
def posts(self) -> list[str]:
    return list(self._documents.keys())

@property
def number_of_documents(self) -> int:
    return len(self._documents)

@property
def avdl(self) -> float: 
    return sum(len(d) for d in self._documents.values()) / len(self._documents)

Имея эту информацию, можно реализовать нашу систему вычисления оценок BM25. Первым делом нужно реализовать метод обратной частоты документов:

def idf(self, kw: str) -> float:
    N = self.number_of_documents
    n_kw = len(self.get_urls(kw))
    return log((N - n_kw + 0.5) / (n_kw + 0.5) + 1)

И при помощи этого метода можно наконец реализовать систему вычисления оценок BM. Этот метод получает ключевое слово и возвращает отображение от всех содержащих это ключевое слово URL с их оценкой.

def bm25(self, kw: str) -> dict[str, float]:
    result = {}
    idf_score = self.idf(kw)
    avdl = self.avdl
    for url, freq in self.get_urls(kw).items():
        numerator = freq * (self.k1 + 1)
        denominator = freq + self.k1 * (1 - self.b + self.b * len(self._documents[url]) / avdl)
        result[url] = idf_score * numerator / denominator
    return result

Этот метод получает ключевое слово и для всех индексированных документов вычисляет оценку BM25 этого ключевого слова. Имея этот метод, мы можем реализовать метод search, которым и будем пользоваться для выполнения запросов к поисковому движку.

Метод search получает запрос, нормализует его, извлекает его ключевые слова (то есть разбивает их по пробелам), вычисляет оценки BM25 для каждого ключевого слова и возвращает словарь URL с их общей оценкой.

def search(self, query: str) -> dict[str, float]:
    keywords = normalize_string(query).split(" ")
    url_scores: dict[str, float] = {}
    for kw in keywords:
        kw_urls_score = self.bm25(kw)
        url_scores = update_url_scores(url_scores, kw_urls_score)
    return url_scores

Снова взяв предыдущий пример, мы можем выполнить поиск следующим образом:

>>> from microsearch.engine import engine
>>> engine.index("Foo", "Hello, World! My name is Foo!")
>>> engine.index("Bar", "Hello, World! My name is Bar, I'm not Foo!")
>>> engine.search("foo")
{'Foo': 0.19869271730423296, 'Bar': 0.16844281759753774}
>>> engine.search("foo bar")
{'Foo': 0.19869271730423296, 'Bar': 0.8088260293054897}

Соединив всё вместе, мы получаем класс поискового движка, реализующий функциональность индексации и поиска документов в менее чем 80 строках кода.

Полный код поискового движка в 80 строках Python

from collections import defaultdict
from math import log
import string


def update_url_scores(old: dict[str, float], new: dict[str, float]):
    for url, score in new.items():
        if url in old:
            old[url] += score
        else:
            old[url] = score
    return old

def normalize_string(input_string: str) -> str:
    translation_table = str.maketrans(string.punctuation, ' ' * len(string.punctuation))
    string_without_punc = input_string.translate(translation_table)
    string_without_double_spaces = ' '.join(string_without_punc.split())
    return string_without_double_spaces.lower()


class SearchEngine:
    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self._index: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
        self._documents: dict[str, str] = {}
        self.k1 = k1
        self.b = b

    @property
    def posts(self) -> list[str]:
        return list(self._documents.keys())

    @property
    def number_of_documents(self) -> int:
        return len(self._documents)

    @property
    def avdl(self) -> float:
        # todo: refactor this. it can be slow to compute it every time. compute it once and cache it
        return sum(len(d) for d in self._documents.values()) / len(self._documents)

    def idf(self, kw: str) -> float:
        N = self.number_of_documents
        n_kw = len(self.get_urls(kw))
        return log((N - n_kw + 0.5) / (n_kw + 0.5) + 1)

    def bm25(self, kw: str) -> dict[str, float]:
        result = {}
        idf_score = self.idf(kw)
        avdl = self.avdl
        for url, freq in self.get_urls(kw).items():
            numerator = freq * (self.k1 + 1)
            denominator = freq + self.k1 * (
                1 - self.b + self.b * len(self._documents[url]) / avdl
            )
            result[url] = idf_score * numerator / denominator
        return result

    def search(self, query: str) -> dict[str, float]:
        keywords = normalize_string(query).split(" ")
        url_scores: dict[str, float] = {}
        for kw in keywords:
            kw_urls_score = self.bm25(kw)
            url_scores = update_url_scores(url_scores, kw_urls_score)
        return url_scores

    def index(self, url: str, content: str) -> None:
        self._documents[url] = content
        words = normalize_string(content).split(" ")
        for word in words:
            self._index[word][url] += 1

    def bulk_index(self, documents: list[tuple[str, str]]):
        for url, content in documents:
            self.index(url, content)

    def get_urls(self, keyword: str) -> dict[str, int]:
        keyword = normalize_string(keyword)
        return self._index[keyword]

Интерфейс

Итак, у нас есть поисковый движок, но его надо как-то раскрыть. Я решил создать небольшое приложение FastAPI, раскрывающее конечную точку с поисковым движком, а также рендерящее небольшую веб-страницу, позволяющую выполнять поиск. Чтобы упростить чтение, я решил выбирать только верхние N URL.

Приложение поискового движка FastAPI

import argparse
from fastapi import FastAPI, Form, Path, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pandas as pd
from uvicorn import run

from microsearch.engine import SearchEngine


app = FastAPI()
engine = SearchEngine()
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")


def get_top_urls(scores_dict: dict, n: int):
    sorted_urls = sorted(scores_dict.items(), key=lambda x: x[1], reverse=True)
    top_n_urls = sorted_urls[:n]
    top_n_dict = dict(top_n_urls)
    return top_n_dict


@app.get("/", response_class=HTMLResponse)
async def search(request: Request):
    posts = engine.posts
    return templates.TemplateResponse(
        "search.html", {"request": request, "posts": posts}
    )


@app.get("/results/{query}", response_class=HTMLResponse)
async def search_results(request: Request, query: str = Path(...)):
    results = engine.search(query)
    results = get_top_urls(results, n=5)
    return templates.TemplateResponse(
        "results.html", {"request": request, "results": results, "query": query}
    )


@app.get("/about")
def read_about(request: Request):
    return templates.TemplateResponse("about.html", {"request": request})


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-path")
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    data = pd.read_parquet(args.data_path)
    content = list(zip(data["URL"].values, data["content"].values))
    engine.bulk_index(content)
    run(app, host="127.0.0.1", port=8000)

После запуска можно увидеть нечто подобное:

Search Engine interface.

Интерфейс поискового движка

Здесь можно вводить свои запросы и искать индексированные документы. Например, что будет, если поискать how to build a search engine?

Search results for the query `how to build a search engine`.

Поисковые результаты по запросу «how to build a search engine».

Я понимаю, что это не самый красивый UI, и что UX можно сильно улучшить. Однако движок быстро работает, выдаёт неплохие результаты и, что самое важное, я сам создал его с нуля.

Недостающие возможности

Пользователям, обычно работающим с поисковыми движками, очевидно, что в моей реализации многого недостаёт. Вот неполный список отсутствующих возможностей.

  • У этой реализации нет операторов запросов, то есть операторов, позволяющих настраивать поведение запросов. Например, если загуглить how to build a search engine -solr, то вы получите результаты, не содержащие слово solr.

  • Инвертированный индекс индексирует отдельные ключевые слова и отсутствует возможность индексации n-грамм. Google позволяет выполнять поиск "search engine" (обратите внимание на двойные кавычки), показывая только результаты, в которых эти два слова встречаются в данном конкретном порядке.

  • Кроме того, в этой реализации нет расширения запросов или документов, то есть если поискать engine, то вы не получите документы со словом engines.

  • Функции краулинга и индексации независимы друг от друга, но мы можем реализовать движок так, чтобы индексация происходила во время краулинга, то есть сразу после получения документа мы его индексируем. Это тоже можно делать асинхронно.

Выводы

Мне очень понравилось работать над этим проектом. Он помог мне лучше понять внутреннюю работу Solr, и хотя мне многому ещё предстоит научиться, кажется, теперь у меня есть более глубокое понимание.

Кроме того, я также узнал, насколько потрясающе писать асинхронный код для операций, для которых узкое место — это ввод-вывод. Моя первая реализация краулера работала очень долго, а при асинхронной реализации справилась за считанные секунды.

Следующим этапом моего пути по созданию собственного поискового движка будет реализация функций семантического поиска. Я уже немного поэкспериментировал с моделями эмбеддингов и ANN, поэтому на следующем этапе я добавлю в microsearch эту функциональность.

© Habrahabr.ru