Простые запросы SNMP в Python (с помощью pysnmp)
Этот пост предназначен в первую очередь для сотрудников телекома, админов и новичков в разработке, впервые столкнувшихся с необходимостью отправить snmp-запросы к какому-нибудь коммутатору и разобрать полученный ответ.
Разберем основы работы с библиотекой pysnmp на примере модуля, который принимает в качестве параметров oid-ы, ip и RO-community коммутатора и отдает человекопонятный json с ответами на эти oid-ы и ifAdminStatus, ifOperStatus, ifInOctets, ifOutOctets и ответ на запрос о типах линков
Для начала импортирум необходимые модули и установим формат логов:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import json
from sys import argv
from pysnmp.entity.rfc3413.oneliner import cmdgen
import logging
logging.basicConfig(
level=logging.DEBUG, filename="./log.txt",
format='%(asctime)s %(name)s.%(funcName)s +%(lineno)s: %(levelname)-8s [%(process)d] %(message)s',
)
logger = logging.getLogger("./log.txt")
Формат использования класса поставим следуюший:
class Device:
def __init__(self, ip, ro, oids, port=161):
self.ip = ip
self.ro = ro
self.oids = oids
self.port = port
self.if_oids = ['ifAdminStatus', 'ifOperStatus', 'ifInOctets', 'ifOutOctets']
def get_ifwalk(self):
pass
if __name__ == "__main__":
name_script, ip, ro, oid = argv
device = Device(ip, ro, oid)
print(json.dumps(device.get_ifwalk()))
Непосредственно содержимое функции, выполняющей функционал snmpwalk.
Документация говорит что для одного оида, который можно передать как екстовом виде (ifAdminStatus) так и в числовом (1.3.6.1.2.1.2.2.1.7). Я сталкивалась с тем что ответы на запрос в ручную из CLI и ответы на запрос скрипта могут отличаться, как правило эты проблема решалась поднявшись на один уровень выше в числовом формате oid-а (1.3.6.1.4.1.171.11.113.1.3.2.2.1.1.4 → 1.3.6.1.4.1.171.11.113.1.3.2.2.1.1)
В основном примеры показывают как сделать запрос с одним oid-ом
errorIndication, errorStatus, errorIndex, \
varBindTable = cmdgen.CommandGenerator().nextCmd(
cmdgen.CommunityData('test-agent', 'public'),
cmdgen.UdpTransportTarget(('localhost', 161)),
(1,3,6,1,2,1,1)
)
Когда необходимо одним запросом достать ответ по нескольким oid-ам от одной железки самая устойчивая конструкция будет иметь вот такой вид:
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
cmdgen.CommunityData(ro_community, mpModel=1),
cmdgen.UdpTransportTarget((net_address, 161)),
('1.3.6.1.2.1.2.2.1.7'),
('1.3.6.1.2.1.2.2.1.8'),
('1.3.6.1.2.1.2.2.1.10'),
('1.3.6.1.2.1.2.2.1.16')
)
Но в дальнейшем будем рассматривать пример когда количество oid-ов определяется передаваемым параметром. Пусть if-oids заданы списком, а в передаваемых параметрах будет один oid строкой. Тогда:
def get_ifwalk(self) -> None :
"""
Получение ответов коммутатора на ifAdminStatus, ifOperStatus, ifInOctets, ifOutOctets и переданный медиатайп.
:return: None
"""
oids_form = [(oid,) for oid in self.if_oids]
oids_form.extend((self.oid,))
try:
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
cmdgen.CommunityData(self.ro, mpModel=1),
cmdgen.UdpTransportTarget((self.ip, self.port)),
*oids_form
)
if errorIndication:
raise BaseException(f"errorIndication: {errorIndication}")
if errorStatus:
raise BaseException(f"errorStatus: "
f"{errorStatus.prettyPrint(), errorIndex and varBindTable[-1][int(errorIndex)-1] or '?'}")
except BaseException as bex:
logger.error(bex)
except Exception as ex:
logger.error(f"Unexpected error: {ex}")
Таким образом, если железка недоступна или отдала некорректный ответ, мы логируем исключение.
А если все ок — в varBindTable содержится ответ, который предстоит распарсить. И, т.к это список объектов специфичного формата — обращение к их обрабатываемому виду выглядит так:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print(name.prettyPrint())
print(val.prettyPrint())
Зачастую информация в ответе на snmp запрос полученная таким образом содержится не только в значении varBindTableRow, но и в ключе. Как например у длинков моделей DES-1210–28/ME/B2 и DES-1210–28/ME/B3 в ответе на 1.3.6.1.4.1.171.10.75.15.2.1.13.1.4 последняя цифра в ключе varBindTableRow это один из признаков по которому можноо определить оптика или медь выходит из этого порта.
Поэтому я примеду в примере будет париснг ответа с учетом таких признаков.
#если нет ошибок в полученном ответе - записываем все параметры в словарь
self.result = {}
re_part = re.compile("(\d\.\d\.\d\.\d\.\d\.\d\.)(?P.*?)$", re.MULTILINE|re.DOTALL)
part_mt_oid = re_part.search(self.oid).group('part_mt')
re_mt = re.compile(f'\S+({part_mt_oid})\.(?P\d{1,2})\.(?P\d+)',
re.MULTILINE|re.DOTALL)
re_if = re.compile("\S+\:\:\S+2\.2\.1\.(?P\d+)\.(?P\d{1,2})$",
re.MULTILINE|re.DOTALL)
types_response = {'7': 'ifAdminStatus',
'8': 'ifOperStatus',
'10': 'ifInOctets',
'16': 'ifOutOctets'
}
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
founds_mt_responce = re_mt.search(name.prettyPrint())
if founds_mt_responce is not None:
port = founds_mt_responce.group("port")
self.result.setdefault('sign', {})[port] = founds_mt_responce.group("sign")
self.result.setdefault('link', {})[port] = val.prettyPrint()
found_if_responce = re_if.search(name.prettyPrint())
if found_if_responce is not None:
port = found_if_responce.group('port')
type_response = types_response.get(found_if_responce.group('key'))
if (type_response in ['ifAdminStatus', 'ifOperStatus']) and (val.prettyPrint() == '1'):
status = 'up' if val.prettyPrint() == '1' else 'down'
self.result.setdefault(type_response, {})[port] = status
continue
self.result.setdefault(type_response, {})[port] = val.prettyPrint()
Итого получаем:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import json
from sys import argv
from pysnmp.entity.rfc3413.oneliner import cmdgen
import logging
logging.basicConfig(
level=logging.DEBUG, filename="./log.txt",
format='%(asctime)s %(name)s.%(funcName)s +%(lineno)s: %(levelname)-8s [%(process)d] %(message)s',
)
logger = logging.getLogger("./log.txt")
class Device:
def __init__(self, ipswitch, ro_community, oid_mt, port=161):
self.ip = ipswitch
self.ro = ro_community
self.oid = oid_mt
self.port = port
self.if_oids = ['ifAdminStatus', 'ifOperStatus', 'ifInOctets', 'ifOutOctets']
self.types_response = {'7': 'ifAdminStatus',
'8': 'ifOperStatus',
'10': 'ifInOctets',
'16': 'ifOutOctets'
}
self.re_part = re.compile("(\d\.\d\.\d\.\d\.\d\.\d\.)(?P.*?)$", re.MULTILINE | re.DOTALL)
self.part_mt_oid = self.re_part.search(self.oid).group('part_mt')
self.re_mt = re.compile(f'\S+({self.part_mt_oid})\.(?P\d{1, 2})\.(?P\d+)',
re.MULTILINE | re.DOTALL)
self.re_if = re.compile("\S+\:\:\S+2\.2\.1\.(?P\d+)\.(?P\d{1,2})$",
re.MULTILINE | re.DOTALL)
self.result = {}
def get_ifwalk(self) -> dict:
"""
Получение ответов коммутатора на ifAdminStatus, ifOperStatus, ifInOctets, ifOutOctets и переданный медиатайп.
:return: self.result: dict
"""
oids_form = [(oid_if,) for oid_if in self.if_oids]
oids_form.extend((self.oid,))
try:
cmdGen = cmdgen.CommandGenerator()
errorIndication, errorStatus, errorIndex, varBindTable = cmdGen.nextCmd(
cmdgen.CommunityData(self.ro, mpModel=1),
cmdgen.UdpTransportTarget((self.ip, self.port)),
*oids_form)
if errorIndication:
raise BaseException(f"errorIndication: {errorIndication}")
if errorStatus:
raise BaseException(f"errorStatus: "
f"{errorStatus.prettyPrint(), errorIndex and varBindTable[-1][int(errorIndex) - 1] or '?'}")
# если нет ошибок в полученном ответе - записываем все параметры в словарь
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
founds_mt_responce = self.re_mt.search(name.prettyPrint())
if founds_mt_responce is not None:
port = founds_mt_responce.group("port")
self.result.setdefault('sign', {})[port] = founds_mt_responce.group("sign")
self.result.setdefault('link', {})[port] = val.prettyPrint()
found_if_responce = self.re_if.search(name.prettyPrint())
if found_if_responce is not None:
port = found_if_responce.group('port')
type_response = self.types_response.get(found_if_responce.group('key'))
if (type_response in ['ifAdminStatus', 'ifOperStatus']) and (val.prettyPrint() == '1'):
status = 'up' if val.prettyPrint() == '1' else 'down'
self.result.setdefault(type_response, {})[port] = status
continue
self.result.setdefault(type_response, {})[port] = val.prettyPrint()
except BaseException as bex:
logger.error(bex)
return self.result
if __name__ == "__main__":
name_script, ip, ro, oid = argv
device = Device(ip, ro, oid)
print(json.dumps(device.get_ifwalk()))
И на всякий случай ссылкой.