Обзор библиотеки Actix в Rust

77197fd2bb8ef232e881063bb7b6645d.png

Привет, Хабр!

Actix появился на свет благодаря Николаю Киму, также известного в сообществе под ником fafhrd91. Николай создал экосистему, которая позволяет строить масштабируемые, надежные веб-приложения и микросервисы. Actix основан на модели акторов, концепции, которая была впервые предложена в 1973 году для симуляции активных компонентов в распределенных системах.

Actix требует Rust версии 1.39 или выше. Можно легко добавить Actix юзая Cargo. Для этого достаточно добавить зависимости Actix-web в файл Cargo.toml вашего проекта:

[dependencies]
actix-web = "4.0"

После добавления юзаем cargo build, и карго автоматом скомпилирует Actix Web вместе с необходимыми зависимостями.

Основные возможности

Основные элементы Actix — это акторы, система и арбитр.

Актор — это объект, который инкапсулирует состояние и поведение. Он взаимодействует с другими акторами исключительно посредством асинхронных сообщений:

use actix::prelude::*;

struct MyActor;
impl Actor for MyActor {
    type Context = Context;
}

impl MyActor {
    fn new() -> Self {
        MyActor
    }
}

Каждый актор в Actix должен реализовать трейт Actor, определяющий контекст, в котором актор исполняется.

System в Actix — это контейнер, который запускает и управляет акторами.

fn main() {
    let system = System::new();
    system.block_on(async {
        let addr = MyActor::new().start();
        // отправка сообщений актору и другие операции
    });
}

Arbiter — это сущность, управляющая пулом потоков для выполнения асинхронных задач. Каждый Arbiter может запускать несколько акторов, обеспечивая параллельное выполнение их задач:

fn main() {
    let system = System::new();
    system.block_on(async {
        let arbiter = Arbiter::new();
        arbiter.spawn(async {
            // задачи, выполняемые в контексте нового Arbiter
        });
    });
}

Как это все взаимодействует:

use actix::prelude::*;

struct Ping;
impl Message for Ping {
    type Result = String;
}

struct PongActor;
impl Actor for PongActor {
    type Context = Context;
}

impl Handler for PongActor {
    type Result = String;

    fn handle(&mut self, _msg: Ping, _ctx: &mut Context) -> Self::Result {
        "Pong".to_string()
    }
}

fn main() {
    let system = System::new();
    system.block_on(async {
        let pong_actor_address = PongActor.start();
        let result = pong_actor_address.send(Ping).await.unwrap();
        println!("Received: {}", result);
    });
}

PongActor обрабатывает сообщения типа Ping и отвечает сообщением «Pong». Cоздаем актора, отправляем ему сообщение и асинхронно ожидаем ответа.

Обработчики могут быть простыми, возвращая статические данные, или сложными, выполняя асинхронные операции, такие как запросы к базам данных или внешним API. В Actix Web для объявления обработчика используется макрос #[get("/path")], #[post("/path")] и так далее, в зависимости от HTTP-метода, который обработчик должен обрабатывать:

use actix_web::{web, App, HttpResponse, HttpServer, Responder};

async fn greet(req: web::HttpRequest) -> impl Responder {
    let name = req.match_info().get("name").unwrap_or("World");
    HttpResponse::Ok().body(format!("Hello, {}!", name))
}

Мидлвари в Actix Web могут модифицировать запросы перед тем, как они достигнут обработчика, или изменять ответы после того, как обработчик их сгенерил, но до того, как они отправлены клиенту:

use actix_web::{middleware, App, HttpServer};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .wrap(middleware::Logger::default()) // мидлвари логирования
            .service(/* обработчики */)
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

Асинхронность в Actix

В Actix каждый актор исполняется асинхронно, имея возможность обрабатывать сообщения параллельно с другими акторами. Акторы могут отправлять, получать и обрабатывать сообщения асинхронно, используя async/await Rust:

use actix::prelude::*;

struct MyAsyncActor;
impl Actor for MyAsyncActor {
    type Context = Context;
}

#[derive(Message)]
#[rtype(result = "String")]
struct MyMessage;

impl Handler for MyAsyncActor {
    type Result = ResponseFuture;

