[Из песочницы] Пишем свой канал-бот для Telegram как у Хабра на Python

Недавно ко мне обратился друг с просьбой написать бота, импортирующего новости из RSS-канала на сайте в Telegram-канал. Огромнейшим плюсом данного способа оповещения являются push-уведомления, которые приходят каждому подписанному пользователю на его устройство. Уже давно хотелось заняться чем-то подобным. Недолго думая, в качестве образца я выбрал канал Хабра telegram.me/habr_ru. В качестве языка программирования был выбран Python.

В итоге, мне надо было решить следующие проблемы:


  1. Парсинг RSS.
  2. Одним из условий был отложенный постинг сообщений (если после того, как новость была выложена, в течение n часов её скрыли/удалили/переименовали, то она не должна быть опубликована, вместо нее отправляется оповещение о корректной новости)
  3. Постинг сообщений в телеграм.
  4. Сокращение целевой ссылки с помощью сервиса bit.ly

От себя добавил еще:


  1. Ведение логов с помощью библиотеки (logging).
  2. Обработка конфига (configparser).


1. Отложенный постинг сообщений

Для решения данной проблемы было принято решение использовать SQLite базу данных. Для работы с БД использовалась библиотека SQLalchemy.

Структура до банального проста — всего одна таблица. Код объекта представлен ниже:

class News(Base):

    __tablename__ = 'news'
   id = Column(Integer, primary_key=True) # Порядковый номер новости
   text = Column(String) # Текст (Заголовок), который будет отправлен в сообщении
   link  = Column(String) # Ссылка на статью на сайте. Так же отправляется в сообщении
   date = Column(Integer)
   # Дата появления новости на сайте. Носит Чисто информационный характер. UNIX_TIME.
   publish = Column(Integer)
   # Планируемая дата публикации. Сообщение будет отправлено НЕ РАНЬШЕ этой даты. UNIX_TIME.
   chat_id = Column(Integer) 
   # Информационный столбец. В данное поле логируется чат, в который было отправлено сообщение
   message_id = Column(Integer) 
   # Информационный столбец. В данный столбец логирует внутренний идентификатор сообщения в канале. 

    def __init__(self, text, link, date, publish=0,chat_id=0,message_id=0):
        self.link = link
        self.text  = text
        self.date = date
        self.publish = publish
        self.chat_id = chat_id
        self.message_id = message_id

    def _keys(self):
        return (self.text, self.link)

    def __eq__(self, other):
        return self._keys() == other._keys()

    def __hash__(self):
        return hash(self._keys())

   def __repr__(self):
        return "" % (base64.b64decode(self.text).decode(),\
        base64.b64decode(self.link).decode(),\
        datetime.fromtimestamp(self.publish))
        # Для зрительного восприятия данные декодируются

Для хранения текстовой информации и ссылок использется base64, форматом хранения даты-времени был выбран Unix Timestamp.

Обработка данных сессии осуществляется отдельным классом.

Base = declarative_base()    

class database:

    """
    Класс для обработки сессии SQLAlchemy.
    Также включает в себя минимальный набор методов, вызываемых в управляющем классе.
    Названия методов говорящие.
    """
    def __init__(self, obj):
        engine = create_engine(obj, echo=False)
        Session = sessionmaker(bind=engine)
        self.session = Session()

    def add_news(self, news):
        self.session.add(news)
        self.session.commit()

    def get_post_without_message_id(self):
        return self.session.query(News).filter(and_(News.message_id == 0,\
                                  News.publish<=int(time.mktime(time.localtime())))).all()

    def update(self, link, chat, msg_id):
        self.session.query(News).filter_by(link = link).update({"chat_id":chat, "message_id":msg_id})
        self.session.commit()

    def find_link(self,link):
        if self.session.query(News).filter_by(link = link).first(): return True
        else: return False 

При обнаружении новости, она добавляется в базу. Сразу же задается время публикации.

Для обнаружения новостей готовых к публикации используется метод get_post_withwithout_message_id. Фактически, мы выбираем из базы все посты, у которых message_id=0 и дата публикации меньше текущего времени.

Для проверки на новизну отправляем запрос базе данных на факт содержания ссылки на новость (метод find_link).

Метод update служит для обновления данных, после публикации новости в канале.


2. Парсинг RSS

Стоит признаться, что писать свой RSS парсер совсем не хотелось, поэтому в бой вступила библиотека feedparser.

