Свой plugin для Nautobot — это совсем не сложно

Если кто-то не в курсе, Nautobot — форк Netbox, его продвигает широко известный в узких кругах провайдер сетевой автоматизации NTC (Network to code). Возможно, порывшись в памяти, вы вспомните не очень красивую историю начала прошлого года про затирание истории коммитов Netbox — дело как раз касалось Nautobot и NTC. Но речь пойдет не о правилах приличия опенсорсного сообщества, а о том, как легко и непринужденно написать свой плагин тому, кто выбрал в качестве источника истины и платформы автоматизации Nautobot, а не Netbox. Хотя я уверен, что реализация плагина, о котором пойдет речь дальше, так же легко может быть выполнена и для Netbox.

Постановка задачи

В нашей не очень большой сети имеется ~50 филиалов, в каждом из которых установлены коммутаторы с общим количество портов от 300 до 500. На самом деле филиалов больше, но начнем мы именно с этого количества, потому что во всех из них установлено оборудование Cisco. Это важно, так как наш плагин будет пока одновендорным.

По каждому порту каждого коммутатора нам необходимо знать:

  1. когда последний раз было подключение к этому порту

  2. устройство с каким MAC/IP адресом и в каком VLAN подключено к этому порту

«Постой-ка, автор» — скажете вы — «такой функционал я уже где-то видел». Совершенно точно, это switchmap (гуглится по словосочетанию cisco switchmap). Что ж, никто не обещал, что в статье будет совершенно новая идея для плагина. Но эта идея очень хорошо ложится в концепцию использования Nautobot как платформы автоматизации и SoT.

Сначала я хотел отдать дань уважения разработчикам switchmap и увековечить их творение в названии своего плагина. Но в итоге остановился на более объясняющем функционал названии — nautobot-porthistory-plugin (Sorry guys)

Стек

Ну тут всё просто. Раз мы используем Nautobot, будем писать на Python и активно использовать модули Django. Никаких внешних скриптов, все внутри Nautobot. Забирать данные с сетевых устройств будем по SNMP. Коммутаторы и маршрутизаторы — Cisco, все заведены в Nautobot, у каждого устройства учитываются primary address и все интерфейсы, для каждого филиала учитывается список префиксов и VLAN.

Что насчет документации?

У Nautobot замечательная документация. Еще бы, ведь она почти полностью скопирована у Netbox. Но все изменения и отличия от Netbox документируются не менее хорошо, за это разработчикам отдельное спасибо. Раздел про разработку плагинов достаточно большой — https://nautobot.readthedocs.io/en/stable/plugins/development/

Но вот только первое прочтение официально туториала не дало ответы на многие вопросы (впрочем, как и последующие). Многие знания пришлось выдергивать из исходного кода самого Nautobot, а также плагинов, коих становится все больше и больше.

Итак, начнем

В соответствии поставленной задаче разделим разработку плагина на две части. В первой части сделаем что полегче — научим плагин определять время последнего использования портов коммутатора, хранить и выводить эту информацию пользователям.

По окончании этого этапа ожидается, что наш плагин выполнит следующее:

  • создаст в БД таблицу для хранения информации о неиспользуемых интерфейсах;

  • создаст job, который будет с нужной нам периодичностью заполнять созданную таблицу;

  • изменит страницу вывода информации о коммутаторе так, чтобы было видно, какие порты не используются больше заданного количества дней.

Пока действуем строго по документации

Создадим в каталоге nautobot-plugin-porthistory скелет плагина на сервере (работаем под пользователем nautobot).

Nautobot includes a command to help create the plugin directory: nautobot-server startplugin [app_name]

Команда nautobot-server startplugin nautobot_porthistory_plugin создаст следующий набор папок и файлов:

.
└── nautobot_porthistory_plugin
    ├── __init__.py
    ├── migrations
    │   └── __init__.py
    ├── models.py
    ├── navigation.py
    ├── tests
    │   ├── __init__.py
    │   ├── test_models.py
    │   └── test_views.py
    ├── urls.py
    └── views.py

Ну что, файлы есть, давайте наполним их работающим кодом.

Файл пакета  __init__.py уже содержит большинство нужных параметров. Всё, что нам требуется, указать какие опции конфига обязательны для заполнения, а также настройки плагина по умолчанию.

__init__.py
"""nautobot_porthistory_plugin Plugin Initilization."""

from nautobot.extras.plugins import PluginConfig

class NautobotPorthistoryPluginConfig(PluginConfig):
    """Plugin configuration for the nautobot_porthistory_plugin plugin."""

    name = "nautobot_porthistory_plugin"  # Raw plugin name; same as the plugin's source directory.
    verbose_name = "nautobot_porthistory_plugin"  # Human-friendly name for the plugin.
    base_url = "nautobot_porthistory_plugin"  # (Optional) Base path to use for plugin URLs. Defaulting to app_name.
    required_settings = []  # A list of any configuration parameters that must be defined by the user.
    min_version = "1.0.0"  # Minimum version of Nautobot with which the plugin is compatible.
    max_version = "1.999"  # Maximum version of Nautobot with which the plugin is compatible.
    default_settings = {}  # A dictionary of configuration parameters and their default values.
    caching_config = {}  # Plugin-specific cache configuration.

    # А вот и нужная нам конфигурация
    required_settings = ['switches_role_slug']
    default_settings = {
        'min_idle_days': 14,
        'snmp_community': 'public',
        'workers': 50,
    }