    fn handle(&mut self, _msg: MyMessage, _ctx: &mut Context) -> Self::Result {
        Box::pin(async {
            // какая-нибудь длительная асинхронная операция
            actix_rt::time::sleep(std::time::Duration::from_secs(1)).await;
            "Hello from async actor!".to_string()
        })
    }
}

MyAsyncActor асинхронно обрабатывает сообщение MyMessage, имитируя длительную операцию с использованием sleep. Возвращаемый тип ResponseFuture показывает то что все будет выполнено асинхорнно

С Actix можно асинхронного взаимодействовать между акторами через отправку сообщений. Акторы могут использовать метод .send() для асинхронной отправки сообщений другим акторам и ожидать ответа с помощью await:

#[actix_rt::main]
async fn main() {
    let addr = MyAsyncActor.start();

    let res = addr.send(MyMessage).await;
    match res {
        Ok(result) => println!("Received response: {}", result),
        Err(e) => println!("Encountered an error: {}", e),
    }
}

Отправляем сообщение MyMessage актору MyAsyncActor и асинхронно ожидаем ответ.

Actix поддерживает асинхронную обработку стримов:

use actix::prelude::*;
use futures::stream::{self, StreamExt};

struct StreamActor;
impl Actor for StreamActor {
    type Context = Context;
}

impl StreamHandler for StreamActor {
    fn handle(&mut self, item: i32, _ctx: &mut Context) {
        println!("Received: {}", item);
    }
}

#[actix_rt::main]
async fn main() {
    let addr = StreamActor.start();

    // создание простого стрима
    let stream = stream::iter(vec![1, 2, 3]);
    // отправка стрима актору
    addr.add_stream(stream);
}

StreamActor подписывается на стрим целые числа и асинхронно обрабатыает каждое поступающее значение.

Прочие возможности

Создадим простейший веб-сервер, который будет отвечать на запросы по корневому пути (/):

use actix_web::{web, App, HttpResponse, HttpServer, Responder};

async fn greet() -> impl Responder {
    HttpResponse::Ok().body("Hello, Habr!")
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .route("/", web::get().to(greet))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

Создаем HTTP-сервер, который слушает на 127.0.0.1:8080 и использует функцию greet для обработки GET-запросов по адресу /. Функция greet возвращает простое приветствие.

Создадим обработчик, который принимает имя пользователя через путь запроса и возвращает персонализированное приветствие:

use actix_web::{web, App, HttpResponse, HttpServer, Responder};

async fn greet_user(name: web::Path) -> impl Responder {
    HttpResponse::Ok().body(format!("Zdarova, {}!", name))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .route("/greet/{name}", web::get().to(greet_user))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

Используем web::Path для извлечения параметра name из пути URL и возвращаем пользователю персонализированное сообщение.

Для работы с базами данных в Actix-web часто юзает асинхронные ORM, например, Diesel или sqlx. Используемsqlx для асинхронного взаимодействия с бдшкой PostgreSQL:

use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use sqlx::postgres::PgPoolOptions;
use std::env;

async fn get_user(pool: web::Data) -> impl Responder {
    let row: (String,) = sqlx::query_as("SELECT name FROM users WHERE id = $1")
        .bind(1)
        .fetch_one(&**pool)
        .await
        .unwrap();

    HttpResponse::Ok().body(format!("Hello, {}!", row.0))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let pool = PgPoolOptions::new()
        .connect(&database_url)
        .await
        .expect("Failed to create pool.");

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(pool.clone()))
            .route("/user", web::get().to(get_user))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

Создался пул соединений к бд PostgreSQL и он используется для его для асинхронного выполнения запроса к бд в обработчике get_user.

Можно интегрироваться с API. Например с reqwest, можно асинхронно отправлять HTTP-запросы к внешним сервисам:

use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use reqwest::Client;

async fn fetch_data() -> impl Responder {
    let client = Client::new();
    let res = client.get("https://api.example.com/data")
        .send()
        .await
        .unwrap()
        .text()
        .await
        .unwrap();

    HttpResponse::Ok().body(res)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .route("/fetch", web::get().to(fetch_data))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

Функция fetch_data использует reqwest для асинхронного получения данных от внешнего API и возвращает полученный ответ клиенту.

Для тех, кто желает узнать больше — Actix GitHub.

А про практические инструменты и другие ЯП лучше всего узнавать от практикующих экспертов, например, в рамках онлайн-курсов.

© Habrahabr.ru