import feedparser

class source(object):
    def __init__(self, link):
        self.link = link
        self.news = []
        self.refresh()

    def refresh(self):
        data = feedparser.parse(sв качествеelf.link)
        self.news = [News(binascii.b2a_base64(i['title'].encode()).decode(),\
                    binascii.b2a_base64(i['link'].encode()).decode(),\
                    int(time.mktime(i['published_parsed']))) for i in data['entries']]

Код до смешного прост. При вызове метода refresh с помощью генератора формируется список объектов класса News из последних 30 размещенных постов в rss ленте.


3. Сокращение ссылок

Как упоминалось выше, в качестве сервиса был выбран bit.ly. API не вызвает лишних вопросов.

class bitly:
    def __init__(self,access_token):
        self.access_token = access_token

    def short_link(self, long_link):
        url = 'https://api-ssl.bitly.com/v3/shorten?access_token=%s&longUrl=%s&format=json'\
               % (self.access_token, long_link)
        try:
            return json.loads(urllib.request.urlopen(url).read().decode('utf8'))['data']['url']
        except:
            return long_link

В инит метод передается только наш access_token. В случае неудачного получения сокращенной ссылки, метод short_link возвращает переданную ему изначальную ссылку.


4. Управляющий класс

class export_bot:
    def __init__(self):
        config = configparser.ConfigParser()
        config.read('./config')
        log_file = config['Export_params']['log_file']
        self.pub_pause = int(config['Export_params']['pub_pause'])
        self.delay_between_messages = int(config['Export_params']['delay_between_messages'])
        logging.basicConfig(format = u'%(filename)s[LINE:%(lineno)d]# %(levelname)-8s \
                        [%(asctime)s] %(message)s',level =   logging.INFO, filename = u'%s'%log_file)
        self.db = database(config['Database']['Path'])
        self.src = source(config['RSS']['link'])
        self.chat_id = config['Telegram']['chat']
        bot_access_token = config['Telegram']['access_token']
        self.bot = telegram.Bot(token=bot_access_token)
        self.bit_ly = bitly(config['Bitly']['access_token'])

    def detect(self):
        #получаем 30 последних постов из rss-канала
        self.src.refresh()
        news = self.src.news        
        news.reverse()
        #Проверяем на наличие в базе ссылки на новость. Если нет, то  добавляем в базу данных с 
        #отложенной публикацией
        for i in news:
            if not self.db.find_link(i.link):
                now = int(time.mktime(time.localtime()))
                i.publish = now + self.pub_pause
                logging.info( u'Detect news: %s' % i)
                self.db.add_news(i)

    def public_posts(self):
        #Получаем 30 последних записей из rss канала и новости из БД,     у которых message_id=0
        posts_from_db = self.db.get_post_without_message_id()
        self.src.refresh()
        line = [i for i in self.src.news]
        #Выбор пересечний этих списков
        for_publishing = list(set(line) & set(posts_from_db))
        for_publishing.reverse()
        print(for_publishing)
        #Постинг каждого сообщений
        for post in for_publishing:
            text = '%s %s' % (base64.b64decode(post.text).decode('utf8'),\
                             self.bit_ly.short_link(base64.b64decode(post.link).decode('utf-8')))
            a = self.bot.sendMessage(chat_id=self.chat_id, text=text, parse_mode=telegram.ParseMode.HTML)
            message_id = a.message_id
            chat_id = a['chat']['id']
            self.db.update(post.link, chat_id, message_id)
            logging.info( u'Public: %s;%s;' % (post, message_id))
            time.sleep(self.delay_between_messages)

При инциализации с помощью библиотеки configparser считываем наш конфиг-файл и настраиваем логгирование.

Чтобы детектировать новости, используем метод detect. Получаем последние 30 опубликованных постов, поочередно проверяем наличие ссылки в базе данных.

Перед публикацией, необходимо проверить наличие постов, выгруженных из базы данных в rss-канале. В этом нам помогут множества. И после этого уже публикуем новость с помощью библиотеки telegram. Её функционал довольно широк и ориентирован на написание ботов. После публикации необходимо обновить message_id и chat_id.

В итоге получаем:

image

Стоит отметить то, что если переписать класс rss, то так же можно будет импортировать новости из других источников (VK, facebook и т.д.).

Исходники можно найти на Github: https://github.com/Vispiano/rss2telegram

© Habrahabr.ru