config = NautobotPorthistoryPluginConfig

Параметры, которые будем передавать в плагин:

  • switches_role_slug — Роль, по которой будем фильтровать коммутаторы

  • min_idle_days — Если порт не используется меньше дней, он нас не интересует

  • workers — количество параллельных асинхронных запросов к оборудованию

Модель определим в models.py. В таблице UnusedPorts будем хранить время обновления, время последнего использования интерфейса и собственно ID самого интерфейса. Как видите, пока вообще ничего сложного.

models.py
"""Model definition for nautobot_porthistory_plugin."""

from django.db import models

from nautobot.core.models import BaseModel
from nautobot.dcim.fields import MACAddressCharField

class UnusedPorts(BaseModel):
    # Дата/время последнего output на порту коммутатора 

    updated = models.DateTimeField(auto_now=True)
    last_output = models.DateTimeField()
    interface = models.ForeignKey(
        to="dcim.Interface",
        on_delete=models.CASCADE,
        blank=False,
    )
    
    def __str__(self):
        return f'{self.interface.name} - {self.last_output}'

Аргумент auto_now=Trueуказывает, что при каждом сохранении данных в таблицу поле updated автоматически обновляется до текущего времени

Файлы navigation.py, urls.py и views.py пока нам не нужны, оставим их в первозданном виде. Эти три файла понадобятся на следующем этапе, когда мы сделаем отдельную ссылку на плагин в меню. Но, как обычно, есть нюанс. Импорты в views.py «из коробки» ссылаются на несуществующие модули django, поэтому чтобы не словить исключение при установке, закомментим строчку from django.views.generic import views

Итак, у нас в БД есть таблица, но в ней нет данных. Создадим джоб (job в Nautobot — как custom script и report в Netbox), который соберет информацию с коммутаторов, обработает ее и сохранит в нашей таблице.

Добавим файл jobs.py с классом (джобом) UnusedPortsUpdate

Логика его работы проста:

  1. Сгенерировать список коммутаторов — используем фильтр по device_role.slug = switches_role_slug из конфига

  2. Асинхронно опросить все коммутаторы по SNMP — uptime устройства

  3. Асинхронно опросить все коммутаторы по SNMP — соответствие ifindex и названий портов

  4. Асинхронно опросить все коммутаторы по SNMP — Last output на портах

  5. Если Last output отрицательный смотрим, есть ли в таблице информация по данному порту. Если отсутствует — добавляем дату последнего использования равной дате загрузке (п.2). Если данные есть, просто обновляем дату проверки

  6. Если Last output, переведенный в дни,   меньше, чем min_idle_days из конфига, удаляем запись о порте из таблицы. Иначе создаем/обновляем запись.

На всякий случай в необязательные входные параметры добавим Site, чтобы иметь возможность обновлять информацию по конкретному филиалу.

Код jobs.py

jobs.py

from nautobot.dcim.models import Device, DeviceRole, Site, Interface
from nautobot.extras.jobs import Job, ObjectVar
from nautobot.extras.models import Status
from django.conf import settings

from nautobot_porthistory_plugin.models import UnusedPorts

import asyncio
import aiosnmp

from collections import defaultdict
from netutils.interface import canonical_interface_name
from datetime import datetime, timedelta

