[Перевод] Картографирование шума с помощью KSQL, Raspberry Pi и радиоприемника

8he_jpuc-mgf_sarxdvkm3n6xsg.png

На первый взгляд, в этой истории есть всё, чтобы заслужить статус романтичного поста накануне 8 марта: самолёты, любовь, чуточку шпионажа и, наконец, котик (точнее, кошка). Трудно представить, что всё это имеет самое непосредственное отношение к Kafka, KSQL и эксперименту «как в домашних условиях с помощью информационных технологий найти самый шумный самолёт». Трудно, но придётся: именно такой эксперимент провёл Саймон Обьюри, а мы перевели статью его авторства с описанием всех подробностей процесса.
Наша новая кошка по имени Снежок просыпается рано. Ее будят звуки самолетов, пролетающих над нашим домом. А что если бы я, используя Apache Kafka, KSQL и Raspberry Pi, смог определить, какой именно самолет не дает моей кошке спать? Хорошо бы еще создать занятную панель слежения, на которую кошка могла бы переключить свое внимание — и дать мне ещё немножко поспать.

В общих чертах


f8dfkgmmkghrnfhtjecrthwx8j8.png
Переносим самолеты с неба в графики с помощью Kafka и KSQL

Самолеты определяют свое местоположение с помощью GPS приемников. Бортовой передатчик периодически сообщает локацию, идентификационный номер, высоту и скорость корабля, используя короткие радиопередачи. Эти передачи вещательного автоматического зависимого наблюдения (АЗН-В) являются по сути пакетами данных, открытыми для доступа с наземных станций.
Один микрокомпьютер, такой как Raspberry Pi, и несколько вспомогательных компонентов — это все, что требуется для получения сообщений бортовых передатчиков самолетов, снующих над моим домом.
Бортовые передачи самолетов выглядят как запутанный клубок сообщений и требуют систематизации. Распознать эти хаотичные потоки данных — это все равно что подслушать беседу на шумной вечеринке. Поэтому, чтобы найти самолет, который тревожит мою кошку, я решил использовать сочетание Kafka и KSQL.

om2co03tws9dxaml4hsmwrgqzsg.png
Разбуженная кошка и Raspberry Pi

Сбор показаний АЗН-В с помощью Raspberry Pi


Для сбора бортовых передач я использовал Raspberry Pi и RTL2832U — USB-модем, изначально продававшийся для просмотра цифрового ТВ на компьютере. На Raspberry Pi я установил dump1090 — программу, которая получает данные с АЗН-В через RTL2832U с помощью небольшой антенны.

_udxrdhg_e8cvqomnkbfkiuvvgm.png
Мой программный радиоприемник из Raspberry Pi и RTL2832U

Преобразуем сигналы АЗН-В в темы Kafka

Теперь, когда я получил поток необработанных сигналов АЗН-В, нам следует обратить внимание на трафик. Raspberry Pi не имеет достаточной мощности для серьезных вычислений, поэтому мне придется передать обработку данных моему локальному кластеру на Kafka.

ar27xbolmidgiscrmspvto0vgnk.png

Получаемые сообщения делятся либо на сообщения о локации, либо на сообщения об идентификации борта. Локация будет иметь выглядеть как борт 7c6db8 летит на высоте 6,250 футов в координате -33.8,151.0. Сообщение об идентификации борта будет выглядеть как борт 7c451c совершает полет по маршруту QJE1726.

Небольшой скрипт на Python, работающий на Raspberry Pi, разделяет входящие сообщения АЗН-В. Я использовал прокси-сервер Confluent Rest Proxy для распределения данных с Raspberry Pi в темы location-topic и ident-topic на Kafka. Прокси-сервер предоставляет RESTful интерфейс для кластера Kafka, позволяя легко создавать сообщения путем простого REST-вызова на Pi.

6dox35w1d-qrzyaiowi80kxtglk.png

Я хотел понять, какие самолеты летали над моей крышей и по каким маршрутам. База данных OpenFlights позволяет сопоставить код авиаборта, например 7C6DB8, присвоенный Международной организацией гражданской авиации (ИКАО), с типом самолета — в нашем случае «Боинг-737». Я загрузил данные моего картографирования в тему icao-to-aircraft.

