Асинхронная работа с файловой системой в Vert.x
Vert.x — это экосистема для создания реактивных приложений на JVM, которые могут масштабироваться и обрабатывать огромные объемы данных в реальном времени. Это полиглотная платформа, поддерживающая не только Java, но и Kotlin, Groovy, Scala, и еще js. В контексте статьи работать будем на java
Одна из фич vert.x — это набор абстракций и API для асинхронной работы с сетью, файловой системой и другими ресурсами. С его помощью можно легко создавать масштабируемые веб-приложения, микросервисы, сетевые утилиты и многое другое.
Файловая система
В Vert.x вся работа с файловой системой осуществляется через класс FileSystem
.
Для создания файла можно использовать метод createFile
, который асинхронно создает новый файл:
FileSystem fs = vertx.fileSystem();
fs.createFile("myfile.txt", result -> {
if (result.succeeded()) {
System.out.println("Файл успешно создан.");
} else {
System.err.println("Ошибка при создании файла: " + result.cause());
}
});
Аналогично, для удаления файла используется метод delete
:
fs.delete("myfile.txt", result -> {
if (result.succeeded()) {
System.out.println("Файл успешно удален.");
} else {
System.err.println("Ошибка при удалении файла: " + result.cause());
}
});
Для чтения содержимого файла Vert.x есть метод readFile
, который возвращает содержимое файла в виде объекта Buffer
, который может быть легко преобразован в строку или массив байтов:
fs.readFile("myfile.txt", result -> {
if (result.succeeded()) {
Buffer buffer = result.result();
System.out.println("Содержимое файла: " + buffer.toString());
} else {
System.err.println("Ошибка при чтении файла: " + result.cause());
}
});
Для записи в файл используйте метод writeFile
, который принимает путь к файлу и данные в виде Buffer
:
Buffer data = Buffer.buffer("Hello, Vert.x!");
fs.writeFile("myfile.txt", data, result -> {
if (result.succeeded()) {
System.out.println("Данные успешно записаны в файл.");
} else {
System.err.println("Ошибка при записи в файл: " + result.cause());
}
});
AsyncFile
AsyncFile
в Vert.x — это интерфейс для асинхронной работы с файлами. Он позволяет выполнять различные операции с файлами: чтение и запись, асинхронно, т.е. без блокирования потока, в котором выполняется операция.
Основные методы:
read(Buffer buffer, int offset, long position, int length, Handler
: асинхронно читает данные из файла в> handler) Buffer
.offset
указывает на позицию в буфере, куда будут записаны данные,position
— позиция в файле, с которой начнется чтение, аlength
— количество байтов для чтения.write(Buffer buffer, long position, Handler
: асинхронно записывает данные из> handler) Buffer
в файл, начиная с указаннойposition
.flush(Handler
: асинхронно сбрасывает данные из буфера в файл> handler) close(Handler
: асинхронно закрывает файл. После закрытия файла любые попытки его чтения или записи приведут к ошибке.> handler)
Чтения файла асинхронно может выглядеть так:
FileSystem fileSystem = vertx.fileSystem();
String filePath = "path/to/your/file.txt";
fileSystem.open(filePath, new OpenOptions(), openRes -> {
if (openRes.succeeded()) {
AsyncFile file = openRes.result();
Buffer buffer = Buffer.buffer(1024); // размер буфера для чтения
file.read(buffer, 0, 0, 1024, readRes -> {
if (readRes.succeeded()) {
System.out.println("File content: " + buffer.toString());
file.close(voidAsyncResult -> {});
} else {
System.err.println("Error reading file: " + readRes.cause().getMessage());
}
});
} else {
System.err.println("Could not open file: " + openRes.cause().getMessage());
}
});
Асинхронная запись файла:
Buffer data = Buffer.buffer("This is some data to be written to the file.");
fileSystem.open(filePath, new OpenOptions().setWrite(true), openRes -> {
if (openRes.succeeded()) {
AsyncFile file = openRes.result();
file.write(data, 0, writeRes -> {
if (writeRes.succeeded()) {
System.out.println("Data written successfully!");
file.flush(flushRes -> {
if (flushRes.succeeded()) {
System.out.println("Data flushed to disk.");
}
file.close(voidAsyncResult -> {});
});
} else {
System.err.println("Failed to write data: " + writeRes.cause().getMessage());
}
});
} else {
System.err.println("Could not open file: " + openRes.cause().getMessage());
}
});
Для открытия файла асинхронно используется метод open
класса FileSystem
. Метод возвращает Future
, который позволяет работать с файлом асинхронно:
FileSystem fs = vertx.fileSystem();
String path = "path/to/your/file.txt";
fs.open(path, new OpenOptions(), result -> {
if (result.succeeded()) {
AsyncFile file = result.result();
// можно работать с AsyncFile
} else {
// обработка ошибки
System.err.println("Error opening file: " + result.cause().getMessage());
}
});
Обработка результатов через Future
В Vert.x Future
используется для представления результата, который будет доступен позже. Он может успешно завершиться с результатом типа T
или завершиться с ошибкой.
Cоздадим простой Future
:
Future future = Future.future(promise -> {
// асинхронная операция
vertx.setTimer(1000, id -> promise.complete("Operation completed"));
});
Для обработки результатов Future
имеет методы onSuccess
, onFailure
, и compose
.
onSuccess
вызывается, если Future
успешно завершен, а onFailure
— в случае ошибки:
future.onSuccess(result -> {
System.out.println("Result: " + result);
}).onFailure(error -> {
System.err.println("Failed: " + error.getMessage());
});
compose
позволяет создать цепочку асинхронных операций, где каждая следующая операция начинается после успешного завершения предыдущей:
Future future1 = Future.future(promise -> {
vertx.setTimer(1000, id -> promise.complete("First operation"));
});
future1.compose(result -> {
// код выполнится после future1
return Future.future(promise -> vertx.setTimer(1000, id -> promise.complete(result + ", second operation")));
}).onSuccess(result -> System.out.println("Result: " + result));
Пример с чтением файла:
FileSystem fs = vertx.fileSystem();
String path = "path/to/your/file.txt";
Future readFileFuture = fs.readFile(path);
readFileFuture.onSuccess(buffer -> {
System.out.println("File content: " + buffer.toString());
}).onFailure(Throwable::printStackTrace);
Предположим, есть две асинхронные операции: первая читает файл, а вторая записывает содержимое в другой файл:
String readPath = "path/to/read.txt";
String writePath = "path/to/write.txt";
fs.readFile(readPath).compose(buffer -> {
// после успешного чтения файла записываем содержимое в другой файл
return fs.writeFile(writePath, buffer);
}).onSuccess(v -> System.out.println("File was successfully copied"))
.onFailure(Throwable::printStackTrace);
Future
можно использовать для композиции результатов нескольких асинхронных операций:
Future future1 = Future.succeededFuture("Hello");
Future future2 = Future.succeededFuture("World");
CompositeFuture.all(future1, future2).onSuccess(composite -> {
String result = composite.resultAt(0) + " " + composite.resultAt(1);
System.out.println(result); // "Hello World"
});
Pump и CompositeFuture
Pump
в Vert.x — это утилита, которая помогает передавать данные из ReadStream
в WriteStream
, автоматически управляя обратным давлением:
FileSystem fs = vertx.fileSystem();
String sourcePath = "path/to/source/file";
String destPath = "path/to/dest/file";
fs.open(sourcePath, new OpenOptions().setRead(true), readResult -> {
if (readResult.succeeded()) {
AsyncFile readFile = readResult.result();
fs.open(destPath, new OpenOptions().setWrite(true), writeResult -> {
if (writeResult.succeeded()) {
AsyncFile writeFile = writeResult.result();
Pump pump = Pump.pump(readFile, writeFile);
readFile.endHandler(v -> writeFile.close());
writeFile.endHandler(v -> readFile.close());
pump.start();
} else {
System.err.println("Failed to open destination file: " + writeResult.cause());
}
});
} else {
System.err.println("Failed to open source file: " + readResult.cause());
}
});
Pump.pump(readFile, writeFile)
создает экземпляр Pump
, который автоматически читает из readFile
и записывает в writeFile
, управляя обратным давлением между потоками
CompositeFuture
позволяет группировать несколько асинхронных операций и обрабатывать их результаты как единое целое:
FileSystem fs = vertx.fileSystem();
List futures = new ArrayList<>();
// путь к файлам, которые нужно прочитать
List filePaths = Arrays.asList("path/to/file1", "path/to/file2", "path/to/file3");
for (String path : filePaths) {
Future future = fs.readFile(path).future();
futures.add(future);
}
CompositeFuture.all(futures).onComplete(ar -> {
if (ar.succeeded()) {
for (int i = 0; i < filePaths.size(); i++) {
System.out.println("Content of " + filePaths.get(i) + ": " + ar.result().resultAt(i));
}
} else {
System.err.println("Failed to read one or more files: " + ar.cause());
}
});
CompositeFuture.all(futures)
используется для объединения нескольких асинхронных операций чтения файлов. Результаты каждой операции доступны через ar.result().resultAt(i)
, где i
— индекс операции в исходном списке.
Небольшой пример
Реализуем сервер, который будет поддерживать загрузку и скачивание файлов через HTTP:
Создаем новый Maven-проект и добавляем зависимость Vert.x Web в pom.xml
:
io.vertx
vertx-web
4.2.1
Создаем класс FileServerVerticle
, который расширяет AbstractVerticle
. В методе start
определяем маршруты для загрузки и скачивания файлов:
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.file.FileUpload;
public class FileServerVerticle extends AbstractVerticle {
@Override
public void start(Promise startPromise) throws Exception {
Router router = Router.router(vertx);
// директория для сохранения загруженных файлов
String uploadDir = "uploads";
// обработчик для загрузки файлов
router.route("/upload").handler(BodyHandler.create().setUploadsDirectory(uploadDir));
router.post("/upload").handler(this::handleFileUpload);
// обработчик для скачивания файлов
router.get("/download/:fileName").handler(this::handleFileDownload);
vertx.createHttpServer().requestHandler(router).listen(8888, http -> {
if (http.succeeded()) {
startPromise.complete();
System.out.println("HTTP server started on port 8888");
} else {
startPromise.fail(http.cause());
}
});
}
private void handleFileUpload(RoutingContext context) {
for (FileUpload fileUpload : context.fileUploads()) {
System.out.println("Received file: " + fileUpload.fileName());
// логика обработки файла
}
context.response().setStatusCode(200).end("File uploaded");
}
private void handleFileDownload(RoutingContext context) {
String fileName = context.pathParam("fileName");
String fileLocation = "uploads/" + fileName;
context.response().sendFile(fileLocation, result -> {
if (result.failed()) {
context.response().setStatusCode(404).end("File not found");
}
});
}
}
Создаем главный класс для запуска вертикала:
import io.vertx.core.Vertx;
public class Main {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new FileServerVerticle(), res -> {
if (res.succeeded()) {
System.out.println("FileServerVerticle deployed successfully.");
} else {
System.err.println("Failed to deploy FileServerVerticle: " + res.cause());
}
});
}
}
Запускаем сервер с помощью curl:
curl -F "file=@path/to/your/file.txt" http://localhost:8888/upload
В преддверии старта специализации Java-разработчик хочу порекомендовать вам несколько бесплатных вебинаров по следующим темам: