Сим-сим откройся: как я научил дверь своего подъезда узнавать меня в лицо
Пятничный рабочий день на удалёнке уже подходил к концу, как в дверь постучали, чтобы сообщить об установке нового домофона. Узнав, что новый домофон имеет мобильное приложение, позволяющее отвечать на звонки не находясь дома, я заинтересовался и сразу же загрузил его на свой телефон. Залогинившись, я обнаружил интересную особенность этого приложения — даже без активного вызова в мою квартиру я мог смотреть в камеру домофона и открывать дверь в произвольный момент времени. «Да это же онлайн АРI к двери подъезда!» — щёлкнуло в голове. Судьба предстоящих выходных была предрешена.
Видеодемонстрация в конце статьи.
Кадр из фильма «Пятый Элемент»Дисклеймер
Используемая технология аутентификации на основании распознавания лиц не учитывает множества сценариев атак и недостаточна для обеспечения безопасности. Описанная система анализа кадров была активна только в моменты, когда кому-то из моей семьи было необходимо попасть в подъезд — она не совершала автоматическую обработку лиц других людей и не могла привести к допуску в подъезд третьих лиц.
Получение API
Чтобы автоматизировать управление дверью, необходимо понять, куда и в каком формате отправляет запросы само приложение. Реверс-инжиниринг — дело неоднозначное, поэтому я попытался обойтись без него и вместо этого просто перехватить свой собственный трафик. Для этой задачи я взял НТТР Тооlkit — комплекс программ, который позволяет наладить прослушивание http (s) запросов собственного Android устройства.
Первая попытка оказывается провальной — после установки на телефон Android-части инструментария и сгенерированного Certificate authority оказалось, что мобильное приложение домофона не доверяет пользовательским СА. Согласно документации, начиная с Android 7 манифест приложения должен явно изъявлять такое желание.
Так как мой телефон не поддерживает root доступ для модификации списка системных СА, я воспользовался официальным эмулятором Android, идущим в комплекте с Android Studio. После запуска эмулятора и перехвата с помощью ADB меня встретило радостное сообщение о том, что трафик от всех приложений без Certificate pinning будет успешно расшифрован.
Успешное соединение с HTTP ToolkitК счастью, приложение оказалось как раз из таких — немного побродив по приложению и открыв дверь, можно переходить к анализу логов.
Запрос открытия двериВсего интересными показалось три запроса:
Запрос на открытие двери подъезда: POST по адресу
/rest/v1/places/{place_id}/accesscontrols/{control_id}/actions
с JSON-телом{"name": "accessControlOpen"}
Получение снимка (превью) с камеры: GET по адресу
/rest/v1/places/{place_id}/accesscontrols/{control_id}/videosnapshots
Получение ссылки на видеопоток с аудио: GET по адресу
/rest/v1/forpost/cameras/{camera_id}/video?LightStream=0
HTTP заголовки всех трёх запросов содержат ключ Authorization — судя по всему, именно по нему происходит авторизация при выполнении запросов. Сделав пару запросов руками через Advanced REST Client, чтобы убедиться, что заголовка Authorization достаточно и в самостоятельной работе с API не осталось подводных камней, я понял, что можно откладывать эмулятор и переходить к написанию кода.
Вооружившись Python и requests
, я написал необходимые для следующих этапов функции исполнения каждого из трёх действий:
HEADERS = {"Authorization": "Bearer ###"}
ACTION_URL = "https://###.ru/rest/v1/places/###/accesscontrols/###/"
VIDEO_URL = "https://###.ru/rest/v1/forpost/cameras/###/video?LightStream=0"
def get_image():
result = requests.get(f'{ACTION_URL}/videosnapshots', headers=HEADERS)
if result.status_code != 200:
logging.error(f"Failed to get an image with status code {result.status_code}")
return None
logging.warning(f"Image received successfully in {result.elapsed.total_seconds()}sec")
return result.content
def open_door():
result = requests.post(
f'{ACTION_URL}/actions', headers=HEADERS, json={"name": "accessControlOpen"})
if result.status_code != 200:
logging.error(f"Failed to open the door with status code {result.status_code}")
return False
logging.warning(f"Door opened successfully in {result.elapsed.total_seconds()}sec")
return True
def get_videostream_link():
result = requests.get(VIDEO_URL, headers=HEADERS)
if result.status_code != 200:
logging.error(f"Failed to get stream link with status code {result.status_code}")
return False
logging.warning(f"Stream link received successfully in {result.elapsed.total_seconds()}sec")
return result.json()['data']['URL']
Поиск знакомых лиц в кадре
Прежде чем начать этот раздел, нужно рассказать пару слов об уже имеющихся на тот момент у меня в распоряжении серверных мощностях — это недорогая виртуальная машина с доступом ко всего одному потоку Intel(R) Xeon(R) CPU E5-2650L v3 @ 1.80GHz
, 1GB оперативной памяти и 0 GPU. Тратиться на более дорогую конфигурацию не хотелось, а значит, нужно было попробовать выжать максимум из имеющихся ресурсов.
На данном этапе проектом заинтересовалась моя жена, хорошо разбирающаяся в машинном обучении и в итоге взявшая на себя эту задачу. Для этой работы был выбран OpenVINO Toolkit — инструментарий от Intel, в том числе заточенный как раз на запуск моделей машинного обучения на CPU.
Непродолжительный поиск существующих решений привёл на страницу Interactive Face Recognition Demo — официального демо, показывающего ровно необходимый функционал сравнения видимых в кадре лиц с базой заранее сохранённых. Единственная проблема состояла в том, что данный пример по каким-то причинам исчез после релиза 2020.3, а удобная установка пакета через pip у проекта появилась только с 2021.1. Было решено установить последнюю версию OpenVINO и адаптировать код под неё.
К счастью, гит помнит всё и получить из репозитория проекта код демо и нужные модели не составило труда. После удаления всей работы с командной строкой (взяв для всего значения по умолчанию), визуализацией и видеопотоком вебкамеры, был получен класс, способный распознавать лица в конкретном кадре:
class ImageProcessor:
def __init__(self):
self.frame_processor = FrameProcessor()
def process(self, image):
detections = self.frame_processor.process(image)
labels = []
for roi, landmarks, identity in zip(*detections):
label = self.frame_processor.face_identifier.get_identity_label(
identity.id)
labels.append(label)
return labels
После создания базы лиц из десятка селфи можно было переходить к тестированию фактической производительности полученного решения на имеющемся железе. В качестве подопытных картинок я взял фотографию себя и фото пустой улицы, сделанные на домофонную камеру функцией get_image()
выше.
100 runs on an image with known face:
Total time: 7.356s
Time per frame: 0.007s
FPS: 135.944
100 runs on an image without faces:
Total time: 2.985s
Time per frame: 0.003s
FPS: 334.962
Очевидно, что показанная производительность с запасом покрывает нужды детектирования лиц в реальном времени.
1 FPS: Работа со снимками с камеры
Итак, на этом этапе у меня уже был класс для поиска заданных лиц в кадре, а также функции получения снимка с камеры и ссылки на видеопоток. С видео на тот момент я никогда не работал, потому для MVP решил переложить работу по выделению кадров на сервер и снова воспользоваться get_image()
.
class ImageProcessor:
# <...>
def process_single_image(self, image):
nparr = np.fromstring(image, np.uint8)
img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
labels = self.process(img_np)
return labels
def snapshot_based_intercom_id():
processor = ImageProcessor()
last_open_door_time = time.time()
while True:
start_time = time.time()
image = get_image()
result = processor.process_single_image(image)
logging.info(f'{result} in {time.time() - start_time}s')
# Successfull detections are "face{N}"
if any(['face' in res for res in result]):
if start_time - last_open_door_time > 5:
open_door()
with open(f'images/{start_time}_OK.jfif', 'wb') as f:
f.write(image)
last_open_door_time = start_time
Цикл получает картинки от домофона, ищет на них знакомые лица, открывает дверь в случае успешной детекции и сохраняет фото на память. Важно было не забыть ограничить частоту отправки команды открытия двери, т.к. лицо обычно распознаётся ещё на подходах к двери.
Момент успешного распознавания, версия с обработкой отдельных снимковЗаработало! Полностью довольный первым запуском, я вернулся в квартиру. Единственное, что портило впечатление от системы распознавания — время реакции на появление лица в кадре, т.к. время отклика API оставляло желать лучшего. Низкая частота поступления данных, 0.7с на получение картинки и 0.6с на открытие двери, давали видимый невооружённым взглядом лаг.
До 30 FPS: Обработка видеопотока
Получить кадры из видео оказалось на удивление просто:
vcap = cv2.VideoCapture(link)
success, frame = vcap.read()
Замеры показали, что камера домофона способна выдавать стабильные 30 FPS. Проблемой оказалось поддержание меньшей частоты кадров: метод read()
возвращает последний необработанный кадр из внутренней очереди кадров. Если эта очередь наполняется видеопотоком быстрее, чем вычитывается, то обрабатываемые кадры начинают отставать от реального времени, что приводит к нежелательной задержке. Дополнительно, с практической точки зрения, распознавать лица 30 раз в секунду — пустая трата вычислительных ресурсов, так как обычно люди подходят к двери подъезда с небольшой скоростью.
Первым потенциальным решением было установить размер внутренней очереди: vcap.set(CV_CAP_PROP_BUFFERSIZE, 0);
. Согласно найденной информации, такой трюк должен был хорошо работать с любой конфигурацией системы для версий OpenCV выше 3.4, но по какой-то причине, так и не оказал никакого влияния в моём случае. Единственной рабочей альтернативой стал подход, описанный в этом ответе со StackOverflow — завести отдельный поток, читающий кадры из камеры на максимально возможной скорости и сохраняющий последний в поле класса для дальнейшего доступа (впоследствии оказалось, что именно этот цикл ответственен за большую часть потребления процессора).
Получилась модификация ImageProcessor
для обработки видеопотока с частотой 3 кадра в секунду:
class CameraBufferCleanerThread(threading.Thread):
def __init__(self, camera, name='camera-buffer-cleaner-thread'):
self.camera = camera
self.last_frame = None
self.finished = False
super(CameraBufferCleanerThread, self).__init__(name=name)
self.start()
def run(self):
while not self.finished:
ret, self.last_frame = self.camera.read()
def __enter__(self): return self
def __exit__(self, type, value, traceback):
self.finished = True
self.join()
class ImageProcessor:
# <...>
def process_stream(self, link):
vcap = cv2.VideoCapture(link)
interval = 0.3 # ~3 FPS
with CameraBufferCleanerThread(vcap) as cam_cleaner:
while True:
frame = cam_cleaner.last_frame
if frame is not None:
yield (self.process(frame), frame)
else:
yield (None, None)
time.sleep(interval)
И соответствующая модификация snapshot_based_intercom_id
:
def stream_based_intercom_id():
processor = ImageProcessor()
link = get_videostream_link()
# To notify about delays
last_time = time.time()
last_open_door_time = time.time()
for result, np_image in processor.process_stream(link):
current_time = time.time()
delta_time = current_time - last_time
if delta_time < 1:
logging.info(f'{result} in {delta_time}')
else:
logging.warning(f'{result} in {delta_time}')
last_time = current_time
if result is None:
continue
if any(['face' in res for res in result]):
if current_time - last_open_door_time > 5:
logging.warning(
f'Hey, I know you - {result[0]}! Opening the door...')
last_open_door_time = current_time
open_door()
cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)
Тест в реальных условиях показал ощутимое падение времени отклика — при хорошем освещении подъездная дверь оказывалась открытой ещё до того, как я успевал подойти к ней вплотную.
Момент успешного распознавания, версия с обработкой видеопотокаУправление с помощью Telegram бота
Сама система доказала свою работоспособность и теперь хотелось создать для неё удобный интерфейс для включения/выключения. Телеграм бот показался отличным вариантом решения для этой задачи.
С помощью пакета python-telegram-bot
была написана простая оболочка, принимающая в себя callback включения/выключения системы и список доверенных никнеймов.
class TelegramInterface:
def __init__(self, login_whitelist, state_callback):
self.state_callback = state_callback
self.login_whitelist = login_whitelist
self.updater = Updater(
token = "###", use_context = True)
self.run()
def run(self):
dispatcher = self.updater.dispatcher
dispatcher.add_handler(CommandHandler("start", self.start))
dispatcher.add_handler(CommandHandler("run", self.run_intercom))
dispatcher.add_handler(CommandHandler("stop", self.stop_intercom))
self.updater.start_polling()
def run_intercom(self, update: Update, context: CallbackContext):
user = update.message.from_user
update.message.reply_text(
self.state_callback(True) if user.username in self.login_whitelist else 'not allowed',
reply_to_message_id=update.message.message_id)
def stop_intercom(self, update: Update, context: CallbackContext):
user = update.message.from_user
update.message.reply_text(
self.state_callback(False) if user.username in self.login_whitelist else 'not allowed',
reply_to_message_id=update.message.message_id)
def start(self, update: Update, context: CallbackContext) -> None:
update.message.reply_text('Hi!')
class TelegramBotThreadWrapper(threading.Thread):
def __init__(self, state_callback, name='telegram-bot-wrapper'):
self.whitelist = ["###", "###"]
self.state_callback = state_callback
super(TelegramBotThreadWrapper, self).__init__(name=name)
self.start()
def run(self):
self.bot = TelegramInterface(self.whitelist, self.state_callback)
И следующая ступень эволюции функции intercom_id
, обрабатывающая синхронизацию между потоками обработки данных и получения команд от бота:
def stream_based_intercom_id_with_telegram():
processor = ImageProcessor()
loop_state_lock = threading.Lock()
loop_should_run = False
loop_should_change_state_cv = threading.Condition(loop_state_lock)
is_loop_finished = True
loop_changed_state_cv = threading.Condition(loop_state_lock)
def stream_processing_loop():
nonlocal loop_should_run
nonlocal loop_should_change_state_cv
nonlocal is_loop_finished
nonlocal loop_changed_state_cv
while True:
with loop_should_change_state_cv:
loop_should_change_state_cv.wait_for(lambda: loop_should_run)
is_loop_finished = False
loop_changed_state_cv.notify_all()
logging.warning(f'Loop is started')
link = get_videostream_link()
last_time = time.time()
last_open_door_time = time.time()
for result, np_image in processor.process_stream(link):
with loop_should_change_state_cv:
if not loop_should_run:
is_loop_finished = True
loop_changed_state_cv.notify_all()
logging.warning(f'Loop is stopped')
break
current_time = time.time()
delta_time = current_time - last_time
if delta_time < 1:
logging.info(f'{result} in {delta_time}')
else:
logging.warning(f'{result} in {delta_time}')
last_time = current_time
if result is None:
continue
if any(['face' in res for res in result]):
if current_time - last_open_door_time > 5:
logging.warning(f'Hey, I know you - {result[0]}! Opening the door...')
last_open_door_time = current_time
open_door()
cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)
def state_callback(is_running):
nonlocal loop_should_run
nonlocal loop_should_change_state_cv
nonlocal is_loop_finished
nonlocal loop_changed_state_cv
with loop_should_change_state_cv:
if is_running == loop_should_run:
return "Intercom service state is not changed"
loop_should_run = is_running
if loop_should_run:
loop_should_change_state_cv.notify_all()
loop_changed_state_cv.wait_for(lambda: not is_loop_finished)
return "Intercom service is up"
else:
loop_changed_state_cv.wait_for(lambda: is_loop_finished)
return "Intercom service is down"
telegram_bot = TelegramBotThreadWrapper(state_callback)
logging.warning("Bot is ready")
stream_processing_loop()
Результат
Видео:
Послесловие
Несмотря на возможности, которые технология умных домофонов несёт жильцам, объединённые в единую сеть сотни (тысячи?) подъездных дверей с камерами и микрофонами (да, в произвольно получаемом видеопотоке есть и аудио!), ведущими непрерывную запись — как по мне, скорее пугающее явление, открывающее новые возможности для нарушения приватности.
Я бы предпочёл, чтобы доступ к видеопотоку предоставлялся только в момент звонка в квартиру и ведущаяся трёхдневная запись, позиционирующаяся как средство раскрытия правонарушений, хранилась не на серверах компании, а непосредственно в домофоне, с возможностью доступа к ней по запросу. Или не велась вовсе.