Простые запросы SNMP в Python (с помощью pysnmp)

48ec690a924d3d7010d0ae4fe9768db0.png

Этот пост предназначен в первую очередь для сотрудников телекома, админов и новичков в разработке, впервые столкнувшихся с необходимостью отправить snmp-запросы к какому-нибудь коммутатору и разобрать полученный ответ.

Разберем основы работы с библиотекой pysnmp на примере модуля, который принимает в качестве параметров oid-ы, ip и RO-community коммутатора и отдает человекопонятный json с ответами на эти oid-ы и ifAdminStatus, ifOperStatus, ifInOctets, ifOutOctets и ответ на запрос о типах линков

Для начала импортирум необходимые модули и установим формат логов:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import json
from sys import argv
from pysnmp.entity.rfc3413.oneliner import cmdgen
import logging

logging.basicConfig(
    level=logging.DEBUG, filename="./log.txt",
    format='%(asctime)s %(name)s.%(funcName)s +%(lineno)s: %(levelname)-8s [%(process)d] %(message)s',
)
logger = logging.getLogger("./log.txt")

Формат использования класса поставим следуюший:

class Device:

    def __init__(self, ip, ro, oids, port=161):
        self.ip = ip
        self.ro = ro
        self.oids = oids
        self.port = port
        self.if_oids = ['ifAdminStatus', 'ifOperStatus', 'ifInOctets', 'ifOutOctets']

        
        
    def get_ifwalk(self):
        pass

if __name__ == "__main__":
    name_script, ip, ro, oid = argv
    device = Device(ip, ro, oid)

    print(json.dumps(device.get_ifwalk()))

Непосредственно содержимое функции, выполняющей функционал snmpwalk.

Документация говорит что для одного оида, который можно передать как екстовом виде (ifAdminStatus) так и в числовом (1.3.6.1.2.1.2.2.1.7). Я сталкивалась с тем что ответы на запрос в ручную из CLI и ответы на запрос скрипта могут отличаться, как правило эты проблема решалась поднявшись на один уровень выше в числовом формате oid-а (1.3.6.1.4.1.171.11.113.1.3.2.2.1.1.4 → 1.3.6.1.4.1.171.11.113.1.3.2.2.1.1)

В основном примеры показывают как сделать запрос с одним oid-ом

errorIndication, errorStatus, errorIndex, \
                 varBindTable = cmdgen.CommandGenerator().nextCmd(
    cmdgen.CommunityData('test-agent', 'public'),
    cmdgen.UdpTransportTarget(('localhost', 161)),
    (1,3,6,1,2,1,1)
    )

Когда необходимо одним запросом достать ответ по нескольким oid-ам от одной железки самая устойчивая конструкция будет иметь вот такой вид:

errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
                cmdgen.CommunityData(ro_community, mpModel=1),
                cmdgen.UdpTransportTarget((net_address, 161)),
                ('1.3.6.1.2.1.2.2.1.7'),
                ('1.3.6.1.2.1.2.2.1.8'),
                ('1.3.6.1.2.1.2.2.1.10'),
                ('1.3.6.1.2.1.2.2.1.16')
            )

Но в дальнейшем будем рассматривать пример когда количество oid-ов определяется передаваемым параметром. Пусть if-oids заданы списком, а в передаваемых параметрах будет один oid строкой. Тогда:

    def get_ifwalk(self) -> None :
        """
        Получение ответов коммутатора на ifAdminStatus, ifOperStatus, ifInOctets, ifOutOctets и переданный медиатайп.
        
        :return: None
        """


        oids_form = [(oid,) for oid in self.if_oids]
        oids_form.extend((self.oid,))

        try:
            cmdGen = cmdgen.CommandGenerator()

            errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
                cmdgen.CommunityData(self.ro, mpModel=1),
                cmdgen.UdpTransportTarget((self.ip, self.port)),
                *oids_form
            )

            if errorIndication:
                raise BaseException(f"errorIndication: {errorIndication}")
            if errorStatus:
                raise BaseException(f"errorStatus: "
                            f"{errorStatus.prettyPrint(), errorIndex and varBindTable[-1][int(errorIndex)-1] or '?'}")
        except BaseException as bex:
            logger.error(bex)
        except Exception as ex:
            logger.error(f"Unexpected error: {ex}")

Таким образом, если железка недоступна или отдала некорректный ответ, мы логируем исключение.

А если все ок — в varBindTable содержится ответ, который предстоит распарсить. И, т.к это список объектов специфичного формата — обращение к их обрабатываемому виду выглядит так:

            for varBindTableRow in varBindTable:
                for name, val in varBindTableRow:
                    print(name.prettyPrint())
                    print(val.prettyPrint())

Зачастую информация в ответе на snmp запрос полученная таким образом содержится не только в значении varBindTableRow, но и в ключе. Как например у длинков моделей DES-1210–28/ME/B2 и DES-1210–28/ME/B3 в ответе на 1.3.6.1.4.1.171.10.75.15.2.1.13.1.4 последняя цифра в ключе varBindTableRow это один из признаков по которому можноо определить оптика или медь выходит из этого порта.

Поэтому я примеду в примере будет париснг ответа с учетом таких признаков.