class UnusedPortsUpdate(Job):

    class Meta:
        name = "Обновление информации о неподключенных интерфейсах"

    site = ObjectVar(
        model=Site,
        label='БЮ',
        required=False
    )

    async def bulk_snmp(self, device, oid_list, community):
        oid_results = {}
        try:
            async with aiosnmp.Snmp(
                host=device,
                port=161,
                community=community,
                timeout=5,
                retries=3,
                max_repetitions=10,
            ) as snmp:
                oid_bulk_result = {}
                for oid in oid_list:
                    reply = await snmp.bulk_walk(oid)
                    for index in reply:
                        oid_bulk_result[index.oid] = index.value
                    oid_results[oid] = oid_bulk_result

                return (device, oid_results)

        except Exception as error:
            return (device, error)
        return (device, None)

    async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs):
        async with semaphore:
            return await function(*args, **kwargs)

    async def async_bulk_snmp(self, devices, oid_list, community, workers):
        semaphore = asyncio.Semaphore(workers)
        coroutines = [
            self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community)
            for device in devices
        ]
        result = []
        for future in asyncio.as_completed(coroutines):
            result.append(await future)
        return result

    def round_datetime(self, date):
        date_tuple = date.timetuple()
        return datetime(year=date_tuple.tm_year,
                        month=date_tuple.tm_mon,
                        day=date_tuple.tm_mon,
                        hour=date_tuple.tm_hour, minute=0, second=0, microsecond=0
                        )

    def run(self, data, commit):
        # запускать job могут только пользователи is_superuser
        if not self.request.user.is_superuser:
            self.log_info(message='Неавторизованный запуск')
            return

        PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
        COMMUNITY = PLUGIN_CFG['snmp_community']
        MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14)
        SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
        WORKERS = PLUGIN_CFG['workers']
        STATUS_ACTIVE = Status.objects.get(slug='active')

        # сгенерируем справочник устройств
        devices = [] #этот список передадим в модуль snmp
        device_dict = defaultdict(dict)
        device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG)
        if data['site']:
            nb_devices = Device.objects.filter(site=data['site'], device_role__in=device_role, status=STATUS_ACTIVE)
        else:
            nb_devices = Device.objects.filter(device_role__in=device_role, status=STATUS_ACTIVE)
            
        for nb_device in nb_devices:
            if nb_device.platform and nb_device.platform.napalm_driver and nb_device.platform.napalm_driver == 'cisco_iosxe' and nb_device.primary_ip4:
                primary_ip = str(nb_device.primary_ip4).split('/')[0]
                devices.append(primary_ip)
                device_dict[primary_ip]['device'] = nb_device
                device_dict[primary_ip]['interfaces'] = {}
                device_dict[primary_ip]['ifindexes'] = {}
                device_interfaces = Interface.objects.filter(device_id=nb_device)
                for intf in device_interfaces:
                    device_dict[primary_ip]['interfaces'][intf.name] = [intf]

        # получим uptime оборудования по SNMP (в секундах)
        # и занесем эту информацию в справочник
        oid_list = ['.1.3.6.1.6.3.10.2.1.3']
        results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            if type(device_result) != dict:
                self.log_warning(obj=device_dict[device_ip]['device'],message=f'не удалось получить информацию по SNMP - {device_result}')
                continue
            for oid, oid_result in device_result.items():
                for uptime in oid_result.values():
                    device_dict[device_ip]['uptime'] = uptime
                    boottime = datetime.now() - timedelta(seconds=uptime)
                    device_dict[device_ip]['boottime'] = boottime
    
        # получим названия интерфейсов и их индексы с оборудования по SNMP
        # и занесем эту информацию в справочник
        oid_list = ['.1.3.6.1.2.1.31.1.1.1.1']
        results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            if type(device_result) != dict or 'uptime' not in device_dict[device_ip]:
                continue
            for oid, oid_result in device_result.items():
                for index, index_result in oid_result.items():
                    ifindex = index.split('.')[-1]
                    canonical_intf_name = canonical_interface_name(index_result.decode("utf-8"))
                    if canonical_intf_name in device_dict[device_ip]['interfaces']:
                        device_dict[device_ip]['ifindexes'][ifindex] = canonical_intf_name

        # получим время последнего output по SNMP
        oid_list = ['.1.3.6.1.4.1.9.2.2.1.1.4']
        results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
        output = ''
        for device_ip, device_result in results:
            if type(device_result) != dict or 'uptime' not in device_dict[device_ip]:
                continue
            nb_device = device_dict[device_ip]['device']
            boottime = device_dict[device_ip]['boottime']
            uptime = device_dict[device_ip]['uptime']
            output += f'{nb_device.name} - power on {boottime}\n'
            unused_port_count = 0
            for oid, oid_result in device_result.items():
                for index, time_from_last_output in oid_result.items():
                    ifindex = index.split('.')[-1]
                    if ifindex in device_dict[device_ip]['ifindexes']:
                        intf_name = device_dict[device_ip]['ifindexes'][ifindex]
                        nb_interface = device_dict[device_ip]['interfaces'][intf_name][0]
                        if time_from_last_output < 0 or time_from_last_output / 1000 > uptime - 300:
                            unused_port_count += 1
                            unused_port, created = UnusedPorts.objects.get_or_create(
                                interface=nb_interface,
                                defaults={
                                    'last_output': boottime
                                }
                            )
                            unused_port.save()
                        else:
                            last_output = datetime.now() - timedelta(seconds=round(time_from_last_output/1000))
                            if 1000 * 60 * 60 * 24 * MIN_IDLE_DAYS > time_from_last_output:
                                # прошло меньше MIN_IDLE_DAYS дней
                                UnusedPorts.objects.filter(interface=nb_interface).delete()
                            else:
                                unused_port_count += 1
                                unused_port, created = UnusedPorts.objects.get_or_create(
                                    interface=nb_interface,
                                    defaults={
                                        'last_output': last_output
                                    }
                                )
                                if not created:
                                    unused_port.last_output = last_output
                                    unused_port.save()
            output += f'неиспользуемых в течении {MIN_IDLE_DAYS} дн. портов - {unused_port_count}\n'

        return output

jobs = [UnusedPortsUpdate]

В дальнейшем можно будет запланировать регулярный запуск джоба, чтобы всегда иметь актуальный список неиспользуемых портов. Напомню, что в Nautobot для этого есть встроенный шедулер джобов. Всего лишь надо при запуске джоба указать периодичность.

Теперь научим плагин выводить полученные данные на странице коммутатора. Для этого добавим в корень плагина файлы template_content.py и templates/unused_ports.html

Названия говорят сами за себя — подготовить содержимое шаблона и отрендерить блок HTML. Отрендеренный блок разместим на странице Device, в правой части.

template_content.py

from nautobot.extras.plugins import PluginTemplateExtension
from django.conf import settings

