GOSTIM: P2P F2F E2EE IM за один вечер с ГОСТ-криптографией
Будучи разработчиком PyGOST библиотеки (ГОСТовые криптографические примитивы на чистом Python), я нередко получаю вопросы о том как на коленке реализовать простейший безопасный обмен сообщениями. Многие считают прикладную криптографию достаточно простой штукой, и .encrypt () вызова у блочного шифра будет достаточно для безопасной отсылки по каналу связи. Другие же считают, что прикладная криптография — удел немногих, и приемлемо, что богатые компании типа Telegram с олимпиадниками-математиками не могут реализовать безопасный протокол.
Всё это побудило меня написать данную статью, чтобы показать, что реализация криптографических протоколов и безопасного IM-а не такая сложная задача. Однако, изобретать собственные протоколы аутентификации и согласования ключей не стоит.
В статье будет написан peer-to-peer, friend-to-friend, end-to-end зашифрованный instant messenger с SIGMA-I протоколом аутентификации и согласования ключей (на базе которого реализован IPsec IKE), используя исключительно ГОСТовые криптографические алгоритмы PyGOST библиотеки и ASN.1 кодирование сообщений библиотекой PyDERASN (про которую я уже писал раньше). Необходимое условие: он должен быть настолько прост, чтобы его можно было написать с нуля за один вечер (или рабочий день), иначе это уже не простая программа. В ней наверняка есть ошибки, излишние сложности, недочёты, плюс это моя первая программа с использованием asyncio библиотеки.
Дизайн IM
Для начала, надо понять как будет выглядеть наш IM. Для простоты, пускай это будет peer-to-peer сеть, без какого-либо обнаружения участников. Собственноручно будем указывать к какому адресу: порту подключаться для общения с собеседником.
Я понимаю, что, на данный момент, предположение о доступности прямой связи между двумя произвольными компьютерами — существенное ограничение применимости IM на практике. Но чем больше разработчиков будут реализовывать всякие NAT-traversal костыли, тем дольше мы так и будем оставаться в IPv4 Интернете, с удручающей вероятностью связи между произвольными компьютерами. Ну сколько можно терпеть отсутствие IPv6 дома и на работе?
У нас будет friend-to-friend сеть: все возможные собеседники заранее должны быть известны. Во-первых, это сильно всё упрощает: представились, нашли или не нашли имя/ключ, отключились или продолжаем работу, зная собеседника. Во-вторых, в общем случае, это безопасно и исключает множество атак.
Интерфейс IM-а будет близок к классическим решениям suckless-проектов, которые мне очень нравятся своим минимализмом и Unix-way философией. IM программа для каждого собеседника создаёт директорию с тремя Unix domain socket:
- in — в него записываются отправляемые собеседнику сообщения;
- out — из него читаются принимаемые от собеседника сообщения;
- state — читая из него, мы узнаём подключён ли сейчас собеседник, адрес/порт подключения.
Кроме того, создаётся conn сокет, записав в который хост порт, мы инициируем подключение к удалённому собеседнику.
|-- alice
| |-- in
| |-- out
| `-- state
|-- bob
| |-- in
| |-- out
| `-- state
`- conn
Такой подход позволяет делать независимые реализации IM транспорта и пользовательского интерфейса, ведь на вкус и цвет товарища нет, каждому не угодишь. Используя tmux и/или multitail, можно получить многооконный интерфейс с синтаксической подсветкой. А с помощью rlwrap можно получить GNU Readline-совместимую строку для ввода сообщений.
На самом деле, suckless проекты используют FIFO-файлы. Лично я не смог понять как в asyncio работать с файлами конкурентно без собственноручной подложки из выделенных тредов (для таких вещей давно использую язык Go). Поэтому решил обойтись Unix domain сокетами. К сожалению, это лишает возможности сделать echo 2001:470: dead: babe 6666 > conn. Я решил эту проблему, используя socat: echo 2001:470: dead: babe 6666 | socat — UNIX-CONNECT: conn, socat READLINE UNIX-CONNECT: alice/in.
Первоначальный небезопасный протокол
В качестве транспорта используется TCP: он гарантирует доставку и её порядок. UDP не гарантирует ни того, ни другого (что было бы полезным, когда применится криптография), а поддержки SCTP в Python из коробки нет.
К сожалению, в TCP нет понятия сообщения, а только потока байт. Поэтому необходимо придумать формат для сообщений, чтобы их можно было разделять между собой в этом потоке. Можем условиться использовать символ перевода строки. Для начала подойдёт, однако, когда мы начнём шифровать наши сообщения, этот символ может появиться где угодно в шифротексте. В сетях поэтому популярны протоколы отправляющие сначала длину сообщения в байтах. Например, в Python из коробки есть xdrlib позволяющая работать с подобным форматом XDR.
Мы не будем правильно и эффективно работать с TCP чтением — упростим код. Читаем в бесконечном цикле данные из сокета, пока не декодируем полное сообщение. В качестве формата для такого подхода можно использовать и JSON с XML. Но, когда добавится криптография, то данные придётся подписывать и аутентифицировать —, а это потребует байт-в-байт идентичного представления объектов, чего не обеспечивают JSON/XML (dumps результат может отличаться).
XDR подходит для такой задачи, однако я выбираю ASN.1 с DER-кодированием и PyDERASN библиотеку, так как на руках у нас будут высокоуровневые объекты с которыми часто приятнее и удобнее работать. В отличии от schemaless bencode, MessagePack или CBOR, ASN.1 автоматически проверит данные напротив жёстко заданной схемы.
# Msg ::= CHOICE {
# text MsgText,
# handshake [0] EXPLICIT MsgHandshake }
class Msg(Choice):
schema = ((
("text", MsgText()),
("handshake", MsgHandshake(expl=tag_ctxc(0))),
))
# MsgText ::= SEQUENCE {
# text UTF8String (SIZE(1..MaxTextLen))}
class MsgText(Sequence):
schema = ((
("text", UTF8String(bounds=(1, MaxTextLen))),
))
# MsgHandshake ::= SEQUENCE {
# peerName UTF8String (SIZE(1..256)) }
class MsgHandshake(Sequence):
schema = ((
("peerName", UTF8String(bounds=(1, 256))),
))
Принимаемым сообщением будет Msg: либо текстовое MsgText (пока с одним текстовым полем), либо сообщение рукопожатия MsgHandshake (в котором передаётся имя собеседника). Сейчас выглядит переусложнённым, но это задел на будущее.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │MsgHandshake(IdA) │ │─────────────────>│ │ │ │MsgHandshake(IdB) │ │<─────────────────│ │ │ │ MsgText() │ │─────────────────>│ │ │ │ MsgText() │ │<─────────────────│ │ │
IM без криптографии
Как я уже говорил, для всех операций с сокетами будет использоваться asyncio библиотека. Объявим что мы ожидаем в момент запуска:
parser = argparse.ArgumentParser(description="GOSTIM")
parser.add_argument(
"--our-name",
required=True,
help="Our peer name",
)
parser.add_argument(
"--their-names",
required=True,
help="Their peer names, comma-separated",
)
parser.add_argument(
"--bind",
default="::1",
help="Address to listen on",
)
parser.add_argument(
"--port",
type=int,
default=6666,
help="Port to listen on",
)
args = parser.parse_args()
OUR_NAME = UTF8String(args.our_name)
THEIR_NAMES = set(args.their_names.split(","))
Задаётся собственное имя (--our-name alice). Через запятую перечисляются все ожидаемые собеседники (--their-names bob, eve). Для каждого из собеседников, создаётся директория с Unix сокетами, а также по корутине на каждый in, out, state:
for peer_name in THEIR_NAMES:
makedirs(peer_name, mode=0o700, exist_ok=True)
out_queue = asyncio.Queue()
OUT_QUEUES[peer_name] = out_queue
asyncio.ensure_future(asyncio.start_unix_server(
partial(unixsock_out_processor, out_queue=out_queue),
path.join(peer_name, "out"),
))
in_queue = asyncio.Queue()
IN_QUEUES[peer_name] = in_queue
asyncio.ensure_future(asyncio.start_unix_server(
partial(unixsock_in_processor, in_queue=in_queue),
path.join(peer_name, "in"),
))
asyncio.ensure_future(asyncio.start_unix_server(
partial(unixsock_state_processor, peer_name=peer_name),
path.join(peer_name, "state"),
))
asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))
Приходящие от пользователя сообщения из in сокета отправляются в IN_QUEUES очереди:
async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None:
while True:
text = await reader.read(MaxTextLen)
if text == b"":
break
await in_queue.put(text.decode("utf-8"))
Приходящие от собеседников сообщения отправляются в OUT_QUEUES очереди, из которых данные записываются в out сокет:
async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None:
while True:
text = await out_queue.get()
writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8"))
await writer.drain()
При чтении из state сокета, программа ищет в PEER_ALIVE словаре адрес собеседника. Если подключения к собеседнику ещё нет, то записывается пустая строка.
async def unixsock_state_processor(reader, writer, peer_name: str) -> None:
peer_writer = PEER_ALIVES.get(peer_name)
writer.write(
b"" if peer_writer is None else (" ".join([
str(i) for i in peer_writer.get_extra_info("peername")[:2]
]).encode("utf-8") + b"\n")
)
await writer.drain()
writer.close()
При записи адреса в conn сокет, запускается функция «инициатора» соединения:
async def unixsock_conn_processor(reader, writer) -> None:
data = await reader.read(256)
writer.close()
host, port = data.decode("utf-8").split(" ")
await initiator(host=host, port=int(port))
Рассмотрим инициатора. Сначала он, очевидно, открывает соединение до указанного хоста/порта и отправляет handshake сообщение со своим именем:
130 async def initiator(host, port):
131 _id = repr((host, port))
132 logging.info("%s: dialing", _id)
133 reader, writer = await asyncio.open_connection(host, port)
134 # Handshake message {{{
135 writer.write(Msg(("handshake", MsgHandshake((
136 ("peerName", OUR_NAME),
137 )))).encode())
138 # }}}
139 await writer.drain()
Затем, ждёт ответа от удалённой стороны. Пытается декодировать пришедший ответ по Msg ASN.1 схеме. Предполагаем что всё сообщение будет отправлено одним TCP-сегментом и мы атомарно его получим при вызове .read (). Проверяем что мы получили именно handshake сообщение.
141 # Wait for Handshake message {{{
142 data = await reader.read(256)
143 if data == b"":
144 logging.warning("%s: no answer, disconnecting", _id)
145 writer.close()
146 return
147 try:
148 msg, _ = Msg().decode(data)
149 except ASN1Error:
150 logging.warning("%s: undecodable answer, disconnecting", _id)
151 writer.close()
152 return
153 logging.info("%s: got %s message", _id, msg.choice)
154 if msg.choice != "handshake":
155 logging.warning("%s: unexpected message, disconnecting", _id)
156 writer.close()
157 return
158 # }}}
Проверяем что пришедшее имя собеседника нам известно. Если нет, то рвём соединение. Проверяем не было ли у нас уже установлено с ним соединение (собеседник вновь дал команду на подключение к нам) и закрываем его. В IN_QUEUES очередь помещаются Python-строки с текстом сообщения, но имеется особое значение None, сигнализирующее msg_sender корутину прекратить работу, чтобы она забыла о своём writer, связанным с устаревшим TCP-соединением.
159 msg_handshake = msg.value
160 peer_name = str(msg_handshake["peerName"])
161 if peer_name not in THEIR_NAMES:
162 logging.warning("unknown peer name: %s", peer_name)
163 writer.close()
164 return
165 logging.info("%s: session established: %s", _id, peer_name)
166 # Run text message sender, initialize transport decoder {{{
167 peer_alive = PEER_ALIVES.pop(peer_name, None)
168 if peer_alive is not None:
169 peer_alive.close()
170 await IN_QUEUES[peer_name].put(None)
171 PEER_ALIVES[peer_name] = writer
172 asyncio.ensure_future(msg_sender(peer_name, writer))
173 # }}}
msg_sender принимает исходящие сообщения (подкладываемые в очередь из in сокета), сериализует их в MsgText сообщение и отправляет по TCP-соединению. Оно может оборваться в любой момент — это мы явно перехватываем.
async def msg_sender(peer_name: str, writer) -> None:
in_queue = IN_QUEUES[peer_name]
while True:
text = await in_queue.get()
if text is None:
break
writer.write(Msg(("text", MsgText((
("text", UTF8String(text)),
)))).encode())
try:
await writer.drain()
except ConnectionResetError:
del PEER_ALIVES[peer_name]
return
logging.info("%s: sent %d characters message", peer_name, len(text))
В конце инициатор входит в бесконечный цикл чтения сообщений из сокета. Проверяет текстовые ли это сообщения и помещает в OUT_QUEUES очередь, из которой они будут отправлены в out сокет соответствующего собеседника. Почему нельзя просто делать .read () и декодировать сообщение? Потому что не исключена ситуация, когда несколько сообщений от пользователя будут агрегированы в буфере операционной системы и отправлены одним TCP-сегментом. Декодировать то мы сможем первое, а дальше в буфере может остаться часть от последующего. При любой нештатной ситуации мы закрываем TCP-соединение и останавливаем msg_sender корутину (посылкой None в OUT_QUEUES очередь).
174 buf = b""
175 # Wait for test messages {{{
176 while True:
177 data = await reader.read(MaxMsgLen)
178 if data == b"":
179 break
180 buf += data
181 if len(buf) > MaxMsgLen:
182 logging.warning("%s: max buffer size exceeded", _id)
183 break
184 try:
185 msg, tail = Msg().decode(buf)
186 except ASN1Error:
187 continue
188 buf = tail
189 if msg.choice != "text":
190 logging.warning("%s: unexpected %s message", _id, msg.choice)
191 break
192 try:
193 await msg_receiver(msg.value, peer_name)
194 except ValueError as err:
195 logging.warning("%s: %s", err)
196 break
197 # }}}
198 logging.info("%s: disconnecting: %s", _id, peer_name)
199 IN_QUEUES[peer_name].put(None)
200 writer.close()
66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None:
67 text = str(msg_text["text"])
68 logging.info("%s: received %d characters message", peer_name, len(text))
69 await OUT_QUEUES[peer_name].put(text)
Вернёмся к основному коду. После создания всех корутин в момент запуска программы, мы стартуем TCP-сервер. На каждое установленное соединение он создаёт responder (ответчик) корутину.
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s",
)
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port))
logging.info("Listening on: %s", server.sockets[0].getsockname())
loop.run_forever()
responder схож с initiator и зеркально выполняет все те же самые действия, но бесконечный цикл чтения сообщений запускается сразу же, для простоты. Сейчас протокол рукопожатия отсылает по одному сообщению с каждой стороны, но, в дальнейшем, будет по два от инициатора соединения, после которых сразу же возможна отправка текстовых.
72 async def responder(reader, writer):
73 _id = writer.get_extra_info("peername")
74 logging.info("%s: connected", _id)
75 buf = b""
76 msg_expected = "handshake"
77 peer_name = None
78 while True:
79 # Read until we get Msg message {{{
80 data = await reader.read(MaxMsgLen)
81 if data == b"":
82 logging.info("%s: closed connection", _id)
83 break
84 buf += data
85 if len(buf) > MaxMsgLen:
86 logging.warning("%s: max buffer size exceeded", _id)
87 break
88 try:
89 msg, tail = Msg().decode(buf)
90 except ASN1Error:
91 continue
92 buf = tail
93 # }}}
94 if msg.choice != msg_expected:
95 logging.warning("%s: unexpected %s message", _id, msg.choice)
96 break
97 if msg_expected == "text":
98 try:
99 await msg_receiver(msg.value, peer_name)
100 except ValueError as err:
101 logging.warning("%s: %s", err)
102 break
103 # Process Handshake message {{{
104 elif msg_expected == "handshake":
105 logging.info("%s: got %s message", _id, msg_expected)
106 msg_handshake = msg.value
107 peer_name = str(msg_handshake["peerName"])
108 if peer_name not in THEIR_NAMES:
109 logging.warning("unknown peer name: %s", peer_name)
110 break
111 writer.write(Msg(("handshake", MsgHandshake((
112 ("peerName", OUR_NAME),
113 )))).encode())
114 await writer.drain()
115 logging.info("%s: session established: %s", _id, peer_name)
116 peer_alive = PEER_ALIVES.pop(peer_name, None)
117 if peer_alive is not None:
118 peer_alive.close()
119 await IN_QUEUES[peer_name].put(None)
120 PEER_ALIVES[peer_name] = writer
121 asyncio.ensure_future(msg_sender(peer_name, writer))
122 msg_expected = "text"
123 # }}}
124 logging.info("%s: disconnecting", _id)
125 if msg_expected == "text":
126 IN_QUEUES[peer_name].put(None)
127 writer.close()
Безопасный протокол
Пришло время обезопасить наше общение. Что же мы подразумеваем под безопасностью и что хотим:
- конфиденциальность передаваемых сообщений;
- аутентичность и целостность передаваемых сообщений — их изменение должно быть обнаружено;
- защита от атак перепроигрывания (replay attack) — факт пропажи или повтора сообщений должен быть обнаружен (и мы решаем обрывать соединение);
- идентификация и аутентификация собеседников по заранее вбитым публичным ключам — мы уже решили ранее, что делаем friend-to-friend сеть. Только после аутентификации мы поймём с кем общаемся;
- наличие perfect forward secrecy свойства (PFS) — компрометация нашего долгоживущего ключа подписи не должна приводить к возможности чтения всей предыдущей переписки. Запись перехваченного трафика становится бесполезной;
- действительность/валидность сообщений (транспортных и рукопожатия) только в пределах одной TCP-сессии. Вставка корректно подписанных/аутентифицированных сообщений из другой сессии (даже с этим же собеседником) не должна быть возможной;
- пассивный наблюдатель не должен видеть ни идентификаторов пользователей, ни передаваемых долгоживущих публичных ключей, ни хэшей от них. Некая анонимность от пассивного наблюдателя.
Удивительно, но этот минимум практически все хотят иметь в любом протоколе рукопожатия, и крайне мало из перечисленного в итоге выполняется для «доморощенных» протоколов. Вот и сейчас не будем изобретать нового. Я бы однозначно рекомендовал использовать Noise framework для построения протоколов, но выберем что-то попроще.
Наиболее популярны два протокола:
- TLS — сложнейший протокол с длинной историей багов, косяков, уязвимостей, плохой продуманности, сложности и недочётов (впрочем, к TLS 1.3 это мало относится). Но не рассматриваем его из-за переусложнённости.
- IPsec с IKE — не имеют серьёзных криптографических проблем, хотя тоже не просты. Если почитать про IKEv1 и IKEv2, то их истоком являются STS, ISO/IEC IS 9798–3 и SIGMA (SIGn-and-MAc) протоколы — достаточно простыми для реализации за один вечер.
Чем SIGMA, как последнее звено развития STS/ISO протоколов, хорош? Он удовлетворяет всем нашим требованиям (в том числе «скрытия» идентификаторов собеседников), не имеет известных криптографических проблем. Он минималистичен — удаление хотя бы одного элемента из сообщения протокола приведёт к его небезопасности.
Давайте пройдёмся от простейшего доморощенного протокола до SIGMA. Самая базовая интересующая нас операция это согласование ключей: функция, на выходе которой оба участника получат одно и то же значение, которое можно будет использовать в качестве симметричного ключа. Не вдаваясь в подробности: каждая из сторон генерирует эфемерную (использующуюся только в пределах одной сессии) ключевую пару (публичный и приватный ключи), обмениваются публичными ключами, вызывают функцию согласования, на вход которой передают свой приватный ключ и публичный ключ собеседника.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔════════════════════╗ │───────────────>│ ║PrvA, PubA = DHgen()║ │ │ ╚════════════════════╝ │ IdB, PubB │ ╔════════════════════╗ │<───────────────│ ║PrvB, PubB = DHgen()║ │ │ ╚════════════════════╝ ────┐ ╔═══════╧════════════╗ │ ║Key = DH(PrvA, PubB)║ <───┘ ╚═══════╤════════════╝ │ │ │ │
Любой может встрять-по-середине и заменить публичные ключи своими собственными — в данном протоколе нет аутентификации собеседников. Добавим подпись долгоживущими ключами.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │IdA, PubA, sign(SignPrvA, (PubA)) │ ╔═══════════════════════════╗ │─────────────────────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════════════════════════╝ │IdB, PubB, sign(SignPrvB, (PubB)) │ ╔═══════════════════════════╗ │<─────────────────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═══════════════════════════╝ ────┐ ╔═════════════════════╗ │ │ ║verify(SignPubB, ...)║ │ <───┘ ║Key = DH(PrvA, PubB) ║ │ │ ╚═════════════════════╝ │ │ │
Такая подпись не подойдёт, так как она не привязана к конкретной сессии. Такие сообщения «подойдут» и для сессий с другими участниками. Подписываться должен весь контекст. Это вынуждает также добавить посылку ещё одного сообщения от A.
Кроме того, критично добавить под подпись и собственный идентификатор, так как, в противном случае, мы можем подменить IdXXX и переподписать сообщение ключом другого известным собеседника. Для предотвращения reflection атак, необходимо чтобы элементы под подписью находились в чётко заданных местах по своему смыслу: если A подписывает (PubA, PubB), то B должен подписывать (PubB, PubA). Это ещё и говорит о важности выбора структуры и формата сериализованных данных. Например, множества в ASN.1 DER кодировании сортируются: SET OF (PubA, PubB) будет идентичен SET OF (PubB, PubA).
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔═══════════════════════════╗ │────────────────────────────────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════════════════════════╝ │IdB, PubB, sign(SignPrvB, (IdB, PubA, PubB)) │ ╔═══════════════════════════╗ │<────────────────────────────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═══════════════════════════╝ │ sign(SignPrvA, (IdA, PubB, PubA)) │ ╔═════════════════════╗ │────────────────────────────────────────────>│ ║verify(SignPubB, ...)║ │ │ ║Key = DH(PrvA, PubB) ║ │ │ ╚═════════════════════╝ │ │
Однако мы всё ещё не «доказали» что выработали одинаковый общий ключ для этой сессии. В принципе, можно обойтись и без этого шага — первое же транспортное сообщение будет невалидным, но мы хотим, чтобы когда рукопожатие завершилось, то были бы уверены что всё действительно согласовано. На данный момент у нас на руках ISO/IEC IS 9798–3 протокол.
Мы могли бы подписывать и сам выработанный ключ. Это опасно, так как не исключено, что в используемом алгоритме подписи могут быть утечки (пускай биты-на-подпись, но всё же утечки). Можно подписывать хэш от выработанного ключа, но утечка даже хэша от выработанного ключа может иметь ценность при brute-force атаке на функцию выработки. SIGMA использует MAC функцию, аутентифицирующую идентификатор отправителя.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA │ ╔═══════════════════════════╗ │─────────────────────────────────────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════════════════════════╝ │IdB, PubB, sign(SignPrvB, (PubA, PubB)), MAC(IdB) │ ╔═══════════════════════════╗ │<─────────────────────────────────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═══════════════════════════╝ │ │ ╔═════════════════════╗ │ sign(SignPrvA, (PubB, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │─────────────────────────────────────────────────>│ ║verify(Key, IdB) ║ │ │ ║verify(SignPubB, ...)║ │ │ ╚═════════════════════╝ │ │
В качестве оптимизации, некоторые могут захотеть переиспользовать свои эфемерные ключи (что, конечно, плачевно для PFS). Например, мы сгенерировали ключевую пару, попытались подключиться, но TCP не был доступен или оборвался где-то на середине протокола. Жалко тратить потраченную энтропию и ресурсы процессора на новую пару. Поэтому введём, так называемый, cookie — псевдослучайное значение, которое защитит от возможных случайных replay атак при повторном использовании эфемерных публичных ключей. Из-за binding-а между cookie и эфемерным публичным ключом, публичный ключ противоположного участника можно убрать из подписи за ненадобностью.
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ IdA, PubA, CookieA │ ╔═══════════════════════════╗ │──────────────────────────────────────────────────────────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════════════════════════╝ │IdB, PubB, CookieB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB) │ ╔═══════════════════════════╗ │<──────────────────────────────────────────────────────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═══════════════════════════╝ │ │ ╔═════════════════════╗ │ sign(SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA) │ ║Key = DH(PrvA, PubB) ║ │──────────────────────────────────────────────────────────────────────>│ ║verify(Key, IdB) ║ │ │ ║verify(SignPubB, ...)║ │ │ ╚═════════════════════╝ │ │
Наконец, мы хотим получить приватность наших идентификаторов собеседников от пассивного наблюдателя. Для этого SIGMA предлагает сначала обменяться эфемерными ключами, выработать общий ключ, на котором зашифровать аутентифицирующие и идентифицирующие сообщения. SIGMA описывает два варианта:
- SIGMA-I — защищает инициатора от активных атак, ответчика от пассивных: инициатор аутентифицирует ответчика и если что-то не сошлось, то свою идентификацию он не выдаёт. Ответчик же выдаёт свою идентификацию если с ним начать активный протокол. Пассивный наблюдатель ничего не узнает;
SIGMA-R — защищает ответчика от активных атак, инициатора от пассивных. Всё с точностью до наоборот, но в этом протоколе уже четыре сообщения рукопожатия передаётся.Выбираем SIGMA-I как более похожий на то, что мы ожидаем от клиент-серверных привычных вещей: клиента узнает только аутентифицированный сервер, а сервер и так знают все. Плюс он проще в реализации из-за меньшего количества сообщений рукопожатия. Всё что мы вносим в протокол, так это шифрование части сообщения и перенос идентификатора A в шифрованную часть последнего сообщения:
┌─────┐ ┌─────┐ │PeerA│ │PeerB│ └──┬──┘ └──┬──┘ │ PubA, CookieA │ ╔═══════════════════════════╗ │─────────────────────────────────────────────────────────────────────────────>│ ║SignPrvA, SignPubA = load()║ │ │ ║PrvA, PubA = DHgen() ║ │ │ ╚═══════════════════════════╝ │PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔═══════════════════════════╗ │<─────────────────────────────────────────────────────────────────────────────│ ║SignPrvB, SignPubB = load()║ │ │ ║PrvB, PubB = DHgen() ║ │ │ ╚═══════════════════════════╝ │ │ ╔═════════════════════╗ │ Enc((IdA, sign(SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║ │─────────────────────────────────────────────────────────────────────────────>│ ║verify(Key, IdB) ║ │ │ ║verify(SignPubB, ...)║ │ │ ╚═════════════════════╝ │ │
- Для подписи используется ГОСТ Р 34.10–2012 алгоритм с 256-бит ключами.
- Для выработки общего ключа используется 34.10–2012 VKO.
- В качестве MAC используется CMAC. Технически это особый режим работы блочного шифра, описанный в ГОСТ Р 34.13–2015. В качестве функции шифрования для этого режима — Кузнечик (34.12–2015).
- В качестве идентификатора собеседника используется хэш от его публичного ключа. В качестве хэша применяется Стрибог-256 (34.11–2012 256 бит).
После рукопожатия у нас будет согласован общий ключ. Его мы можем использовать для аутентифицированного шифрования транспортных сообщений. Эта часть совсем простая и в ней сложно ошибиться: инкрементируем счётчик сообщений, шифруем сообщение, аутентифицируем (MAC) счётчик и шифротекст, отправляем. При приёме сообщения проверяем что счётчик имеет ожидаемое значение, аутентифицируем шифротекст с счётчиком, дешифруем. Каким ключом шифровать сообщения рукопожатия, транспортные, каким аутентифицировать? Использовать один ключ для всех этих задач опасно и неразумно. Необходимо вырабатывать ключи, используя специализированные функции KDF (key derivation function). Опять же, не будем мудрить и что-то изобретать: HKDF давно известна, хорошо исследована и не имеет известных проблем. К сожалению, в родной библиотеке Python нет этой функции, поэтому используем hkdf пакет. HKDF внутри использует HMAC, который, в свою очередь, использует хэш-функцию. Пример реализации на Python на странице Wikipedia занимает считанные строки кода. Как и в случае с 34.10–2012, в качестве хэш-функции будем использовать Стрибог-256. Выход нашей функции согласования ключей будет называться сессионным ключом, из которого будут вырабатываться недостающие симметричные:
kdf = Hkdf(None, key_session, hash=GOST34112012256) kdf.expand(b"handshake1-mac-identity") kdf.expand(b"handshake1-enc") kdf.expand(b"handshake1-mac") kdf.expand(b"handshake2-mac-identity") kdf.expand(b"handshake2-enc") kdf.expand(b"handshake2-mac") kdf.expand(b"transport-initiator-enc") kdf.expand(b"transport-initiator-mac") kdf.expand(b"transport-responder-enc") kdf.expand(b"transport-responder-mac")
Структуры/схемы
Рассмотрим какие же теперь ASN.1 структуры у нас получились для передачи всех этих данных:class Msg(Choice): schema = (( ("text", MsgText()), ("handshake0", MsgHandshake0(expl=tag_ctxc(0))), ("handshake1", MsgHandshake1(expl=tag_ctxc(1))), ("handshake2", MsgHandshake2(expl=tag_ctxc(2))), )) class MsgText(Sequence): schema = (( ("payload", MsgTextPayload()), ("payloadMac", MAC()), )) class MsgTextPayload(Sequence): schema = (( ("nonce", Integer(bounds=(0, float("+inf")))), ("ciphertext", OctetString(bounds=(1, MaxTextLen))), )) class MsgHandshake0(Sequence): schema = (( ("cookieInitiator", Cookie()), ("pubKeyInitiator", PubKey()), )) class MsgHandshake1(Sequence): schema = (( ("cookieResponder", Cookie()), ("pubKeyResponder", PubKey()), ("ukm", OctetString(bounds=(8, 8))), ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class MsgHandshake2(Sequence): schema = (( ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class HandshakeTBE(Sequence): schema = (( ("identity", OctetString(bounds=(32, 32))), ("signature", OctetString(bounds=(64, 64))), ("identityMac", MAC()), )) class HandshakeTBS(Sequence): schema = (( ("cookieTheir", Cookie()), ("cookieOur", Cookie()), ("pubKeyOur", PubKey()), )) class Cookie(OctetString): bounds = (16, 16) class PubKey(OctetString): bounds = (64, 64) class MAC(OctetString): bounds = (16, 16)
HandshakeTBS — то, что будет подписываться (to be signed). HandshakeTBE — то, что будет зашифровано (to be encrypted). Обращаю внимание на поле ukm в MsgHandshake1. 34.10 VKO, для ещё большей рандомизации вырабатываемых ключей, включает параметр UKM (user keying material) — просто дополнительная энтропия.Добавление криптографии в код
Рассмотрим лишь только внесённые к оригинальному коду изменения, так как каркас остался прежним (на самом деле, сначала была написана окончательная реализация, а потом из неё выпиливалась вся криптография).Так как аутентификация и идентификация собеседников будет проводится по публичным ключам, то теперь их надо где-то долговременно хранить. Для простоты используем JSON такого вида:
{ "our": { "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98", "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1" }, "their": { "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce", "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a" } }
our — наша ключевая пара, шестнадцатеричные приватный и публичные ключи. their — имена собеседников и их публичные ключи. Изменим аргументы командной строки и добавим постобработку JSON данных:from pygost import gost3410 from pygost.gost34112012256 import GOST34112012256 CURVE = gost3410.GOST3410Curve( *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"] ) parser = argparse.ArgumentParser(description="GOSTIM") parser.add_argument( "--keys-gen", action="store_true", help="Generate JSON with our new keypair", ) parser.add_argument( "--keys", default="keys.json", required=False, help="JSON with our and their keys", ) parser.add_argument( "--bind", default="::1", help="Address to listen on", ) parser.add_argument( "--port", type=int, default=6666, help="Port to listen on", ) args = parser.parse_args() if args.keys_gen: prv_raw = urandom(32) pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw)) pub_raw = gost3410.pub_marshal(pub) print(json.dumps({ "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)}, "their": {}, })) exit(0) # Parse and unmarshal our and their keys {{{ with open(args.keys, "rb") as fd: _keys = json.loads(fd.read().decode("utf-8")) KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"])) _pub = hexdec(_keys["our"]["pub"]) KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub) KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest()) for peer_name, pub_raw in _keys["their"].items(): _pub = hexdec(pub_raw) KEYS[GOST34112012256(_pub).digest()] = { "name": peer_name, "pub": gost3410.pub_unmarshal(_pub), } # }}}
Приватный ключ 34.10 алгоритма — случайное число. Размером 256-бит для 256-бит эллиптических кривых. PyGOST работает не с набором байт, а с большими числами, поэтому наш приватный ключ (urandom (32)) необходимо преобразовать в число, используя gost3410.prv_unmarshal (). Публичный ключ детерминировано вычисляется из приватного, используя gost3410.public_key (). Публичный ключ 34.10 — два больших числа, которые тоже нужно преобразовать в байтовую последовательность для удобства хранения и передачи, используя gost3410.pub_marshal ().После чтения JSON файла, публичные ключи, соответственно, нужно преобразовать назад, используя gost3410.pub_unmarshal (). Так как нам будут приходить идентификаторы собеседников в виде хэша от публичного ключа, то их можно сразу же заранее вычислить и поместить в словарь для быстрого поиска. Стрибог-256 хэш это gost34112012256.GOST34112012256(), полностью удовлетворяющий hashlib интерфейсу хэш-функций.
Как изменилась корутина инициатора? Всё, как по схеме рукопожатия: генерируем cookie (128-бит вполне предостаточно), эфемерную ключевую пару 34.10, которая будет использоваться для VKO функции согласования ключей.
395 async def initiator(host, port): 396 _id = repr((host, port)) 397 logging.info("%s: dialing", _id) 398 reader, writer = await asyncio.open_connection(host, port) 399 # Generate our ephemeral public key and cookie, send Handshake 0 message {{{ 400 cookie_our = Cookie(urandom(16)) 401 prv = gost3410.prv_unmarshal(urandom(32)) 402 pub_our = gost3410.public_key(CURVE, prv) 403 pub_our_raw = PubKey(gost3410.pub_marshal(pub_our)) 404 writer.write(Msg(("handshake0", MsgHandshake0(( 405 ("cookieInitiator", cookie_our), 406 ("pubKeyInitiator", pub_our_raw), 407 )))).encode()) 408 # }}} 409 await writer.drain()
- ждём ответа и декодируем пришедшее Msg сообщение;
- убеждаемся что получили handshake1;
- декодируем эфемерный публичный ключ противоположной стороны и вычисляем сессионный ключ;
- вырабатываем симметричные ключи необходимые для обработки TBE части сообщения.
423 logging.info("%s: got %s message", _id, msg.choice) 424 if msg.choice != "handshake1": 425 logging.warning("%s: unexpected message, disconnecting", _id) 426 writer.close() 427 return 428 # }}} 429 msg_handshake1 = msg.value 430 # Validate Handshake message {{{ 431 cookie_their = msg_handshake1["cookieResponder"] 432 pub_their_raw = msg_handshake1["pubKeyResponder"] 433 pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw)) 434 ukm_raw = bytes(msg_handshake1["ukm"]) 435 ukm = ukm_unmarshal(ukm_raw) 436 key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001) 437 kdf = Hkdf(None, key_session, hash=GOST34112012256) 438 key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity") 439 key_handshake1_enc = kdf.expand(b"handshake1-enc") 440 key_handshake1_mac = kdf.expand(b"handshake1-mac")
UKM это 64-бит число (urandom (8)), которое тоже требуе