#если нет ошибок в полученном ответе - записываем все параметры в словарь
            self.result = {}
            re_part = re.compile("(\d\.\d\.\d\.\d\.\d\.\d\.)(?P.*?)$", re.MULTILINE|re.DOTALL)

            part_mt_oid = re_part.search(self.oid).group('part_mt')
            re_mt = re.compile(f'\S+({part_mt_oid})\.(?P\d{1,2})\.(?P\d+)',
                               re.MULTILINE|re.DOTALL)
            re_if = re.compile("\S+\:\:\S+2\.2\.1\.(?P\d+)\.(?P\d{1,2})$",
                               re.MULTILINE|re.DOTALL)

            types_response = {'7': 'ifAdminStatus',
                             '8': 'ifOperStatus',
                             '10': 'ifInOctets',
                             '16': 'ifOutOctets'
                             }
            for varBindTableRow in varBindTable:
                for name, val in varBindTableRow:

                    founds_mt_responce = re_mt.search(name.prettyPrint())
                    if founds_mt_responce is not None:
                        port = founds_mt_responce.group("port")
                        self.result.setdefault('sign', {})[port] = founds_mt_responce.group("sign")
                        self.result.setdefault('link', {})[port] = val.prettyPrint()

                    found_if_responce = re_if.search(name.prettyPrint())
                    if found_if_responce is not None:
                        port = found_if_responce.group('port')
                        type_response = types_response.get(found_if_responce.group('key'))
                        if (type_response in ['ifAdminStatus', 'ifOperStatus']) and (val.prettyPrint() == '1'):
                            status = 'up' if val.prettyPrint() == '1' else 'down'
                            self.result.setdefault(type_response, {})[port] = status
                            continue
                        self.result.setdefault(type_response, {})[port] = val.prettyPrint()

Итого получаем:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import json
from sys import argv
from pysnmp.entity.rfc3413.oneliner import cmdgen
import logging

logging.basicConfig(
    level=logging.DEBUG, filename="./log.txt",
    format='%(asctime)s %(name)s.%(funcName)s +%(lineno)s: %(levelname)-8s [%(process)d] %(message)s',
)
logger = logging.getLogger("./log.txt")


class Device:

    def __init__(self, ipswitch, ro_community, oid_mt, port=161):
        self.ip = ipswitch
        self.ro = ro_community
        self.oid = oid_mt
        self.port = port
        self.if_oids = ['ifAdminStatus', 'ifOperStatus', 'ifInOctets', 'ifOutOctets']
        self.types_response = {'7': 'ifAdminStatus',
                               '8': 'ifOperStatus',
                               '10': 'ifInOctets',
                               '16': 'ifOutOctets'
                               }

        self.re_part = re.compile("(\d\.\d\.\d\.\d\.\d\.\d\.)(?P.*?)$", re.MULTILINE | re.DOTALL)
        self.part_mt_oid = self.re_part.search(self.oid).group('part_mt')
        self.re_mt = re.compile(f'\S+({self.part_mt_oid})\.(?P\d{1, 2})\.(?P\d+)',
                                re.MULTILINE | re.DOTALL)
        self.re_if = re.compile("\S+\:\:\S+2\.2\.1\.(?P\d+)\.(?P\d{1,2})$",
                                re.MULTILINE | re.DOTALL)
        self.result = {}

    def get_ifwalk(self) -> dict:
        """
        Получение ответов коммутатора на ifAdminStatus, ifOperStatus, ifInOctets, ifOutOctets и переданный медиатайп.
        :return: self.result: dict
        """

        oids_form = [(oid_if,) for oid_if in self.if_oids]
        oids_form.extend((self.oid,))

        try:
            cmdGen = cmdgen.CommandGenerator()

            errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
                cmdgen.CommunityData(self.ro, mpModel=1),
                cmdgen.UdpTransportTarget((self.ip, self.port)),
                *oids_form)

            if errorIndication:
                raise BaseException(f"errorIndication: {errorIndication}")
            if errorStatus:
                raise BaseException(f"errorStatus: "
                                    f"{errorStatus.prettyPrint(), errorIndex and varBindTable[-1][int(errorIndex) - 1] or '?'}")

            # если нет ошибок в полученном ответе - записываем все параметры в словарь
            for varBindTableRow in varBindTable:
                for name, val in varBindTableRow:

                    founds_mt_responce = self.re_mt.search(name.prettyPrint())
                    if founds_mt_responce is not None:
                        port = founds_mt_responce.group("port")
                        self.result.setdefault('sign', {})[port] = founds_mt_responce.group("sign")
                        self.result.setdefault('link', {})[port] = val.prettyPrint()

                    found_if_responce = self.re_if.search(name.prettyPrint())
                    if found_if_responce is not None:
                        port = found_if_responce.group('port')
                        type_response = self.types_response.get(found_if_responce.group('key'))
                        if (type_response in ['ifAdminStatus', 'ifOperStatus']) and (val.prettyPrint() == '1'):
                            status = 'up' if val.prettyPrint() == '1' else 'down'
                            self.result.setdefault(type_response, {})[port] = status
                            continue
                        self.result.setdefault(type_response, {})[port] = val.prettyPrint()


        except BaseException as bex:
            logger.error(bex)
        return self.result


if __name__ == "__main__":
    name_script, ip, ro, oid = argv
    device = Device(ip, ro, oid)
    print(json.dumps(device.get_ifwalk()))

И на всякий случай ссылкой.

© Habrahabr.ru