Проблема «error: future cannot be sent between threads safely» при использовании Rust-библиотек Tokio и Actix

Приложения на Rust часто используют асинхронные библиотеки, такие как Tokio и Actix. Эти библиотеки предоставляют инструменты для асинхронного ввода-вывода и параллельных вычислений и т.д. Однако иногда возникают проблемы при совместном использовании разных асинхронных библиотек.

Сегодня я хочу поделиться с вами опытом решения одной из распространенных проблем при работе с библиотеками Tokio и Actix. Конкретно, мы рассмотрим ошибку «error: future cannot be sent between threads safely», которая может возникнуть, когда вы пытаетесь использовать клиент Actix внутри асинхронной функции, запущенной с помощью Tokio. Я расскажу вам, как преодолеть эту проблему.

b8d72b4b502bf1649c2908c062a9e454.png

Исходный код с Actix

Давайте начнем с простого примера кода, который работает только с Actix и не вызывает проблем:

use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main()  {
    actix_rt::spawn(async {
        HttpServer::new(|| {
            App::new()
                .service(web::resource("/hello").route(web::get().to(ok)))
        })
            .bind("127.0.0.1:8080")?
            .run()
            .await
    });
    let client = Client::new();
    let url = "http://127.0.0.1:8080/hello";
    let ret =  client.get(url).send().await.unwrap().body().await.unwrap();
    println!("{:?}", ret);
}
async fn ok() -> HttpResponse {
    HttpResponse::Ok()
        .content_type("text/html; charset=utf-8")
        .body("OK")
}

В этом коде мы создаем HTTP-сервер с помощью Actix и выполняем GET-запрос к нему с использованием клиента Actix. Все работает отлично, но проблемы начинаются, когда мы попытаемся использовать клиент Actix в асинхронной функции, запущенной с помощью Tokio.

Проблема «error: future cannot be sent between threads safely»

Когда мы попытаемся вызвать клиент Actix из Tokio, мы столкнемся с ошибкой «error: future cannot be sent between threads safely». Это происходит потому, что клиент Actix не является Send, что означает, что его нельзя безопасно передавать между потоками.

Вот пример кода, который вызывает эту ошибку:

use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main()  {
    actix_rt::spawn(async {
        HttpServer::new(|| {
            App::new()
                .service(web::resource("/hello").route(web::get().to(ok)))
        })
            .bind("127.0.0.1:8080")?
            .run()
            .await
    });
    let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
    let r = runtime.spawn(async move {
        let client = Client::new();
        let url = "http://127.0.0.1:8080/hello";
        client.get(url).send().await.unwrap().body().await.unwrap()
    }).await.unwrap();
    println!("{:?}", r);
}
async fn ok() -> HttpResponse {
    HttpResponse::Ok()
        .content_type("text/html; charset=utf-8")
        .body("OK")
}

Решение проблемы с использованием Oneshot из Tokio

Чтобы решить эту проблему и сделать код безопасным для использования в Tokio, мы можем воспользоваться механизмом Oneshot из Tokio. Этот механизм позволяет нам обернуть результат выполнения клиента Actix и передать его между потоками безопасно.

Вот пример кода, который использует Oneshot для решения проблемы:

use actix_web::{web, App, HttpResponse, HttpServer};
use awc::Client;
#[actix_rt::main]
async fn main()  {
    actix_rt::spawn(async {
        HttpServer::new(|| {
            App::new()
                .service(web::resource("/hello").route(web::get().to(ok)))
        })
            .bind("127.0.0.1:8080")?
            .run()
            .await
    });
    let (sender, receiver) = tokio::sync::oneshot::channel();
    actix_rt::spawn(async move {
        let client = Client::new();
        let url = "http://127.0.0.1:8080/hello";
        let _ = sender.send(client.get(url).send().await.unwrap().body().await.unwrap());
    });
    let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap();
    let r = runtime.spawn(async move {
        receiver.await.unwrap()
    }).await.unwrap();
    println!("{:?}", r);
    std::mem::forget(runtime);
}
async fn ok() -> HttpResponse {
    HttpResponse::Ok()
        .content_type("text/html; charset=utf-8")
        .body("OK")
}

Итак, после долгих поисков в Google, изучения вопроса на Stack Overflow и многочисленных попыток получить ответ от ChatGPT, я так и не нашел готового решения своей проблемы. Пришлось включить собственные серые клеточки и пораскинуть мозгами. Надеюсь, что мое решение будет полезным кому-то.

© Habrahabr.ru