Разработка real-time приложений с Python и WebSocket

db0897fd0d7dfcc9b945efe130c1346f.jpg

Real-time приложения, как следует из названия, предоставляют мгновенный обмен данных и информации между сервером и клиентом. Они встречаются повсеместно в различных сферах, начиная от социальных сетей и мессенджеров, и заканчивая финансовыми торговыми платформами, мониторингом систем, онлайн-играми и многими другими областями. Подобные приложения обеспечивают пользовательский опыт, который чрезвычайно близок к реальному времени.

Примеры real-time приложений:

  • Обновление в реальном времени в социальных сетях, как Facebook и Twitter.

  • Мгновенные сообщения в мессенджерах, таких как WhatsApp и Slack.

  • Онлайн-игры, где реакция игроков должна быть немедленной.

  • Финансовые рынки, где каждая миллисекунда имеет значение.

WebSocket — это технология, разработанная для обеспечения real-time коммуникации между клиентами и сервером. В отличие от традиционных HTTP-запросов, WebSocket поддерживает постоянное и двунаправленное соединение между клиентом и сервером, позволяя отправлять данные в обе стороны в любой момент. Это существенно уменьшает задержки и позволяет создавать более реактивные и быстрые приложения.

Преимущества WebSocket:

  • Мгновенные обновления: WebSocket позволяет мгновенно отправлять данные без необходимости постоянно обновлять страницу или делать повторные запросы.

  • Эффективная связь: Соединение WebSocket более эффективно, чем множество HTTP-запросов, что сокращает нагрузку на сервер.

  • Двунаправленная связь: WebSocket поддерживает отправку данных и от сервера, и от клиента, что делает возможным обмен информацией в обе стороны.

WebSocket: основы

WebSocket — это протокол связи, предназначенный для обеспечения real-time обмена данными между клиентом и сервером. В отличие от традиционного HTTP, где клиенты отправляют запросы и серверы отправляют ответы, WebSocket поддерживает постоянное двустороннее соединение. Это означает, что клиент и сервер могут обмениваться данными в режиме реального времени, без постоянных запросов.

WebSocket создает постоянное соединение посредством «рукопожатия» (handshake) при установке связи, а затем обеспечивает канал для отправки и получения данных в обоих направлениях. Этот канал остается открытым, пока клиент или сервер не решит его закрыть. WebSocket значительно сокращает задержки и объем сетевого трафика, делая его идеальным выбором для real-time приложений.

WebSocket имеет несколько существенных преимуществ по сравнению с традиционным HTTP:

  1. Мгновенная реакция: WebSocket позволяет серверу мгновенно отправлять данные клиенту без необходимости ожидания запроса от клиента. Это идеально подходит для обновления в реальном времени, таких как чаты или мониторинг событий.

  2. Сокращение нагрузки на сервер: Постоянное соединение WebSocket сокращает количество сетевого трафика и нагрузку на сервер, поскольку нет необходимости в постоянных HTTP-запросах.

  3. Эффективная связь: WebSocket снижает задержки между отправкой и приемом данных, что делает его идеальным выбором для задач, где каждая миллисекунда важна.

  4. Двунаправленная связь: WebSocket поддерживает отправку данных как от клиента к серверу, так и от сервера к клиенту. Это позволяет создавать интерактивные real-time приложения, где оба участника могут активно взаимодействовать.

WebSocket работает поверх TCP (Transmission Control Protocol) и использует специальный набор заголовков и правил для обмена данными между клиентом и сервером.

f6133133eabfa734bfc640e37ea8d5a1.png

Процесс установки WebSocket соединения включает в себя следующие этапы:

  1. Handshake: Клиент и сервер обмениваются HTTP-заголовками для установки соединения. Этот процесс называется «рукопожатием».

  2. Открытое соединение: После успешного рукопожатия соединение остается открытым, позволяя клиенту и серверу отправлять данные друг другу в режиме реального времени.

  3. Обмен данными: Клиент и сервер могут отправлять сообщения друг другу посредством WebSocket соединения. Эти сообщения могут быть текстовыми или бинарными.

  4. Закрытие соединения: Соединение может быть закрыто по желанию клиента или сервера, или в случае возникновения ошибок.

WebSocket является идеальным инструментом для реализации real-time коммуникации в приложениях. Поскольку соединение остается открытым, сервер может немедленно отправлять обновления клиентам, и наоборот. Это дает возможность создать интерактивные приложения, которые реагируют на события в реальном времени.

