Интеграция Axum с S3-хранилищем
Недавно столкнулся с проблемой подружить веб-фреймворк Axum и библиотеку rust-s3. Собственно, задача сделать 2 эндпойнта:
Разумеется, без временных файлов и без удержания целиком всех данных файла в памяти.
Так как для работы с S3 нужны некоторые служебные объекты (настройки доступа к конкретному bucket), вынесем непосредственную работу в структуру UploadService
:
#[derive(Clone)]
pub struct UploadService {
bucket: Arc
}
Для внедрения зависимости (DI) в обработчик эндпойнта, нам необходимо, чтобы наша структура реализовывала трейт Clone
. Так как сервис будет клонироваться на каждый запрос, обернём s3::Bucket
в Arc
, чтобы клонирование было максимально дешёвым.
Теперь реализуем конструктор экземпляра сервиса:
use s3::{Bucket, Region};
use s3::creds::Credentials;
...
impl UploadService {
pub fn new() -> Self {
let bucket_name = std::env::var("UPLOAD_BUCKET_NAME")
.expect("Expected UPLOAD_BUCKET_NAME environment variable");
let region = Region::Custom {
region: std::env::var("UPLOAD_BUCKET_REGION")
.expect("Expected UPLOAD_BUCKET_REGION environment variable"),
endpoint: std::env::var("UPLOAD_BUCKET_ENDPOINT")
.expect("Expected UPLOAD_BUCKET_ENDPOINT environment variable")
};
let credentials = Credentials::new(
Some(
&std::env::var("UPLOAD_BUCKET_ACCESS_KEY")
.expect("Expected UPLOAD_BUCKET_ACCESS_KEY environment variable")
),
Some(
&std::env::var("UPLOAD_BUCKET_SECRET_KEY")
.expect("Expected UPLOAD_BUCKET_SECRET_KEY environment variable")
),
None,
None,
None
).unwrap();
let bucket = Bucket::new(&bucket_name, region, credentials).unwrap()
.with_path_style();
Self {
bucket: Arc::new(bucket)
}
}
...
Сервис конфигурируется с помощью переменных окружения UPLOAD_BUCKET_NAME
, UPLOAD_BUCKET_REGION
, UPLOAD_BUCKET_ACCESS_KEY
, UPLOAD_BUCKET_SECRET_KEY
и UPLOAD_BUCKET_ENDPOINT
. Последний параметр необходим так как я использую не Amazon S3, а другого S3-совместимого провайдера (Scaleway). При использовании Amazon S3 можно явно задать нужный регион с помощью одного из значений из перечисления s3::Region
(например, s3::Region::UsWest1
), либо воспользоваться s3::Region::from_str
для парсинга региона из строки типа us-west-1
. Кстати, в наборе перечислений региона помимо стандартных регионов Amazon есть ещё Digital Ocean, Wasabi и Yandex.
Теперь самое сложное — функция загрузки файла в хранилище:
use std::sync::{Arc, Mutex};
use std::path::Path;
use std::ffi::OsStr;
use axum::http::StatusCode;
use axum::extract::multipart::Field;
use async_hash::{Sha256, Digest};
use async_compat::CompatExt;
use futures::TryStreamExt;
use uuid::Uuid;
...
pub async fn upload<'a>(&self, field: Field<'a>) -> Result {
let orig_filename = field.file_name()
.unwrap_or("file")
.to_owned();
let mimetype = field.content_type()
.unwrap_or("application/octet-stream")
.to_owned();
let digest = Arc::new(Mutex::new(Sha256::new()));
let mut reader = field
.map_ok(|chunk| {
if let Ok(mut digest) = digest.lock() {
digest.update(&chunk);
}
chunk
})
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read()
.compat();
let tmp_filename = format!("tmp/{}.bin", Uuid::new_v4());
self.bucket.put_object_stream_with_content_type(
&mut reader,
&tmp_filename,
&mimetype
)
.await
.map_err(|err| {
log::error!("S3 upload error: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR
})?;
drop(reader); // Release digest borrow
let mut result = Err(StatusCode::INTERNAL_SERVER_ERROR);
if let Some(digest) = Arc::into_inner(digest).and_then(|m| m.into_inner().ok()) {
let digest = hex::encode(digest.finalize());
let ext = Path::new(&orig_filename).extension().and_then(OsStr::to_str);
let mut filename = if let Some(ext) = ext {
format!("{}.{}", digest, ext)
} else {
digest
};
filename.make_ascii_lowercase();
match self.bucket.copy_object_internal(&tmp_filename, &filename).await {
Ok(_) => result = Ok(format!("/uploads/{}", &filename)),
Err(err) => log::error!("S3 copy error: {:?}", err)
}
}
if let Err(err) = self.bucket.delete_object(&tmp_filename).await {
log::error!("S3 delete error: {:?}", err);
}
result
}
...
Функция принимает одно поле из multipart/form-data
запроса (обработка запроса будет рассмотрена ниже), определяет исходное имя файла и mime-тип (если эти данные отсутствуют, используется »file
» и »application/octet-stream
» в качестве значений по умолчанию). Затем данные поля превращаются в AsyncRead
с помощью библиотеки async-compat. При этом наш читатель потока по мере чтения потока вычисляет его SHA256 хеш (пригодится в будущем).
Теперь мы можем загрузить файл в S3-хранилище под временным именем »tmp/
» (UUID генерируется случайным образом). Если в этот момент возникает ошибка, функция возвращает код Internal Server Error.
Мы имеем файл в S3-хранилище и посчитанный SHA256 от его данных. Теперь можно переименовать файл в его окончательное имя (я хочу использовать SHA256 в качестве имени файла, чтобы одинаковые файлы не дублировались в хранилище). Для этого я беру HEX-представление SHA256 и приписываю расширение файла взятое из оригинального имени (если оно там было). Результат приводится к нижнему регистру (на случай если расширение файла было не в нижнем регистре) и далее мы выполняем копирование S3-объекта (так как API S3 не имеет функции переименования). Если копирование успешно, то у нас получается результирующий URL-файла.
Наконец, можно удалить временный объект из S3. Это происходит в любом случае — и если копирование было успешным, и если нет.
Последняя функция нашего сервиса — отдача файла по ссылке (теоретически это можно делегировать веб-серверу, но как минимум удобно иметь эту функцию при локальной разработке, как максимум нам может требоваться реализовать какую-нибудь дополнительную бизнес-логику вроде проверки прав доступа к файлу):
use axum::response::IntoResponse;
use axum::body::StreamBody;
use s3::error::S3Error;
...
pub async fn download(
&self,
filename: &str
) -> Result {
let stream = self.bucket.get_object_stream(filename)
.await
.map_err(|err| match err {
S3Error::HttpFailWithBody(status_code, body) => match status_code {
404 => StatusCode::NOT_FOUND,
_ => {
log::error!(
"S3 download HTTP error with code={} and body={:?}",
status_code,
body
);
StatusCode::INTERNAL_SERVER_ERROR
}
}
err => {
log::error!("S3 download error: {:?}", err);
StatusCode::INTERNAL_SERVER_ERROR
}
})?;
Ok(StreamBody::from(stream.bytes))
}
}
Здесь всё просто — получаем стрим S3-объекта, маппим ошибку отсутствия файла на 404-ую ошибку Axum, а остальные ошибки на 500-ую, возвращаем StreamBody
.
Остаётся реализовать обработчики самих эндпойнтов:
use axum::{Extension, Json};
use axum::extract::{Multipart, Path};
use axum::http::{HeaderMap, HeaderValue, StatusCode};
use axum::http::header::CACHE_CONTROL;
use axum::response::IntoResponse;
#[derive(Debug, serde::Serialize)]
pub struct UploadResponse {
pub url: String
}
pub async fn upload_file(
Extension(upload_service): Extension,
mut multipart: Multipart
) -> Result {
while let Some(field) = multipart.next_field().await.map_err(|_|
StatusCode::INTERNAL_SERVER_ERROR
)? {
if let Some("upload") = field.name() {
let url = upload_service.upload(field).await?;
return Ok(Json(UploadResponse { url }));
}
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn download_file(
Path(path): Path,
Extension(upload_service): Extension
) -> Result {
let body = upload_service.download(&path).await?;
let headers = HeaderMap::from_iter([
(CACHE_CONTROL, HeaderValue::from_str("max-age=31536000").unwrap()) // One year
]);
Ok((headers, body))
}
Обработчик загрузки загружает по одному файлу за раз, при этом имя поля файла в отправленной форме ожидается «upload». Обработчик скачки файла выставляет срок жизни файла в кеше один год, потому что изменения файла не предполагаются (если файл изменится, он будет иметь другой SHA256 и другое имя).
Последнее, что нам остаётся — создать роутер и запустить сервер:
use std::str::FromStr;
use axum::extract::DefaultBodyLimit;
#[tokio::main]
async fn main() -> Result<(), Box> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
let upload_service = UploadService::new();
let router = axum::Router::new()
.route("/uploads", axum::routing::post(upload_file))
.route("/uploads/*path", axum::routing::get(download_file))
.layer(Extension(upload_service))
.layer(DefaultBodyLimit::max(8 * 1024 * 1024));
let address = std::env::var("HOST").expect("Expected HOST environment variable");
let port = std::env::var("PORT").expect("Expected PORT environment variable")
.parse::().expect("PORT environment variable must be an integer");
log::info!("Listening on http://{}:{}/", address, port);
axum::Server::bind(
&std::net::SocketAddr::new(
std::net::IpAddr::from_str(&address).unwrap(),
port
)
).serve(router.into_make_service()).await?;
Ok(())
}
Экземляр UploadService
передаётся через Extension
(механизм DI в Axum), также может быть полезно задать DefaultBodyLimit
, потому что стандартное значение 1 МБ может подходить не для всех ситуаций. Хост и порт для прослушивания получаются из соответствующих переменных окружения.
Вероятно, нам также может требоваться добавить какую-нибудь проверку авторизации в эндпойнт загрузки (а, возможно, и скачки), но это зависит от конкретной функции конкретного сервиса.
Зависимости в Cargo.toml:
[package]
name = "uploader"
version = "0.1.0"
edition = "2021"
[dependencies]
log = "0.4.20"
env_logger = "0.10.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
axum = { version = "0.6.20", features = ["multipart"] }
serde = "1.0.188"
uuid = { version = "1.4.1", features = ["v4"] }
rust-s3 = "0.34.0-rc1"
futures = "0.3.28"
async-compat = "0.2.2"
async-hash = "0.5.1"
hex = "0.4.3"
В качестве бонуса пример браузерного кода на TypeScript выполняющего загрузку файла:
interface UploadResponse {
url: string;
}
async function uploadFile(file: Blob, filename?: string): Promise {
const data = new FormData();
data.append("upload", file, filename);
const response = await fetch("/uploads", {
method: "post",
body: data
});
if (response.status >= 200 && response.status <= 299) {
return await response.json();
} else {
return "error";
}
}
Наш сервис готов.
Полный исходный код проекта на GitHub
Обсудить в моём персональном блоге в Telegram