Пишем декодер для sigrok
Если работаешь с цифровой техникой, то рано или поздно появляется необходимость в логическом анализаторе. Одним из доступных радиолюбителям, является логический анализатор DSLogic от DreamSourceLab. Он не раз упоминался на сайте, как минимум: раз, два и три. Его особенностью является открытый исходный код, а также, то что за декодирование сигналов отвечает open-source библиотека sigrok. Вместе с внушительным списком уже существующих декодеров сигнала эта библиотека предоставляет API для написание собственных. Этим мы и займемся.
Демо-стенд
Перед тем как начать писать декодер, нужно получить декодируемый сигнал. Источником сигнала будет микросхема TTP229-BSF. Эта микросхема предназначена для работы с 8-ю или 16-ю сенсорными кнопками и передает данные о нажатиях по двух проводной линии. В качестве приемника выступает плата Arduino, а за сбор данных отвечает логический анализатор DSLogic.
Минимально необходимая реализация декодера
И, хотя официальная документация рекомендует начинать создание собственного декодера с копии уже существующего, который больше всего подходит к декодируемому сигналу. Я хочу начать с создания минимальной версии, которая может быть успешно загружена библиотекой. Это поможет понять, что необходимо реализовывать в декодере, а что опционально.
Каждый декодер в sigrok является отдельным пакетом, написанным на Python 3 и имеет собственную директорию в папке decoders. Как и любой пакет Python, декодер содержит __init__.py, а также по принятому в sigrok именованию, pd.py файл, содержащий непосредственно реализацию.
Код в __init__.py стандартен и включает в себя docstring описывающий протокол, и импорт декодера (d28dee9):
'''
Protocol description.
'''
from .pd import Decoder
Файл pd.py содержит имплементацию декодера (d28dee9):
class Decoder(srd.Decoder):
api_version = 3
id = 'empty'
name = 'empty'
longname = 'empty decoder'
desc = 'Empty decoder that can be loaded by sigrok.'
license = 'mit'
inputs = ['logic']
outputs = ['empty']
tags = ['Embedded/industrial']
channels = (
{'id': 'scl', 'name': 'SCL', 'desc': 'Clock'},
{'id': 'sdo', 'name': 'SDO', 'desc': 'Data'},
)
def start(self):
pass
def reset(self):
pass
def decode(self):
pass
Это минимальная реализация, которая может быть загружена библиотекой, но ничего не декодирует. Давайте разберем обязательные свойства:
- api_version — версия Protocol decoder API, которую используют декодер. В данный момент последняя стабильная версия библиотеки libsigrokdecode поддерживает только 3-ю версию Protocol decoder API. Поэтому только она и будет рассматриваться в дальнейшем.
- id — идентификатор декодера, используется для выбора применяемого декодера. Согласно соглашению об именовании, должен совпадать с именем директории для этого же декодера.
- name, longname, desc — короткое имя, полное имя, описание декодера. Используются для отображения информации о декодере в графическом интерфейсе и интерфейсе командной строки.
- inputs, outputs — формат входных и выходных данных для декодера. Значение 'logic' означает что декодер будет работать напрямую с логическими уровнями сигналов. Другие значения используются в случае объединения нескольких декодеров в стек. Например, если есть свой протокол общения между устройствами, который на нижнем уровне использует SPI. Тогда не имеет смысла снова заниматься декодированием SPI протокола. Можно сразу написать свой декодер, который оперирует данными предоставляемыми существующим SPI декодером и собрать их в стек. В этом случае формат входных данных для нового декодера будет задаваться как 'spi'.
- license, tag — тип лицензии и теги. Для связки DSView 1.1.1 + libsigrokdecode наличие свойства tags по какой-то причине обязательно.
- channels — список сигнальных линий используемых декодером. Это свойство обязательно для декодеров, у которых входной формат данных определен как logic.
и обязательные методы:
- start () — метод вызываемый перед началом декодирования. В этом методе должны производиться настройки для текущей сессии декодирования.
- reset () — метод вызываемый при остановке декодирования. Должен возвращать декодер в начальное состояние.
- decode () — метод вызываемый для декодирования сигнала.
Разобравшись с минимальной реализацией декодера, можно приступить к декодированию реального сигнала.
Полнофункциональный декодер
Для начала рассмотрим временну́ю диаграмму сигнала данных. TTP229-BSF имеет несколько режимов работы, и я привожу временную диаграмму только для того режима, который будет использоваться далее. Более подробную информацию о всех режимах работы микросхемы можно найти в документации к ней.
Первое и самое важное, необходимо описать набор обязательных линий, с которыми будет работать декодер. В данном случае их две, это линия тактирования (SCL) и линия данных (SDO).
class Decoder(srd.Decoder):
...
inputs = ['logic']
channels = (
{'id': 'scl', 'name': 'SCL', 'desc': 'Clock'},
{'id': 'sdo', 'name': 'SDO', 'desc': 'Data'},
)
Когда микросхема определяет нажатие кнопки, она выставляет на линии SDO сигнал Data Valid (DV) по которому приемник должен начать считывание данных. Давайте найдем и декодируем этот сигнал.
sigrok поддерживает несколько форматов для входных данных, но независимо от них плагины всегда оперируют семплами. Семпл содержит информацию о состоянии линий в каждый конкретный момент времени. Частота, с которой логический анализатор сохраняет семплы, называется частотой семплирования. В третьей версии Protocol decoder API был изменен подход к тому как плагины работают с семплами. Теперь нет необходимости писать цикл по всем собранным семплам вручную. Вместо этого была добавлена функция wait (). Она принимает список условий для поиска нужного семпла. При выполнении любого из условий функция возвращает состояние сигналов для найденного семпла, а свойство self.samplenum содержит номер этого семпла.
Каждое условие поиска представляет собой словарь, где ключ является номером линии, а значение — одной из констант, описывающих состояние линии:
- 'l' — низкий уровень, логический 0;
- 'h' — высокий уровень, логическая 1;
- 'r' — нарастание сигнала, переход с низкого состояния в высокое;
- 'f' — спад сигнала, переход с высокого состояния в низкое;
- 'e' — произвольное изменение сигнала, нарастание или спад;
- 's' — стабильное состояние, 0 или 1.
Таким образом, для нахождения начала сигнала DV потребуется условие, описывающее, что линия SCL имеет высокий уровень, а на линии данных (SDO) происходит спад сигнала. Вызовем с составленным условием функцию wait () и сохраним номер семпла:
self.wait({0: 'h', 1: 'f'})
self.dv_block_ss = self.samplenum
для нахождения окончания сигнала DV необходимо задать условие, когда линия SCL остается в высоком состоянии, а линия данных переходит в высокое состояние:
self.wait({0: 'h', 1: 'r'})
По завершению последнего вызова функции wait () будут известны номера семплов начала и конца сигнала DV. Самое время создать для него аннотацию. Для этого добавим в декодер описание аннотаций (annotations) и их объединение в группы (annotation_rows):
class Decoder(srd.Decoder):
...
annotations = (
('dv', 'Data valid'),
)
annotation_rows = (
('fields', 'Fields', (0,)),
)
где 0, это индекс аннотации в self.annotations кортеже, входящей в эту группу. Также потребуется зарегистрировать вывод аннотаций:
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
Теперь все готово к тому, чтобы разместить аннотацию к сигналу DV. Делается это с помощью вызова функции put () (f613b83):
self.put(self.dv_block_ss, self.samplenum,
self.out_ann, [0, ['Data valid', 'DV']])
Параметры функции: номер семпла начала аннотации (self.dv_block_ss), номер семпла окончания аннотации (self.samplenum), идентификатор вывода аннотации (self.out_ann) и данные для аннотации. Данные представляются в виде списка из индекса аннотации (0) и вложенного списка строк, от самой длинной к самой короткой, для отображения в описании. Если указано больше одной строки интерфейс может самостоятельно выбрать отображаемую строку, например, в зависимости от используемого масштаба:
Аналогичным образом добавим аннотацию для задержки Tw между окончанием сигнала DV и началом чтения данных микроконтроллером. Далее можно приступать к декодированию данных о нажатии кнопок.
TTP229-BSF, в зависимости от выбранного режима, может работать с 8-ю или 16-ю сенсорными кнопками. При этом передаваемые данные не содержат информации о режиме работы микросхемы. Поэтому для декодера стоит добавить опцию, задающую режим, в котором работает микросхема.
class Decoder(srd.Decoder):
...
options = (
{'id': 'key_num', 'desc': 'Key number', 'default': 8,
'values': (8, 16)},
)
def start(self):
...
self.key_num = self.options['key_num']
Данная опция будет доступна для задания значения в пользовательском интерфейсе при выборе декодера.
Как видно из временной диаграммы, данные на линии SDO выставляются при переходе SCL в активный (низкий) уровень и сохраняются при возвращении сигнала в пассивный уровень. В этот момент и микроконтроллер, и декодер могут фиксировать выставленные на линии SDL данные. Переход SCL снова в активный уровень, можно рассматривать как начало следующей посылки данных. В этом случае функция декодирования будет иметь вид (ca9a370):
def decode(self):
self.wait({0: 'h', 1: 'f'})
self.dv_block_ss = self.samplenum
self.wait({0: 'h', 1: 'r'})
self.put(self.dv_block_ss, self.samplenum,
self.out_ann, [0, ['Data valid', 'DV']])
self.tw_block_ss = self.samplenum
self.wait([{0: 'f', 1: 'h'}, {0: 'f', 1: 'f'}])
self.put(self.tw_block_ss, self.samplenum,
self.out_ann, [1, ['Tw', 'Tw']])
self.bt_block_ss = self.samplenum
for i in range(self.key_num):
(scl, sdo) = self.wait({0: 'r'})
sdo = 0 if sdo else 1
self.wait({0: 'f'})
self.put(self.bt_block_ss, self.samplenum,
self.out_ann, [2, ['Bit: %d' % sdo, '%d' % sdo]])
self.bt_block_ss = self.samplenum
Но у такого подхода размещения аннотаций есть недостаток, аннотация для последнего бита будет продолжаться до следующего чтения микроконтроллером данных.
Это неудобно и может быть решено несколькими способами. Рассмотрим один из них. Согласно описанию для микросхемы и её временной диаграмме, при отсутствии на линии SCL сигнала в течении двух милисекунд микросхема переходит в начальное состояние. Таким образом, аннотация для каждой посылки данных должна завершаться по переходу SCL в активное состояние или же по истечению 2 мс., смотря что наступит раньше. Для пропуска семплов в условии необходимо использовать специальное ключевое значение 'skip', а число семплов, которые необходимо пропустить, указывается как его значение. Необходимое число семплов легко вычислить, если известна частота семплирования. Получить её можно реализовав функцию metadata (). Значение частоты семплирования передается в Hz.
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.timeout_samples_num = int(2 * (value / 1000.0))
Тогда условие в функции декодирования запишется с использованием skip в следующей форме, плюс дополнительная проверка, что в процессе чтения данных о нажатой кнопке микросхема не вернулась в начальное состояние (6a0422d).
def decode(self):
...
for i in range(self.key_num):
...
self.wait([{0: 'f'}, {'skip': self.timeout_samples_num}])
self.put(self.bt_block_ss, self.samplenum,
self.out_ann, [2, ['Bit: %d' % sdo, '%d' % sdo]])
if (self.matched & 0b10) and i != (self.key_num - 1):
break
Сейчас декодер может обрабатывать полную посылку данных. И будет удобно, если в дополнении к информации об отдельных битах добавится аннотация о том, какая же кнопка была нажата. Для этого добавим описание еще одной аннотации. Поскольку аннотация для нажатия кнопки относится ко всей посылке данных и пересекается с добавленными ранее аннотациями, то её следует поместить в отдельную группу. Создадим для неё новую группу аннотаций 'Key message'. (91c64e6).
class Decoder(srd.Decoder):
...
annotations = (
('dv', 'Data valid'),
('tw', 'Tw'),
('bit', 'Bit'),
('key', 'Key press status'),
)
annotation_rows = (
('fields', 'Fields', (0, 1, 2)),
('keymsg', 'Key message', (3,)),
)
def decode(self):
...
keys_pressed = list()
for i in range(self.key_num):
...
else:
key_msg = \
'Key: %s' % (','.join(keys_pressed)) if keys_pressed else 'Key unpressed'
key_msg_short = \
'K: %s' % (','.join(keys_pressed)) if keys_pressed else 'KU'
self.put(self.dv_block_ss, self.samplenum,
self.out_ann, [3, [key_msg, key_msg_short]])
До этого момента весь код работал только с первой посылкой. Вы уже обратили внимание на 19% рядом с именем декодера? Это процент семплов, которые были обработаны до выхода из функции decode (). Для обработки всех семплов остается добавить бесконечный цикл вокруг кода по декодированию отдельной посылки данных (48f95fb).
def decode(self):
...
while True:
self.wait({Pin.SCL: self.passive_signal, Pin.SDO: self.front_edge})
self.dv_block_ss = self.samplenum
...
Поскольку декодирование автоматически завершится, если при поиске следующего семпла функция wait () переберет их все. В результате этого изменения будут обрабатывать все семплы и все посылки данных как изображенно на КДПВ.
Заключительным штрихом остается добавить возможность выбора уровня активного сигнала и полноценный декодер для TTP229-BSF будет готов. Исходный код финальной версии также доступен на GitHub.