WebSocket также поддерживает одновременное соединение множества клиентов с сервером, позволяя создавать чаты, многопользовательские игры и другие real-time приложения.

Основные библиотеки

  1. WebSockets — это простая и эффективная библиотека для работы с WebSocket. Она обеспечивает удобный интерфейс для создания WebSocket серверов и клиентов.

Пример создания WebSocket сервера с использованием библиотеки WebSockets:

import asyncio
import websockets

async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(message)

start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Пример создания WebSocket клиента с использованием библиотеки WebSockets:

import asyncio
import websockets

async def connect_to_server():
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello, Server!")
        response = await websocket.recv()
        print(f"Received: {response}")

asyncio.get_event_loop().run_until_complete(connect_to_server())
  1. Tornado — это многозадачный веб-фреймворк, который также включает поддержку WebSocket. Он обладает высокой производительностью и масштабируемостью.

Пример создания WebSocket сервера с использованием Tornado:

import tornado.web
import tornado.websocket
import tornado.ioloop

class WebSocketHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print("WebSocket opened")

    def on_message(self, message):
        self.write_message(f"You said: {message}")

    def on_close(self):
        print("WebSocket closed")

app = tornado.web.Application([(r'/ws', WebSocketHandler)])

if __name__ == "__main__":
    app.listen(8765)
    tornado.ioloop.IOLoop.current().start()

Пример создания WebSocket клиента с использованием Tornado:

import tornado.websocket
import tornado.ioloop

class WebSocketClient(tornado.websocket.WebSocketClientConnection):
    async def send_message(self, message):
        await self.write_message(message)

async def connect_to_server():
    url = "ws://localhost:8765/ws"
    client = await tornado.websocket.websocket_connect(url)
    await client.send_message("Hello, Server!")
    response = await client.read_message()
    print(f"Received: {response}")

if __name__ == "__main__":
    tornado.ioloop.IOLoop.current().run_sync(connect_to_server)
  1. Autobahn предоставляет реализацию WebSocket и WAMP (WebSocket Application Messaging Protocol). Он позволяет легко создавать как WebSocket серверы, так и клиентов.

Пример создания WebSocket сервера с использованием Autobahn:

from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
import asyncio

class MyServerProtocol(WebSocketServerProtocol):
    async def onMessage(self, payload, isBinary):
        if not isBinary:
            message = payload.decode('utf-8')
            print(f"Received message: {message}")
            await self.sendMessage(f"You said: {message}".encode('utf-8'), isBinary=True)

if __name__ == '__main__':
    factory = WebSocketServerFactory()
    factory.protocol = MyServerProtocol

    loop = asyncio.get_event_loop()
    coro = loop.create_server(factory, '0.0.0.0', 8765)
    server = loop.run_until_complete(coro)
    loop.run_forever()
  1. FastAPI поддерживает WebSocket. Он известен своей высокой производительностью и интуитивно понятным API.

Пример создания WebSocket сервера с использованием FastAPI:

from fastapi import FastAPI
from fastapi.websockets import WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"You said: {data}")

Каждая из вышеперечисленных библиотек имеет свои особенности и преимущества. Выбор конкретной библиотеки зависит от ваших потребностей и предпочтений. WebSockets предоставляет простой интерфейс, Tornado обладает высокой производительностью, Autobahn предлагает реализацию WAMP, а FastAPI обеспечивает интеграцию WebSocket с современным веб-фреймворком. Выберите ту, которая лучше соответствует вашему проекту и задачам.

Примеры реализации real-time сервера с Python и WebSocket

  1. Простой WebSocket сервер:
    Создадим базовый WebSocket сервер с FastAPI, который будет принимать и отправлять сообщения клиентам.

    from fastapi import FastAPI, WebSocket
    
    app = FastAPI()
    
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Вы сказали: {data}")
  2. WebSocket сервер с аутентификацией:
    Добавим аутентификацию к WebSocket серверу, будем использовать зависимости FastAPI для проверки правильности аутентификации клиентов.

    from fastapi import FastAPI, Depends, HTTPException, WebSocket
    from fastapi.security import APIKeyHeader
    
    app = FastAPI()
    api_key_header = APIKeyHeader(name="api_key")
    
    def authenticate_user(api_key: str = Depends(api_key_header)):
        if api_key != "secureapikey":
            raise HTTPException(status_code=401, detail="Ошибка аутентификации")
        return api_key
    
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket, api_key: str = Depends(authenticate_user)):
        await websocket.accept()
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Вы сказали: {data}")
  3. Real-time чат с WebSocket:

    Cоздадим чат, где пользователи могут отправлять и принимать сообщения в реальном времени:

    from fastapi import FastAPI, WebSocket
    from typing import List
    from fastapi.responses import HTMLResponse
    from fastapi.templating import Jinja2Templates
    
    app = FastAPI()
    
    templates = Jinja2Templates(directory="templates")
    
    # Список подключенных клиентов
    connected_clients = []
    
    # Очередь сообщений
    message_queue = []
    
    # WebSocket для веб-интерфейса чата
    @app.websocket("/ws/{username}")
    async def websocket_endpoint(websocket: WebSocket, username: str):
        await websocket.accept()
        # Добавляем клиента в список подключенных
        connected_clients.append({"websocket": websocket, "username": username})
        # Приветственное сообщение для нового клиента
        welcome_message = f"Привет, {username}! Добро пожаловать в чат Otus!"
        await websocket.send_text(welcome_message)
        
        # Отправляем сообщения из очереди (если они есть)
        for message in message_queue:
            await websocket.send_text(message)
        
        try:
            while True:
                data = await websocket.receive_text()
                message = f"{username}: {data}"
                # Добавляем сообщение в очередь
                message_queue.append(message)
                # Отправляем сообщение всем подключенным клиентам
                for client in connected_clients:
                    await client["websocket"].send_text(message)
        except WebSocketDisconnect:
            # Удаляем клиента из списка при отключении
            connected_clients.remove({"websocket": websocket, "username": username})
    
    # Веб-страница для входа в чат
    @app.get("/", response_class=HTMLResponse)
    async def chat_interface(request):
        return templates.TemplateResponse("chat.html", {"request": request})
    
    # HTML-шаблон для веб-интерфейса чата (templates/chat.html)
    # Вам нужно создать директорию "templates" и поместить этот файл туда.
    # Здесь мы предоставляем простой интерфейс для ввода имени пользователя и подключения к чату.
    
    # chat.html
    """
    
    
    
        Real-time Chat
    
    
        

    Real-time Chat

    """ # Запустите приложение с помощью uvicorn, например: # uvicorn filename:app --host 0.0.0.0 --port 8000

    Этот код создает real-time чат, который позволяет пользователям вводить свои имена и отправлять сообщения в реальном времени. Подключенные клиенты могут видеть сообщения друг друга.

  4. WebSocket сервер с базой данных:
    Реализуем WebSocket сервер, который взаимодействует с базой данных. Каждый клиент может отправить данные, и сервер сохранит их в базе:

    from fastapi import FastAPI, WebSocket
    from sqlalchemy import create_engine, Column, Integer, String
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base
    
    SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
    engine = create_engine(SQLALCHEMY_DATABASE_URL)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    Base = declarative_base()
    
    app = FastAPI()
    
    class Item(Base):
        __tablename__ = "items"
        id = Column(Integer, primary_key=True, index=True)
        name = Column(String, index=True)
    
    Base.metadata.create_all(bind=engine)
    
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        while True:
            data = await websocket.receive_text()
            item = Item(name=data)
            db = SessionLocal()
            db.add(item)
            db.commit()
            db.refresh(item)
            db.close()
            await websocket.send_text(f"Сохранено в базе: {data}")
    
  5. WebSocket сервер с асинхронными задачами:
    Добавим асинхронные задачи в WebSocket сервер. В этом примере мы будем использовать FastAPI’s background tasks для обработки данных асинхронно:

    from fastapi import FastAPI, WebSocket, BackgroundTasks
    
    app = FastAPI()
    
    def process_data(data: str):
        # Ваша асинхронная логика обработки данных
        pass
    
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket, background_tasks: BackgroundTasks):
        await websocket.accept()
        while True:
            data = await websocket.receive_text()
            background_tasks.add_task(process_data, data)
            await websocket.send_text(f"Принято: {data}")
    
  6. WebSocket сервер с публикацией/подпиской:

    В этом примере мы создадим WebSocket сервер с использованием FastAPI и библиотеки websockets для публикации и подписки на каналы. Клиенты смогут подписываться на каналы и получать обновления от других клиентов:

    from fastapi import FastAPI, WebSocket
    import asyncio
    import websockets
    
    app = FastAPI()
    
    # Хранилище подключенных клиентов
    connected_clients = set()
    
    # Функция для отправки сообщения всем подписанным клиентам
    async def publish_message(message):
        for client in connected_clients:
            await client.send(message)
    
    # WebSocket для подписки на канал
    @app.websocket("/ws/{channel}")
    async def websocket_endpoint(websocket: WebSocket, channel: str):
        await websocket.accept()
        # Добавляем клиента в список подключенных
        connected_clients.add(websocket)
        try:
            while True:
                data = await websocket.receive_text()
                message = f"Клиент в канале {channel}: {data}"
                # Отправляем сообщение всем подписанным клиентам
                await publish_message(message)
        except websockets.exceptions.ConnectionClosedOK:
            # Удалить клиента из списка при закрытии соединения
            connected_clients.remove(websocket)
            await websocket.close()
    
    # Пример клиентской стороны для подписки на канал:
    # var socket = new WebSocket("ws://localhost:8000/ws/my_channel")
    
  7. WebSocket сервер с отправкой уведомлений:

    Создадим WebSocket сервер с использованием FastAPI, который будет отправлять уведомления всем подключенным клиентам. Это может быть полезно для систем уведомлений или мониторинга:

    from fastapi import FastAPI, WebSocket
    import asyncio
    
    app = FastAPI()
    
    # Список подключенных клиентов
    connected_clients = []
    
    # Функция для отправки уведомления всем клиентам
    async def send_notification(message):
        for client in connected_clients:
            await client.send_text(message)
    
    # WebSocket для уведомлений
    @app.websocket("/ws/notifications")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        # Добавляем клиента в список подключенных
        connected_clients.append(websocket)
        try:
            while True:
                data = await websocket.receive_text()
                message = f"Уведомление: {data}"
                # Отправляем уведомление всем подключенным клиентам
                await send_notification(message)
        except WebSocketDisconnect:
            # Удаляем клиента из списка при отключении
            connected_clients.remove(websocket)
    
  8. WebSocket сервер с обработкой исключений:
    В данном примере мы добавим обработку исключений к WebSocket серверу, чтобы учесть возможные ошибки и неполадки:

    from fastapi import FastAPI, WebSocket
    
    app = FastAPI()
    
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        try:
            await websocket.accept()
            while True:
                data = await websocket.receive_text()
                await websocket.send_text(f"Вы сказали: {data}")
        except Exception as e:
            await websocket.close()
    
  9. WebSocket сервер с аутентификацией JWT:

    В этом примере мы создадим WebSocket сервер с аутентификацией JSON Web Tokens (JWT). Клиенты должны предоставить валидный JWT токен для подключения:

    from fastapi import FastAPI, WebSocket, Depends, HTTPException
    from fastapi.security import OAuth2PasswordBearer
    import jwt
    
    app = FastAPI()
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
    
    # Секретный ключ для подписи и проверки JWT
    SECRET_KEY = "mysecretkey"
    
    # Функция для генерации JWT токена
    def create_jwt_token(data):
        return jwt.encode(data, SECRET_KEY, algorithm="HS256")
    
    # Функция для проверки JWT токена
    def verify_jwt_token(token):
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
            return payload
        except jwt.JWTError:
            return None
    
    # WebSocket с аутентификацией JWT
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket, token: str = Depends(oauth2_scheme)):
        payload = verify_jwt_token(token)
        if not payload:
            raise HTTPException(status_code=401, detail="Ошибка аутентификации")
        await websocket.accept()
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Аутентифицированный клиент с ID {payload['sub']} сказал: {data}")
    

    Обратите внимание, что для полноценной аутентификации JWT вам потребуется реализовать логику генерации и проверки JWT токенов в соответствии с вашими потребностями и системой аутентификации.

  1. WebSocket сервер с балансировкой нагрузки:

    WebSocket сервер с балансировкой нагрузки может быть реализован с использованием FastAPI в сочетании с библиотекой uvicorn для запуска нескольких экземпляров сервера и балансировщика нагрузки, такого как Nginx:

    1. Создадим файл с именем main.py для вашего WebSocket сервера:

    from fastapi import FastAPI, WebSocket
    
    app = FastAPI()
    
    # WebSocket для подключения клиентов
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Вы сказали: {data}")
    1. Теперь создадим файл для запуска нескольких экземпляров сервера, например, start_server.py:

    import uvicorn
    
    if __name__ == "__main__":
        uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True, workers=4)

    В этом примере мы используем 4 рабочих процесса (workers) для обработки соединений, но вы можете настроить это значение в соответствии с вашими потребностями.

    1. Запускаем WebSocket серверы с помощью uvicorn:

    python start_server.py

    Теперь у нас есть несколько экземпляров WebSocket сервера, работающих на разных портах. Для балансировки нагрузки между ними, вы можете использовать Nginx или другой балансировщик нагрузки. Настройте его так, чтобы он перенаправлял входящие запросы на порты, на которых работают ваши WebSocket серверы.

    Пример конфигурации Nginx:

    http {
        upstream websocket_servers {
            server localhost:8000;
            server localhost:8001;
            server localhost:8002;
            server localhost:8003;
        }
    
        server {
            listen 80;
            server_name your_domain.com;
    
            location /ws {
                proxy_pass http://websocket_servers;
                proxy_http_version 1.1;
                proxy_set_header Upgrade $http_upgrade;
                proxy_set_header Connection "upgrade";
            }
    
            location / {
                proxy_pass http://websocket_servers;
            }
        }
    }
    

    В результате получается WebSocket сервер с балансировкой нагрузки, который может масштабироваться горизонтально по мере необходимости. Клиенты могут подключаться к балансировщику нагрузки, и запросы будут равномерно распределяться между вашими серверами для обработки WebSocket соединений.