from .models import UnusedPorts

class DeviceUnusedPorts(PluginTemplateExtension):
    """Template extension to display unused ports on the right side of the page."""

    model = 'dcim.device'

    def right_page(self):
        PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
        SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
        MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14)
        device = self.context['object']

        if device.device_role.slug in SWITCHES_ROLE_SLUG:
            device_intefaces = device.interfaces.values_list("id", flat=True)
            unused_ports = UnusedPorts.objects.filter(interface_id__in=device_intefaces)
            unused_ports_with_delta = []
            for port in unused_ports:
                unused_ports_with_delta.append({
                    'interface_name': port.interface.name,
                    'last_output': port.last_output.strftime("%d.%m.%Y %H:%M"),
                    'updated': port.updated.strftime("%d.%m.%Y %H:%M"),
                    'delta': str(port.updated - port.last_output).split()[0]
                })
            return self.render('unused_ ports.html', extra_context={
                'unused_ports': unused_ports_with_delta,
                'min_idle_days': MIN_IDLE_DAYS
            })
        else:
            return ''

template_extensions = [DeviceUnusedPorts]
templates/unused_ports.html

Неиспользуемые порты (в течение последних {{ min_idle_days }} дн.)
{% for item in unused_ports %} {% endfor %}
Порт Время последней активности Обновлено (UTC)
{{ item.interface_name }} {{ item.last_output }} (прошло ~{{ item.delta }} дн.) {{ item.updated }}

Осталось смешать все ингредиенты и попробовать, что получилось:

  1. Создадим миграции — стандартный механизм Django для обеспечения версионности БД в обертке Nautobot.
    nautobot-server makemigrations (выполняется из папки плагина)

  2. добавим файлы setup.py и MANIFEST.in

  3. установим плагин простым pip3 install . из каталога плагина

  4. включим плагин в конфигурационном файле nautobot и там же укажем параметры плагина
    PLUGINS = [
    'nautobot_porthistory_plugin',
    ]
    PLUGINS_CONFIG = {
    'nautobot_porthistory_plugin': {
    'switches_role_slug': ['Access-switch'],
    'min_idle_days': 14,
    }
    }

  5. запустим пост-инсталляционный скрипт nautobot-server post_upgrade. Он обновит БД и скопирует HTML файлы в соответствующую папку.

  6. Перезапустим сервисы: sudo systemctl restart nautobot nautobot-worker

Готово! Можно запустить джоб и проверить результаты его выполнения:

Неиспользуемые порты коммутатораНеиспользуемые порты коммутатора

Полный код плагина, созданный на этом этапе, можно подсмотреть по ссылке — https://github.com/iontzev/nautobot-porthistory-plugin/tree/part_1

Этап второй — прокачка плагина

Вторая часть реализации будет посложнее. Тем интереснее, ведь это позволит понять, что еще могут плагины в Nautobot. Сперва добавим еще одну модель в БД — таблицу, в которой будем хранить MAC и IP адреса с привязкой к интерфейсу. Способ уже знакомый — обновить models.py и создать новую миграцию nautobot-server makemigrations

from nautobot.dcim.fields import MACAddressCharField

class MAConPorts(BaseModel):
    # MAC и IP на порту коммутатора 

    updated = models.DateTimeField(auto_now=True)
    mac = MACAddressCharField(blank=False, verbose_name="MAC Address")
    vlan = models.ForeignKey(
        to="ipam.VLAN",
        on_delete=models.CASCADE,
        blank=False,
    )
    ipaddress = models.ForeignKey(
        to="ipam.IPAddress",
        on_delete=models.SET_NULL,
        default=None,
        blank=True,
        null=True,
    )
    interface = models.ForeignKey(
        to="dcim.Interface",
        on_delete=models.CASCADE,
        blank=False,
    )
    device = models.ForeignKey(
        to="dcim.Device",
        on_delete=models.CASCADE,
        blank=False,
    )
    
    def __str__(self):
        return f'{self.intervace} - VLAN {seld.vlan.vid} MAC {self.mac}'

    class Meta:
        verbose_name_plural = 'MAC and IP on switches ports'

В jobs.py добавим еще один класс — он же джоб. Алгоритм его работы относительно несложный:

  • асинхронно опросить по SNMP все устройства с ролью device_role.slug = switches_role_slug из конфига — получить mac address table и соответствия MAC именам портов.

  • асинхронно опросить по SNMP все устройства с ролью device_role.slug = routers_role_slug из конфига — получить ARP таблицу. Не забудем, что конфиг тоже надо подправить — он находится в __init__.py

  • попробовать получить hostname по IP

  • если IP адрес не внесен в Nautobot, добавить

  • обновить информацию о привязке MAC и IP к интерфейсам устройств

Обновление построено таким образом, что если MAC на порту пропадет физически, мы сможем его найти в истории подключений, если на этом порту коммутатора больше ничего не появлялось.

Новый джоб в jobs.py