KSQL предоставляет «SQL-движок», который дает возможность обработки данных в реальном времени по темам Apache Kafka. Например, чтобы найти бортовой код 7C6DB8, мы можем написать запрос следующим образом:

CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); 

ksql> SELECT manufacturer, aircraft, registration \ 
FROM icao_to_aircraft \ 
WHERE icao = '7C6DB8'; 
Boeing | B738 | VH-VYI

Аналогично, в тему callsign-details я загрузил позывные (т. е. QFA563, это рейс авиакомпании Qantas из Брисбена в Сидней).

CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); 

ksql> SELECT operatorname, fromairport, toairport \ 
FROM callsign_details \ 
WHERE callsign = 'QFA563'; 

Qantas | Brisbane | Sydney

Теперь давайте взглянем поток данных location-topic. Мы можем наблюдать постоянный поток входящих сообщений о местоположении пролетающего самолета.

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic 

{"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"}

Запрос на KSQL будет выглядеть так:

ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ 
ico, height, location \ 
FROM location_stream \ 
WHERE ico = '7C6DB8'; 

2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495 

KSQL: гармонизация потоков…


Настоящая ценность KSQL лежит в возможности объединения входящих потоков данных о местоположении с исходными данными тем (см. 03_ksql.sql) — то есть, добавлении полезных сведений к необработанному потоку данных. Это очень похоже на left join в традиционной базе данных. Результатом является еще одна тема Kafka, произведенная без единой строчки кода Java!

CREATE STREAM location_and_details_stream AS \ 
SELECT l.ico, l.height, l.location, t.aircraft \ 
FROM location_stream l \ 
LEFT JOIN icao_to_aircraft t ON l.ico = t.icao; 

К тому же вы получаете запрос KSQL. Поток данных будет выглядеть так:

ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ 
, manufacturer \ 
, aircraft \ 
, registration \ 
, height \ 
, location \ 
FROM location_and_details_stream; 
18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 
18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 
18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048 

Помимо этого, мы можем объединить входящий поток callsign с фиксированной темой callsign_details:

CREATE STREAM ident_callsign_stream AS \ 
SELECT i.ico \ 
, c.operatorname \ 
, c.callsign \ 
, c.fromairport \ 
, c.toairport \ 
FROM ident_stream i \ 
LEFT JOIN callsign_details c ON i.indentification = c.callsign; 

ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ 
, operatorname \ 
, callsign \ 
, fromairport \ 
, toairport \ 
FROM ident_callsign_stream ; 
18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 
18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 
18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland 

Теперь у нас есть две информативные темы:
1. location_and_details_stream, которая обеспечивает поток обновленной информации о местоположении и скорости самолета;
2. ident_callsign_stream, которая описывает подробности рейса, в том числе авиакомпанию и пункт назначения.

С этими постоянно обновляемыми темами мы можем создать несколько отличных обзорных панелей. Я использовал Kafka Connect, чтобы выгрузить темы Kafka, заполняемые KSQL, в Elasticsearch (полные скрипты здесь).

Обзорная панель Kibana


Вот пример обзорной панели, демонстрирующей местоположение самолета на карте. Кроме того, вы можете увидеть диаграмму по авиакомпаниям, график высоты полета и облака слов по основным пунктам назначения. Тепловая карта показывает районы сосредоточения самолетов, т.е. с наивысшим уровнем шума.

mhfzsyskws6uoe5s1-b6m9sudzc.gif

Назад, к кошке


Сегодня моя кошка разбудила меня в районе 6 часов утра. Может ли KSQL помочь мне найти тот самолет, который пролетал в это время над моим домом на высоте меньше 3,500 футов?

select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') 
, manufacturer 
, aircraft 
, registration 
, height 
from location_and_details_stream 
where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 

2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 
2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0 

Потрясающе! Я могу определить самолет, оказавшийся над моей крышей в 6:15 утра. Оказывается, Снежка разбудил Airbus А380 (огромный самолет, кстати), летевший в Дубай.
Всего пара выходных дней, и у вас есть система потоковой обработки с KSQL. Которая, к тому же, позволяет быстро найти интересные события данных. Хотя Снежок может отнестись к ним скептически.

vsvfypatv-grib-_dstwrco_r2y.png

© Habrahabr.ru