Разработка клиентской части

Разработка клиентской части real-time приложения, работающей с WebSocket сервером, играет важную роль в обеспечении интерактивности и реактивности вашего приложения. В этом разделе мы разберем создание WebSocket клиента, его интеграцию с веб-интерфейсом и обработку real-time данных на клиентской стороне, используя библиотеку FastAPI.

Для начала создадим WebSocket клиент с использованием JavaScript и библиотеки WebSocket. Пример простого WebSocket клиента:

// Создаем WebSocket соединение с сервером
const socket = new WebSocket("ws://localhost:8000/ws");

// Обработчик события при открытии соединения
socket.addEventListener("open", (event) => {
    console.log("Соединение установлено");
});

// Обработчик события при получении сообщения от сервера
socket.addEventListener("message", (event) => {
    const message = event.data;
    console.log("Получено сообщение: " + message);
});

// Обработчик события при закрытии соединения
socket.addEventListener("close", (event) => {
    if (event.wasClean) {
        console.log("Соединение закрыто корректно");
    } else {
        console.error("Соединение разорвано");
    }
});

// Обработчик события при возникновении ошибки
socket.addEventListener("error", (event) => {
    console.error("Ошибка соединения: " + event.message);
});

Этот клиент создает WebSocket соединение с сервером, слушает события открытия, приема сообщения, закрытия и ошибки.