class MAConPortsUpdate(Job):

    class Meta:
        name = "Обновление информации о подключенных устройствах"

    site = ObjectVar(
        model=Site,
        label='БЮ',
        required=False
    )

    async def bulk_snmp(self, device, oid_list, community):
        oid_results = {}
        try:
            async with aiosnmp.Snmp(
                host=device,
                port=161,
                community=community,
                timeout=5,
                retries=3,
                max_repetitions=10,
            ) as snmp:
                oid_bulk_result = {}
                for oid in oid_list:
                    reply = await snmp.bulk_walk(oid)
                    for index in reply:
                        oid_bulk_result[index.oid] = index.value
                    oid_results[oid] = oid_bulk_result

                return (device, oid_results)

        except Exception as error:
            return (device, error)
        return (device, None)

    async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs):
        async with semaphore:
            return await function(*args, **kwargs)

    async def async_bulk_snmp(self, devices, oid_list, community, workers):
        semaphore = asyncio.Semaphore(workers)
        coroutines = [
            self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community)
            for device in devices
        ]
        result = []
        for future in asyncio.as_completed(coroutines):
            result.append(await future)
        return result

    def run(self, data, commit):
        # запускать job могут только пользователи is_superuser
        if not self.request.user.is_superuser:
            self.log_info(message='Неавторизованный запуск')
            return

        PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
        COMMUNITY = PLUGIN_CFG['snmp_community']
        SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
        ROUTERS_ROLE_SLUG = PLUGIN_CFG['routers_role_slug']
        WORKERS = PLUGIN_CFG['workers']
        STATUS_ACTIVE = Status.objects.get(slug='active')
        STATUS_STATIC = Status.objects.get(slug='static')
        STATUS_DHCP = Status.objects.get(slug='dhcp')

        device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG)

        devices = defaultdict(dict)
        devices_list = []
        vlans = defaultdict(list)

        # построим список всех связей, чтобы потом исключить из результатов линки между свичами
        cable_set = defaultdict(set)
        all_cables = Cable.objects.all()
        for cable in all_cables:
            if cable.termination_a_type == ContentType.objects.get(app_label='dcim', model='interface'):
                if not data['site'] or cable.termination_a.device.site == data['site']:
                    cable_set[cable.termination_a.device.name].add(cable.termination_a.name)
            if cable.termination_b_type == ContentType.objects.get(app_label='dcim', model='interface'):
                if not data['site'] or cable.termination_b.device.site == data['site']:
                    cable_set[cable.termination_b.device.name].add(cable.termination_b.name)

        # сгенерируем справочник вланов с разбивкой по сайтам
        vlans_by_site = defaultdict(list)
        if data['site']:
            nb_vlans = VLAN.objects.filter(site=data['site'], status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory':True})
        else:
            nb_vlans = VLAN.objects.filter(status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory':True})
        for nb_vlan in nb_vlans:
            vlans_by_site[nb_vlan.site.name].append(nb_vlan.vid)

        # сгенерируем справочник устройств
        for site in vlans_by_site:
            site_id = Site.objects.get(name=site)
            nb_devices_in_site = Device.objects.filter(
                site=site_id, 
                device_role__in=device_role, 
                status=STATUS_ACTIVE,
            )
            for nb_device in nb_devices_in_site:
                if (nb_device.platform and 
                            nb_device.platform.napalm_driver and 
                            nb_device.platform.napalm_driver == 'cisco_iosxe' and 
                            nb_device.primary_ip4):

                    primary_ip = str(nb_device.primary_ip4).split('/')[0]
                    devices_list.append(primary_ip)
                    device = devices[primary_ip] = {}
                    device['device'] = nb_device
                    device['site'] = nb_device.site
                    device['interfaces'] = {}
                    device['ifindexes'] = {}
                    device['bridge_ports'] = {}
                    device['vlans'] = vlans_by_site[site]
                    for intf in Interface.objects.filter(device_id=nb_device):
                        device['interfaces'][intf.name] = intf
                    for vlan in vlans_by_site[site]:
                        vlans[vlan].append(primary_ip)

        # получим названия интерфейсов и их индексы с оборудования по SNMP
        oid_list = ['.1.3.6.1.2.1.31.1.1.1.1']
        results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            if type(device_result) != dict:
                self.log_warning(obj=devices[device_ip]['device'],message=f'не удалось получить информацию по SNMP')
                del devices[device_ip]
                devices_list.remove(device_ip)
                continue
            for oid, oid_result in device_result.items():
                for index, index_result in oid_result.items():
                    ifindex = index.split('.')[-1]
                    canonical_intf_name = canonical_interface_name(index_result.decode("utf-8"))
                    if canonical_intf_name in devices[device_ip]['interfaces']:
                        devices[device_ip]['ifindexes'][ifindex] = canonical_intf_name

        # пройдемся по списку вланов и получим с устройства таблицу MAC адресов для каждого влана
        # MAC адреса в десятичном формате

        port_mac_relation = defaultdict(list)

        for vlan, devices_dict in vlans.items():
            self.log_info(message=f'Получаем информацию по VLAN {vlan}')
            community_with_vlan = f'{COMMUNITY}@{vlan}'
            devices_list = [device for device in devices_dict if device in devices_list]

            # получим bridge ports с оборудования по SNMP (зависит от VLAN)
            oid_list = ['.1.3.6.1.2.1.17.1.4.1.2']
            results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, community_with_vlan, WORKERS))
            for device_ip, device_result in results:
                if type(device_result) != dict:
                    # скорее всего, такого VLAN нет на этом устройстве
                    continue
                for oid, oid_result in device_result.items():
                    for index, index_result in oid_result.items():
                        bridge_port = index.split('.')[-1]
                        ifindex = str(index_result)
                        if ifindex in devices[device_ip]['ifindexes']:
                            ifname = devices[device_ip]['ifindexes'][ifindex]
                            nb_interface = devices[device_ip]['interfaces'][ifname]
                            devices[device_ip]['bridge_ports'][bridge_port] = nb_interface
            else:
                oid_list = ['.1.3.6.1.2.1.17.4.3.1.2']

                results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, community_with_vlan, WORKERS))
                for device_ip, device_result in results:
                    nb_device = devices[device_ip]['device']
                    nb_vlan = VLAN.objects.get(vid=vlan, site_id=nb_device.site.id)
                    if type(device_result) != dict:
                        continue
                    for oid, oid_result in device_result.items():
                        for mac_dec, bridge_port in oid_result.items():
                            if str(bridge_port) in devices[device_ip]['bridge_ports']:
                                if (devices[device_ip]['bridge_ports'][str(bridge_port)].name not in cable_set[nb_device.name]
                                        and not devices[device_ip]['bridge_ports'][str(bridge_port)]._custom_field_data.get('flag-ignore-mac')):
                                    # преобразуем MAC из десятичного формата в шестнадцатеричный
                                    mac_hex = ''.join(['{0:x}'.format(int(i)).zfill(2) for i in mac_dec.split('.')[-6:]]).upper()
                                    port_mac_relation[devices[device_ip]['bridge_ports'][str(bridge_port)].id].append({
                                        'vlan': nb_vlan,
                                        'mac': mac_hex,
                                        })

        # подготовим список L3 устройств
        routers = defaultdict(dict)
        routers_list = []
        device_role = DeviceRole.objects.filter(slug__in=ROUTERS_ROLE_SLUG)
        for site in vlans_by_site:
            site_id = Site.objects.get(name=site)
            nb_devices_in_site = Device.objects.filter(
                site=site_id, 
                device_role__in=device_role, 
                status=STATUS_ACTIVE,
            )
            for nb_device in nb_devices_in_site:
                if (nb_device.platform and 
                            nb_device.platform.napalm_driver and 
                            nb_device.platform.napalm_driver == 'cisco_iosxe' and 
                            nb_device.primary_ip4):

                    primary_ip = str(nb_device.primary_ip4).split('/')[0]
                    routers_list.append(primary_ip)
                    router = routers[primary_ip] = {}
                    router['site'] = nb_device.site.name
                    router['device'] = nb_device

        arp = defaultdict(dict)
        # получим ARP-таблицу с оборудования по SNMP
        oid_list = ['.1.3.6.1.2.1.3.1.1.2']
        results = asyncio.run(self.async_bulk_snmp(routers_list, oid_list, COMMUNITY, WORKERS))
        for device_ip, device_result in results:
            site = routers[device_ip]['site']
            arp[site] = defaultdict(list)
            if type(device_result) != dict:
                self.log_warning(obj=routers[device_ip]['device'],message=f'не удалось получить информацию по SNMP')
                continue
            for oid, oid_result in device_result.items():
                for index, index_result in oid_result.items():
                    snmp_address = '.'.join(index.split('.')[-4:])
                    snmp_mac = ''.join(["{0:x}".format(int(i)).zfill(2) for i in index_result]).upper()
                    arp[site][snmp_mac].append(snmp_address)

        output = ''

        for device in devices.values():
            nb_device = device['device']
            site = nb_device.site.name
            output += f'device {nb_device} :'
            mac_on_device = ip_on_device = name_on_device = 0
            for intf in device['interfaces'].values():
                if len(port_mac_relation[intf.id]) > 0:
                    MAConPorts.objects.filter(interface=intf).delete()
                for vlan_and_mac in port_mac_relation[intf.id]:
                    mac_on_device += 1
                    nb_prefixes = Prefix.objects.filter(vlan_id=vlan_and_mac['vlan'].id)
                    addresses = arp[site].get(vlan_and_mac['mac'])
                    address_with_prefix = ''
                    if nb_prefixes and addresses:
                        for nb_prefix in nb_prefixes:
                            for address in addresses:
                                if IPv4Address(address) in IPv4Network(str(nb_prefix)):
                                    prefixlen = str(nb_prefix).split('/')[-1]
                                    address_with_prefix = f'{address}/{prefixlen}'
                                    break
                            else:
                                continue
                            break
                    if address_with_prefix:
                        ip_on_device += 1
                        try:
                            hostname, aliaslist, ipaddrlist  = socket.gethostbyaddr(address)
                            name_on_device += 1
                        except:
                            hostname=''
                        nb_address, created = IPAddress.objects.get_or_create(
                            address=address_with_prefix,
                            vrf=nb_prefix.vrf,
                            defaults={
                                'status': STATUS_STATIC,
                                'dns_name': hostname
                            }
                        )
                        if created:
                            self.log_success(obj=nb_address, message=f'Добавлен IP адрес {hostname}')
                        elif nb_address.status != STATUS_DHCP and hostname and nb_address.dns_name != hostname:
                            old_hostname = nb_address.dns_name
                            nb_address.dns_name = hostname
                            nb_address.save()
                            self.log_success(obj=nb_address, message=f'Обновлено DNS name "{old_hostname}" -> "{hostname}"')
                    else:
                        nb_address = None
                    mac, created = MAConPorts.objects.get_or_create(
                        vlan=vlan_and_mac['vlan'],
                        mac=vlan_and_mac['mac'],
                        defaults={
                            'interface': intf,
                            'device': nb_device,
                            'ipaddress': nb_address,
                        }
                    )
                    if not created:
                        updated = False
                        if nb_address and mac.ipaddress != nb_address:
                            self.log_info(obj=nb_address, message=f'Устройство с MAC {mac.mac} поменяло IP {mac.ipaddress} -> {nb_address}')
                            mac.ipaddress = nb_address
                            updated = True
                        if mac.interface != intf:
                            self.log_info(obj=intf, message=f'MAC {mac.mac} переехал с порта "{mac.interface}"')
                            mac.interface = intf
                            mac.device = nb_device
                            updated = True
                        if updated:
                            mac.save()

            output += f" MAC count - {mac_on_device}, IP count - {ip_on_device}, resolved to hostname - {name_on_device}\n"

        return output

