ParallelBeautifulSoup (BF4-hack)

Предлагаю протестировать скрипт написанный с помощью cloude 3.5 Sonnet с использованием специального промта. Сейчас и сам пробую, не знаю что из этого получится. Во всяком случае будет шаблон для реализации собственных идей. Scrapy конечно хорошая библиотека, но у него много лишнего функционала, нету модульности и иногда неадекватно парсит данные с сайта.
from functools import lru_cache, partial
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, TimeoutError
from multiprocessing import cpu_count, Queue, Process, Manager, shared_memory
from threading import Event, Thread
import asyncio
from typing import Union, Callable, Iterator, Any, Optional, AsyncIterator
from bs4 import BeautifulSoup, Tag
from itertools import chain
import numpy as np
from collections import deque
import gc
import signal
import weakref
from contextlib import contextmanager
import resource
import sys
import platform
import psutil
import tempfile
from pathlib import Path
from dataclasses import dataclass
from enum import Enum, auto
import ctypes
import os
import warnings
from time import sleep
class ParserPriority(Enum):
LOW = auto()
MEDIUM = auto()
HIGH = auto()
@dataclass
class ParserConfig:
chunk_size: int
max_workers: int
timeout: float
priority: ParserPriority
memory_limit: int # в МБ
class ParserException(Exception):
"""Базовое исключение для парсера"""
pass
class MemoryError(ParserException):
"""Ошибка превышения лимита памяти"""
pass
class TimeoutException(ParserException):
"""Ошибка превышения времени выполнения"""
pass
class MemoryManager:
def __init__(self):
self.os_type = platform.system().lower()
self.process = psutil.Process()
def cleanup_memory(self):
"""Кросс-платформенная очистка памяти"""
gc.collect()
if self.os_type == 'linux':
try:
with open('/proc/sys/vm/drop_caches', 'w') as f:
f.write('1')
except Exception:
pass
elif self.os_type == 'windows':
try:
ctypes.windll.psapi.EmptyWorkingSet(
ctypes.windll.kernel32.GetCurrentProcess()
)
except Exception:
pass
finally:
# Принудительный вызов сборщика мусора
for _ in range(3):
gc.collect()
def monitor_memory_usage(self, limit_mb: int) -> bool:
"""Проверка использования памяти"""
current_mem = self.process.memory_info().rss / (1024 * 1024) # В МБ
return current_mem < limit_mb
@contextmanager
def limit_memory(self, max_mem_mb: int):
"""Кросс-платформенное ограничение памяти"""
stop_monitoring = Event()
def memory_monitor():
while not stop_monitoring.is_set():
if not self.monitor_memory_usage(max_mem_mb):
raise MemoryError("Memory limit exceeded")
sleep(0.1) # Снижаем нагрузку на CPU
try:
if self.os_type == 'linux':
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(
resource.RLIMIT_AS,
(max_mem_mb * 1024 * 1024, hard)
)
elif self.os_type == 'windows':
# Запускаем мониторинг в отдельном потоке
monitor = Thread(target=memory_monitor)
monitor.daemon = True
monitor.start()
yield
finally:
if self.os_type == 'linux':
resource.setrlimit(resource.RLIMIT_AS, (soft, hard))
stop_monitoring.set()
self.cleanup_memory()
class ParallelParser:
def __init__(self, config: Optional[ParserConfig] = None):
self.config = config or self.get_platform_specific_config()
self.stop_event = Event()
self.manager = Manager()
self.shared_data = self.manager.dict()
self._cleanup_hooks = weakref.WeakSet()
self.memory_manager = MemoryManager()
self.temp_dir = Path(tempfile.gettempdir())
@staticmethod
def get_platform_specific_config() -> ParserConfig:
"""Получение оптимальной конфигурации для текущей ОС"""
if platform.system().lower() == 'windows':
return ParserConfig(
chunk_size=512 * 1024, # Меньший размер чанка
max_workers=min(cpu_count(), 61), # Ограничение процессов
timeout=45.0, # Увеличенный таймаут
priority=ParserPriority.MEDIUM,
memory_limit=512 # Меньший лимит памяти
)
else:
return ParserConfig(
chunk_size=1024 * 1024,
max_workers=cpu_count(),
timeout=30.0,
priority=ParserPriority.HIGH,
memory_limit=1024
)
def register_cleanup(self, callback: Callable):
"""Регистрация функции очистки"""
self._cleanup_hooks.add(callback)
def cleanup(self):
"""Выполнение всех зарегистрированных очисток"""
for callback in self._cleanup_hooks:
try:
callback()
except Exception as e:
warnings.warn(f"Cleanup error: {e}")
self.memory_manager.cleanup_memory()
def _split_to_chunks(self, data: Union[str, tuple]) -> tuple:
"""Оптимизированное разделение на чанки"""
return tuple(
data[i:i + self.config.chunk_size]
for i in range(0, len(data), self.config.chunk_size)
)
@staticmethod
def _init_worker(shm_name: str, stop_event: Event):
"""Инициализация рабочего процесса"""
global shared_arr, worker_stop
shared_arr = shared_memory.SharedMemory(name=shm_name)
worker_stop = stop_event
def _parse_chunk_safely(
self,
start_idx: int,
length: int,
parser: str
) -> BeautifulSoup:
"""Безопасный парсинг чанка с контролем памяти"""
try:
if worker_stop.is_set():
raise ParserException("Parsing cancelled")
chunk_data = shared_arr.buf[start_idx:start_idx + length]
chunk_str = chunk_data.tobytes().decode()
with self.memory_manager.limit_memory(
self.config.memory_limit // self.config.max_workers
):
return BeautifulSoup(chunk_str, parser)
except MemoryError:
worker_stop.set()
raise MemoryError("Worker exceeded memory limit")
except Exception as e:
worker_stop.set()
raise ParserException(f"Chunk parsing failed: {e}")
def create_parallel_parser(
self,
html: str,
parser: str = 'lxml',
) -> BeautifulSoup:
"""Улучшенный параллельный парсер с контролем ресурсов"""
try:
with self.memory_manager.limit_memory(self.config.memory_limit):
chunks = self._split_to_chunks(html)
# Используем разделяемую память для больших данных
shm = shared_memory.SharedMemory(create=True, size=len(html))
try:
shm_array = np.ndarray(
len(html),
dtype='uint8',
buffer=shm.buf
)
shm_array[:] = np.frombuffer(
html.encode(),
dtype='uint8'
)
with ProcessPoolExecutor(
max_workers=self.config.max_workers,
initializer=self._init_worker,
initargs=(shm.name, self.stop_event)
) as executor:
future_to_chunk = {
executor.submit(
self._parse_chunk_safely,
i,
len(chunk),
parser
): i for i, chunk in enumerate(chunks)
}
parsed_chunks = deque()
for future in self._as_completed_with_timeout(
future_to_chunk,
self.config.timeout
):
parsed_chunks.append(future.result())
finally:
shm.close()
shm.unlink()
return self._merge_chunks(parsed_chunks, parser)
except Exception as e:
self.cleanup()
raise ParserException(f"Parsing failed: {e}")
def _as_completed_with_timeout(self, future_to_chunk, timeout):
"""Обработка завершения задач с таймаутом"""
try:
for future in concurrent.futures.as_completed(
future_to_chunk,
timeout=timeout
):
yield future
except TimeoutError:
self.stop_event.set()
raise TimeoutException("Processing timeout exceeded")
def _merge_chunks(
self,
chunks: deque,
parser: str
) -> BeautifulSoup:
"""Оптимизированное слияние чанков"""
try:
merged_html = ''.join(str(chunk) for chunk in chunks)
return BeautifulSoup(merged_html, parser)
finally:
chunks.clear()
async def parallel_find_all_async(
self,
soup: BeautifulSoup,
*args,
**kwargs
) -> AsyncIterator[Tag]:
"""Асинхронная версия parallel_find_all"""
try:
all_elements = tuple(soup.find_all(*args, **kwargs))
chunks = self._split_to_chunks(all_elements)
async def process_chunk(chunk):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self._process_chunk_safely,
chunk
)
tasks = [process_chunk(chunk) for chunk in chunks]
async for result in self._as_completed_async(tasks):
for item in result:
yield item
except Exception as e:
self.cleanup()
raise ParserException(f"Async find_all failed: {e}")
async def _as_completed_async(self, tasks: list) -> AsyncIterator:
"""Асинхронная обработка задач с таймаутом"""
for task in asyncio.as_completed(tasks):
try:
result = await asyncio.wait_for(
task,
timeout=self.config.timeout
)
yield result
except asyncio.TimeoutError:
self.stop_event.set()
raise TimeoutException("Task timeout exceeded")
def _process_chunk_safely(self, chunk: tuple) -> list:
"""Безопасная обработка чанка данных"""
try:
with self.memory_manager.limit_memory(
self.config.memory_limit // self.config.max_workers
):
return [tag for tag in chunk if tag is not None]
except Exception as e:
self.stop_event.set()
raise ParserException(f"Chunk processing failed: {e}")
def cancel_parsing(self):
"""Отмена текущей операции парсинга"""
self.stop_event.set()
self.cleanup()
# Пример использования
async def main():
# Создаем парсер с автоматической конфигурацией
parser = ParallelParser()
try:
# HTML для парсинга
html_content = """
"""
# Создаем суп
soup = parser.create_parallel_parser(html_content)
# Асинхронный поиск всех ссылок
print("Finding links...")
async for link in parser.parallel_find_all_async(soup, 'a', href=True):
print(f"Found link: {link['href']}")
# Поиск текстовых блоков
print("\nFinding text blocks...")
async for text_block in parser.parallel_find_all_async(
soup,
'div',
class_='text'
):
print(f"Found text: {text_block.text.strip()}")
except ParserException as e:
print(f"Parsing error: {e}")
except KeyboardInterrupt:
print("Parsing cancelled by user")
parser.cancel_parsing()
finally:
parser.cleanup()
if __name__ == '__main__':
asyncio.run(main())
Основные улучшения в объединенной версии:
Добавлена полная кросс-платформенная поддержка
Улучшен механизм контроля памяти
Добавлена автоматическая конфигурация под ОС
Улучшена обработка ошибок
Оптимизирована работа с памятью
Добавлен механизм отмены операций
Улучшена производительность через оптимизацию чанков
Добавлена поддержка асинхронных операций
Улучшена типизация
Ограничения:
Требуется Python 3.10+
Необходимы библиотеки: beautifulsoup4, lxml, numpy, psutil
На Windows некоторые оптимизации памяти работают иначе
Необходимо учитывать накладные расходы на параллелизм
Использование:
# Простой пример
parser = ParallelParser()
soup = parser.create_parallel_parser(html_content)
# Асинхронный поиск
async for element in parser.parallel_find_all_async(soup, 'div', class_='content'):
print(element.text)
# С ручной конфигурацией
config = ParserConfig(
chunk_size=1024 * 1024,
max_workers=4,
timeout=30.0,
priority=ParserPriority.HIGH,
memory_limit=1024
)
parser = ParallelParser(config)
ParallelBeautifulSoup — Документация
Оглавление
Введение
Установка и требования
Архитектура
Базовое использование
Продвинутые техники
Конфигурация
Обработка ошибок
Оптимизация производительности
Примеры использования
Ограничения и особенности
1. Введение
ParallelBeautifulSoup — это расширение для BeautifulSoup4, добавляющее возможности параллельной обработки данных и оптимизированного использования памяти при парсинге больших HTML документов.
Ключевые особенности:
Параллельная обработка HTML
Асинхронные операции поиска
Контроль памяти
Кросс-платформенная поддержка
Автоматическая конфигурация
Отказоустойчивость
2. Установка и требования
Системные требования:
Установка зависимостей:
pip install beautifulsoup4 lxml numpy psutil
Проверка установки:
import sys
import platform
def check_requirements():
print(f"Python version: {sys.version}")
print(f"Platform: {platform.platform()}")
print(f"CPU count: {cpu_count()}")
print(f"Memory: {psutil.virtual_memory().total / (1024**3):.2f} GB")
check_requirements()
3. Архитектура
Основные компоненты:
ParserConfig — конфигурация парсера:
@dataclass
class ParserConfig:
chunk_size: int # Размер чанка для обработки
max_workers: int # Количество параллельных процессов
timeout: float # Таймаут операций
priority: ParserPriority # Приоритет задач
memory_limit: int # Ограничение памяти в МБ
MemoryManager — управление памятью:
memory_manager = MemoryManager()
memory_manager.cleanup_memory() # Очистка памяти
memory_manager.monitor_memory_usage(limit_mb=1024) # Мониторинг
ParallelParser — основной класс:
parser = ParallelParser() # Автоматическая конфигурация
# или
parser = ParallelParser(custom_config) # Ручная конфигурация
4. Базовое использование
Создание парсера:
# Автоматическая конфигурация
parser = ParallelParser()
# Ручная конфигурация
config = ParserConfig(
chunk_size=1024 * 1024,
max_workers=4,
timeout=30.0,
priority=ParserPriority.HIGH,
memory_limit=1024
)
parser = ParallelParser(config)
Парсинг HTML:
# Простой парсинг
soup = parser.create_parallel_parser(html_content)
# С указанием парсера
soup = parser.create_parallel_parser(html_content, parser='lxml')
Асинхронный поиск:
async def find_elements():
# Поиск всех ссылок
async for link in parser.parallel_find_all_async(soup, 'a', href=True):
print(link['href'])
# Поиск по классу
async for div in parser.parallel_find_all_async(soup, 'div', class_='content'):
print(div.text)
5. Продвинутые техники
Управление памятью:
# Ручное управление очисткой
parser.cleanup()
# Использование контекстного менеджера памяти
with parser.memory_manager.limit_memory(max_mem_mb=1024):
# Код обработки
pass
Отмена операций:
try:
soup = parser.create_parallel_parser(huge_html)
except KeyboardInterrupt:
parser.cancel_parsing()
finally:
parser.cleanup()
Параллельная обработка результатов:
async def process_results():
results = []
async for element in parser.parallel_find_all_async(soup, 'div'):
# Параллельная обработка каждого элемента
processed = await process_element(element)
results.append(processed)
return results
6. Конфигурация
Автоматическая конфигурация:
# Windows-специфичная конфигурация
if platform.system().lower() == 'windows':
config = ParserConfig(
chunk_size=512 * 1024,
max_workers=min(cpu_count(), 61),
timeout=45.0,
priority=ParserPriority.MEDIUM,
memory_limit=512
)
Приоритеты:
# Доступные приоритеты
priorities = {
ParserPriority.LOW, # Фоновые задачи
ParserPriority.MEDIUM, # Обычные задачи
ParserPriority.HIGH # Критичные задачи
}
Тонкая настройка:
# Оптимизация под размер данных
def optimize_config(html_size: int) -> ParserConfig:
if html_size < 1024 * 1024: # < 1MB
return ParserConfig(
chunk_size=1024 * 64,
max_workers=2,
timeout=15.0,
priority=ParserPriority.LOW,
memory_limit=256
)
else:
return ParserConfig(
chunk_size=1024 * 1024,
max_workers=cpu_count(),
timeout=30.0,
priority=ParserPriority.HIGH,
memory_limit=1024
)
7. Обработка ошибок
Типы исключений:
try:
soup = parser.create_parallel_parser(html)
except ParserException as e:
print(f"Общая ошибка парсинга: {e}")
except MemoryError as e:
print(f"Превышен лимит памяти: {e}")
except TimeoutException as e:
print(f"Превышен таймаут: {e}")
except KeyboardInterrupt:
print("Отмена пользователем")
finally:
parser.cleanup()
Обработка ошибок в асинхронном режиме:
async def safe_parsing():
try:
async for element in parser.parallel_find_all_async(soup, 'div'):
try:
result = await process_element(element)
yield result
except Exception as e:
print(f"Error processing element: {e}")
continue
except Exception as e:
print(f"Fatal error: {e}")
parser.cancel_parsing()
8. Оптимизация производительности
Оптимизация размера чанков:
def calculate_optimal_chunk_size(html_size: int) -> int:
"""Расчет оптимального размера чанка"""
if html_size < 1024 * 1024: # < 1MB
return 1024 * 64 # 64KB chunks
elif html_size < 1024 * 1024 * 10: # < 10MB
return 1024 * 256 # 256KB chunks
else:
return 1024 * 1024 # 1MB chunks
Оптимизация количества процессов:
def optimize_workers(html_size: int) -> int:
"""Оптимизация количества воркеров"""
available_cpu = cpu_count()
if html_size < 1024 * 1024: # < 1MB
return min(2, available_cpu)
elif html_size < 1024 * 1024 * 10: # < 10MB
return min(4, available_cpu)
else:
return available_cpu
9. Примеры использования
Пример 1: Парсинг новостного сайта
async def parse_news_site():
parser = ParallelParser()
try:
soup = parser.create_parallel_parser(html_content)
# Поиск статей
articles = []
async for article in parser.parallel_find_all_async(
soup,
'article',
class_='news-item'
):
articles.append({
'title': article.find('h2').text.strip(),
'link': article.find('a')['href'],
'description': article.find('p', class_='description').text.strip()
})
return articles
finally:
parser.cleanup()
Пример 2: Обработка больших таблиц
async def parse_large_table():
config = ParserConfig(
chunk_size=1024 * 256,
max_workers=4,
timeout=60.0,
priority=ParserPriority.HIGH,
memory_limit=2048
)
parser = ParallelParser(config)
try:
soup = parser.create_parallel_parser(table_html)
rows = []
async for row in parser.parallel_find_all_async(soup, 'tr'):
cells = [cell.text.strip() for cell in row.find_all('td')]
if cells:
rows.append(cells)
return rows
except MemoryError:
print("Table too large, trying with smaller chunks...")
config.chunk_size //= 2
return await parse_large_table()
Пример 3: Асинхронный скрапинг
async def scrape_website():
parser = ParallelParser()
try:
soup = parser.create_parallel_parser(html_content)
# Параллельный сбор всех URL
urls = set()
async for link in parser.parallel_find_all_async(soup, 'a', href=True):
urls.add(link['href'])
# Параллельный сбор всех изображений
images = set()
async for img in parser.parallel_find_all_async(soup, 'img', src=True):
images.add(img['src'])
return {
'urls': urls,
'images': images
}
except TimeoutException:
print("Scraping timeout, partial results returned")
return {'urls': urls, 'images': images}
10. Ограничения и особенности
Системные ограничения:
Windows: ограничения на количество процессов
Linux: прямой контроль памяти
Общие: зависимость от CPU и RAM
Рекомендации по использованию:
Правильный выбор размера чанка
Мониторинг памяти
Обработка ошибок
Очистка ресурсов
Известные проблемы:
Возможные проблемы с кодировкой на Windows
Ограничения на размер shared memory
Накладные расходы на параллелизм
Советы по производительности:
Используйте автоматическую конфигурацию
Регулярно вызывайте cleanup ()
Правильно выбирайте приоритеты
Используйте асинхронные операции