Чтобы интегрировать WebSocket клиента с веб-интерфейсом, вы можете использовать фреймворк или библиотеку на ваш выбор, такие как React, Angular, Vue.js или просто встроить код в HTML и JavaScript.

Пример интеграции веб-интерфейса с WebSocket клиентом на простом HTML-странице:




    Real-Time App


    

При получении сообщения от WebSocket сервера, оно отображается на веб-странице. Пользователь может отправлять сообщения через интерфейс.

Обработка real-time данных на клиентской стороне зависит от конкретных требований вашего приложения. Вы можете использовать JavaScript для обновления интерфейса, обработки данных и взаимодействия с пользователем в реальном времени.

Пример обработки данных на клиентской стороне:

socket.addEventListener("message", (event) => {
    const data = JSON.parse(event.data);

    if (data.type === "chatMessage") {
        displayChatMessage(data.message);
    } else if (data.type === "notification") {
        displayNotification(data.message);
    } else {
        console.log("Неизвестный тип сообщения: " + data.type);
    }
});

function displayChatMessage(message) {
    // Отобразить чат-сообщение в интерфейсе
}

function displayNotification(notification) {
    // Отобразить уведомление в интерфейсе
}

Обработка данных может включать в себя обновление чата, отображение уведомлений, обновление графиков, графических элементов и многое другое в зависимости от целей вашего приложения.

Заключение

real-time приложения позволяют создавать интерактивные и отзывчивые приложения, обогащая пользовательский опыт. Python и WebSocket являются хорошими инструментарием для реализации таких приложений.

Статья подготовлена в преддверии старта специализации Python Developer. В рамках запуска курса мои коллеги проведут несколько бесплатных вебинаров, на которые могут зарегистрироваться все желающие:

© Habrahabr.ru