Важно! Особенность получения по SNMP таблицы MAC с коммутаторов Cisco в том, что необходимо указывать в строке community номер VLAN, в котором живут MAC адреса. 

Еще учтем, что не про все VLAN надо знать. Например, если в каком-то VLAN расположены беспроводные клиенты, в них бессмысленно отслеживать MAC. То же самое касается портов. Значит нам нужен механизм фильтрации VLAN и портов коммутаторов. Сюда отлично подойдут custom_fields, но использовать в скриптах названия полей, которые надо добавить пользователю, значит надеяться на маленькое чудо.

Так давайте будем реалистами, добавим custom_fileds при установке плагина. Поможет нам в этом функционал Django signals, опять же в обертке Nautobot.

Добавим вызов сигнала в __init__.py, а сам сигнал в signals.py.

__init__.py
class NautobotPorthistoryPluginConfig(PluginConfig):
  ............
		def ready(self):
				super().ready()
    		nautobot_database_ready.connect(create_custom_fields_for_porthistory, sender=self)
signals.py
from nautobot.extras.choices import CustomFieldTypeChoices

def create_custom_fields_for_porthistory(sender, apps, **kwargs):
    """Create a custom field flag_porthistory for VLAN if it doesn't already exist."""
    # Use apps.get_model to look up Nautobot core models
    ContentType = apps.get_model("contenttypes", "ContentType")
    CustomField = apps.get_model("extras", "CustomField")
    VLAN = apps.get_model("ipam", "VLAN")
    Interface = apps.get_model("dcim", "Interface")

    # Create custom fields
    cf_for_vlan, created = CustomField.objects.update_or_create(
        name="flag-porthistory",
        defaults={
            "label": "Search MACs on ports in this VLAN",
            "type": CustomFieldTypeChoices.TYPE_BOOLEAN,
        },
    )
    cf_for_vlan.content_types.set([ContentType.objects.get_for_model(VLAN)])
    cf_for_interface, created = CustomField.objects.update_or_create(
        name="flag-ignore-mac",
        defaults={
            "label": "Ignore MACs on this port",
            "type": CustomFieldTypeChoices.TYPE_BOOLEAN,
        },
    )
    cf_for_interface.content_types.set([ContentType.objects.get_for_model(Interface)])

Теперь при установке плагина автоматически добавятся новые поля к моделям VLAN и Device.interface. Отметив на странице редактирования VLAN галочку Search MACs on ports in this VLAN мы тем самым разрешаем нашему джобу искать MAC адреса в этом VLAN.

Итак, у нас есть джоб и настроенные VLAN, а значит можно заполнять БД нужными данными. Но как посмотреть эти данные? Первый путь нам уже знаком — используем шаблоны для страниц интерфейса и IP-адреса. Таким образом можно точечно извлекать информацию из БД и показывать пользователю:

MAC адреса на конкретном интерфейсе коммутатораMAC адреса на конкретном интерфейсе коммутатора

Второй путь — создать отдельную страницу плагина с возможностью вывода и поиска полученных данных. Этот путь более тернист, чем первый, но и результаты более интересны.

В подготовке страницы плагина участвуют 6 составляющих. Число их можно сократить, если использовать HTML-шаблоны, но это немного другая история

Итак, по порядку:

  1. navigation.py — добавляет строку в выпадающее меню Plugins

  2. urls.py — вешает на url (добавленной строки в п.1) обработчик

  3. views.py — содержит обработчики (View). Обработчик собирает в одно целое данные, параметры вывода (таблицы), форму и алгоритмы фильтрации

  4. tables.py — выводит таблицу из БД

  5. forms.py — формирует HTML форму для фильтрации данных в таблице

  6. filters.py — содержит алгоритмы фильтрации данных

navigation.py
from nautobot.extras.plugins import PluginMenuItem

menu_items = (
    PluginMenuItem(
        link = 'plugins:nautobot_porthistory_plugin:history',  # A reverse compatible link to follow.
        link_text = 'MAC and IP on switches ports',  # Text to display to user.
    ),
)

urls.py
from django.urls import path

from nautobot_porthistory_plugin import views

urlpatterns = [
    path('history/', views.PortHistoryView.as_view(), name='history'),
]
views.py
from django.shortcuts import render
from nautobot.core.views import generic

from nautobot_porthistory_plugin import models, tables, filters, forms

class PortHistoryView(generic.ObjectListView):
    """Показывает MAC и IP адреса на портах"""

    queryset = models.MAConPorts.objects.all()
    table = tables.PortHistoryTable
    filterset = filters.PortHistoryFilterSet
    filterset_form = forms.PortHistoryFilterForm
    action_buttons = ()
tables.py
import django_tables2 as tables
from django_tables2.utils import A
from nautobot.utilities.tables import BaseTable, ToggleColumn

from nautobot_porthistory_plugin import models

class PortHistoryTable(BaseTable):
    pk = ToggleColumn()
    device = tables.Column(linkify=True)
    interface = tables.LinkColumn(orderable=False)
    vlan = tables.LinkColumn()
    ipaddress = tables.Column(linkify=True, verbose_name="IPv4 Address")

    class Meta(BaseTable.Meta):  # pylint: disable=too-few-public-methods
        """Meta attributes."""

        model = models.MAConPorts
        fields = (
            'pk',
            'device',
            'interface',
            'vlan',
            'mac',
            'ipaddress',
            'updated',
        )
forms.py
from django import forms

from nautobot.dcim.models import Region, Site, Device
from nautobot.ipam.models import VLAN
from nautobot.utilities.forms import BootstrapMixin, DynamicModelMultipleChoiceField
from nautobot.extras.forms import CustomFieldFilterForm

from nautobot_porthistory_plugin.models import MAConPorts

class PortHistoryFilterForm(BootstrapMixin, forms.Form):
    """Filter form to filter searches for MAC."""

    model = MAConPorts
    field_order = ["q", "site", "device_id", "vlan"]
    q = forms.CharField(required=False, label="Search MAC")
    site = DynamicModelMultipleChoiceField(
        queryset=Site.objects.all(),
        to_field_name="slug",
        required=False,
    )
    device_id = DynamicModelMultipleChoiceField(
        queryset=Device.objects.all(),
        required=False,
        label="Device",
        query_params={"site": "$site"},
    )
    vlan = DynamicModelMultipleChoiceField(
        queryset=VLAN.objects.all(),
        required=False,
        label="VLAN",
        query_params={"site": "$site"},
    )
filters.py
import django_filters
from nautobot.dcim.models import Device
from nautobot.utilities.filters import BaseFilterSet, MultiValueCharFilter
from django.db.models import Q

from nautobot_porthistory_plugin.models import MAConPorts

class PortHistoryFilterSet(BaseFilterSet):
    """Filter for MAConPorts"""

    q = django_filters.CharFilter(method="search", label="Search MAC")

    site = MultiValueCharFilter(
        method="filter_site",
        field_name="pk",
        label="site",
    )
    device_id = MultiValueCharFilter(
        method="filter_device_id",
        field_name="pk",
        label="Device (ID)",
    )
    vlan = MultiValueCharFilter(
        method="filter_vlan",
        field_name="pk",
        label="VLAN",
    )

    class Meta:
        """Meta attributes for filter."""

        model = MAConPorts

        fields = [
            'vlan'
        ]

    def search(self, queryset, mac, value):
        if not value.strip():
            return queryset
        mac = ''.join(ch for ch in value if ch.isalnum())
        mac = ':'.join(mac[i:i+2] for i in range(0,len(mac),2))
        return queryset.filter(Q(mac__icontains=mac))

    def filter_site(self, queryset, name, id_list):
        if not id_list:
            return queryset
        return queryset.filter(Q(device__site__slug__in=id_list) )

    def filter_device_id(self, queryset, name, id_list):
        if not id_list:
            return queryset
        return queryset.filter(Q(device__id__in=id_list) )

    def filter_vlan(self, queryset, name, id_list):
        if not id_list:
            return queryset
        return queryset.filter(Q(vlan__id__in=id_list) )

Вот такая получилась страница плагина. Можно фильтровать по сайту, по устройству, по VLAN. Можно искать по части MAC, причем формат ввода в поле поиска абсолютно не важен, хоть через нижние подчеркивания, хоть капсом через одну.

2a0788e2b25a315dc7447bd5ba1421a5.png

Какие выводы я для себя сделал

Написать свой плагин не так и сложно, как мне казалось в самом начале, когда я только изучал тему Netbox и Nautobot. Nautobot/Netbox представляют собой отличную платформу для автоматизации. Прикладывая минимум усилий можно достичь отличных результатов в деле адаптации Nautobot/Netbox к реалиям своей организации.

На этом всё, спасибо за внимание.

Ну и для тех, кому эта тема действительно интересна и кто дочитал до конца, ссылка на код плагина — https://github.com/iontzev/nautobot-porthistory-plugin

© Habrahabr.ru