Асинхронная работа с файловой системой в Vert.x

4cf47fbb78d66f079a070b678e06fc90.png

Vert.x — это экосистема для создания реактивных приложений на JVM, которые могут масштабироваться и обрабатывать огромные объемы данных в реальном времени. Это полиглотная платформа, поддерживающая не только Java, но и Kotlin, Groovy, Scala, и еще js. В контексте статьи работать будем на java

Одна из фич vert.x — это набор абстракций и API для асинхронной работы с сетью, файловой системой и другими ресурсами. С его помощью можно легко создавать масштабируемые веб-приложения, микросервисы, сетевые утилиты и многое другое.

Файловая система

В Vert.x вся работа с файловой системой осуществляется через класс FileSystem.

Для создания файла можно использовать метод createFile, который асинхронно создает новый файл:

FileSystem fs = vertx.fileSystem();
fs.createFile("myfile.txt", result -> {
  if (result.succeeded()) {
    System.out.println("Файл успешно создан.");
  } else {
    System.err.println("Ошибка при создании файла: " + result.cause());
  }
});

Аналогично, для удаления файла используется метод delete:

fs.delete("myfile.txt", result -> {
  if (result.succeeded()) {
    System.out.println("Файл успешно удален.");
  } else {
    System.err.println("Ошибка при удалении файла: " + result.cause());
  }
});

Для чтения содержимого файла Vert.x есть метод readFile, который возвращает содержимое файла в виде объекта Buffer, который может быть легко преобразован в строку или массив байтов:

fs.readFile("myfile.txt", result -> {
  if (result.succeeded()) {
    Buffer buffer = result.result();
    System.out.println("Содержимое файла: " + buffer.toString());
  } else {
    System.err.println("Ошибка при чтении файла: " + result.cause());
  }
});

Для записи в файл используйте метод writeFile, который принимает путь к файлу и данные в виде Buffer:

Buffer data = Buffer.buffer("Hello, Vert.x!");
fs.writeFile("myfile.txt", data, result -> {
  if (result.succeeded()) {
    System.out.println("Данные успешно записаны в файл.");
  } else {
    System.err.println("Ошибка при записи в файл: " + result.cause());
  }
});

AsyncFile

AsyncFile в Vert.x — это интерфейс для асинхронной работы с файлами. Он позволяет выполнять различные операции с файлами: чтение и запись, асинхронно, т.е. без блокирования потока, в котором выполняется операция.

Основные методы:

  • read(Buffer buffer, int offset, long position, int length, Handler> handler): асинхронно читает данные из файла в Buffer. offset указывает на позицию в буфере, куда будут записаны данные, position — позиция в файле, с которой начнется чтение, а length — количество байтов для чтения.

  • write(Buffer buffer, long position, Handler> handler): асинхронно записывает данные из Buffer в файл, начиная с указанной position.

  • flush(Handler> handler): асинхронно сбрасывает данные из буфера в файл

  • close(Handler> handler): асинхронно закрывает файл. После закрытия файла любые попытки его чтения или записи приведут к ошибке.

Чтения файла асинхронно может выглядеть так:

FileSystem fileSystem = vertx.fileSystem();
String filePath = "path/to/your/file.txt";

fileSystem.open(filePath, new OpenOptions(), openRes -> {
  if (openRes.succeeded()) {
    AsyncFile file = openRes.result();
    Buffer buffer = Buffer.buffer(1024); // размер буфера для чтения
    file.read(buffer, 0, 0, 1024, readRes -> {
      if (readRes.succeeded()) {
        System.out.println("File content: " + buffer.toString());
        file.close(voidAsyncResult -> {});
      } else {
        System.err.println("Error reading file: " + readRes.cause().getMessage());
      }
    });
  } else {
    System.err.println("Could not open file: " + openRes.cause().getMessage());
  }
});

Асинхронная запись файла:

Buffer data = Buffer.buffer("This is some data to be written to the file.");
fileSystem.open(filePath, new OpenOptions().setWrite(true), openRes -> {
  if (openRes.succeeded()) {
    AsyncFile file = openRes.result();
    file.write(data, 0, writeRes -> {
      if (writeRes.succeeded()) {
        System.out.println("Data written successfully!");
        file.flush(flushRes -> {
          if (flushRes.succeeded()) {
            System.out.println("Data flushed to disk.");
          }
          file.close(voidAsyncResult -> {});
        });
      } else {
        System.err.println("Failed to write data: " + writeRes.cause().getMessage());
      }
    });
  } else {
    System.err.println("Could not open file: " + openRes.cause().getMessage());
  }
});

Для открытия файла асинхронно используется метод open класса FileSystem. Метод возвращает Future, который позволяет работать с файлом асинхронно:

FileSystem fs = vertx.fileSystem();
String path = "path/to/your/file.txt";

fs.open(path, new OpenOptions(), result -> {
  if (result.succeeded()) {
    AsyncFile file = result.result();
    // можно работать с AsyncFile
  } else {
    // обработка ошибки
    System.err.println("Error opening file: " + result.cause().getMessage());
  }
});

Обработка результатов через Future

В Vert.x Future используется для представления результата, который будет доступен позже. Он может успешно завершиться с результатом типа T или завершиться с ошибкой.

Cоздадим простой Future:

Future future = Future.future(promise -> {
  // асинхронная операция
  vertx.setTimer(1000, id -> promise.complete("Operation completed"));
});

Для обработки результатов Future имеет методы onSuccess, onFailure, и compose.

onSuccess вызывается, если Future успешно завершен, а onFailure — в случае ошибки:

future.onSuccess(result -> {
  System.out.println("Result: " + result);
}).onFailure(error -> {
  System.err.println("Failed: " + error.getMessage());
});

compose позволяет создать цепочку асинхронных операций, где каждая следующая операция начинается после успешного завершения предыдущей:

Future future1 = Future.future(promise -> {
  vertx.setTimer(1000, id -> promise.complete("First operation"));
});

future1.compose(result -> {
  // код выполнится после future1
  return Future.future(promise -> vertx.setTimer(1000, id -> promise.complete(result + ", second operation")));
}).onSuccess(result -> System.out.println("Result: " + result));

Пример с чтением файла:

FileSystem fs = vertx.fileSystem();
String path = "path/to/your/file.txt";

Future readFileFuture = fs.readFile(path);

readFileFuture.onSuccess(buffer -> {
  System.out.println("File content: " + buffer.toString());
}).onFailure(Throwable::printStackTrace);

Предположим, есть две асинхронные операции: первая читает файл, а вторая записывает содержимое в другой файл:

String readPath = "path/to/read.txt";
String writePath = "path/to/write.txt";

fs.readFile(readPath).compose(buffer -> {
  // после успешного чтения файла записываем содержимое в другой файл
  return fs.writeFile(writePath, buffer);
}).onSuccess(v -> System.out.println("File was successfully copied"))
  .onFailure(Throwable::printStackTrace);

Future можно использовать для композиции результатов нескольких асинхронных операций:

Future future1 = Future.succeededFuture("Hello");
Future future2 = Future.succeededFuture("World");

CompositeFuture.all(future1, future2).onSuccess(composite -> {
  String result = composite.resultAt(0) + " " + composite.resultAt(1);
  System.out.println(result); // "Hello World"
});

Pump и CompositeFuture

Pump в Vert.x — это утилита, которая помогает передавать данные из ReadStream в WriteStream, автоматически управляя обратным давлением:

FileSystem fs = vertx.fileSystem();
String sourcePath = "path/to/source/file";
String destPath = "path/to/dest/file";

fs.open(sourcePath, new OpenOptions().setRead(true), readResult -> {
  if (readResult.succeeded()) {
    AsyncFile readFile = readResult.result();
    fs.open(destPath, new OpenOptions().setWrite(true), writeResult -> {
      if (writeResult.succeeded()) {
        AsyncFile writeFile = writeResult.result();
        Pump pump = Pump.pump(readFile, writeFile);
        readFile.endHandler(v -> writeFile.close());
        writeFile.endHandler(v -> readFile.close());
        pump.start();
      } else {
        System.err.println("Failed to open destination file: " + writeResult.cause());
      }
    });
  } else {
    System.err.println("Failed to open source file: " + readResult.cause());
  }
});

Pump.pump(readFile, writeFile) создает экземпляр Pump, который автоматически читает из readFile и записывает в writeFile, управляя обратным давлением между потоками

CompositeFuture позволяет группировать несколько асинхронных операций и обрабатывать их результаты как единое целое:

FileSystem fs = vertx.fileSystem();
List futures = new ArrayList<>();

// путь к файлам, которые нужно прочитать
List filePaths = Arrays.asList("path/to/file1", "path/to/file2", "path/to/file3");

for (String path : filePaths) {
  Future future = fs.readFile(path).future();
  futures.add(future);
}

CompositeFuture.all(futures).onComplete(ar -> {
  if (ar.succeeded()) {
    for (int i = 0; i < filePaths.size(); i++) {
      System.out.println("Content of " + filePaths.get(i) + ": " + ar.result().resultAt(i));
    }
  } else {
    System.err.println("Failed to read one or more files: " + ar.cause());
  }
});

CompositeFuture.all(futures) используется для объединения нескольких асинхронных операций чтения файлов. Результаты каждой операции доступны через ar.result().resultAt(i), где i — индекс операции в исходном списке.

Небольшой пример

Реализуем сервер, который будет поддерживать загрузку и скачивание файлов через HTTP:

Создаем новый Maven-проект и добавляем зависимость Vert.x Web в pom.xml:


    
        io.vertx
        vertx-web
        4.2.1 
    

Создаем класс FileServerVerticle, который расширяет AbstractVerticle. В методе start определяем маршруты для загрузки и скачивания файлов:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.file.FileUpload;

public class FileServerVerticle extends AbstractVerticle {

  @Override
  public void start(Promise startPromise) throws Exception {
    Router router = Router.router(vertx);

    // директория для сохранения загруженных файлов
    String uploadDir = "uploads";

    // обработчик для загрузки файлов
    router.route("/upload").handler(BodyHandler.create().setUploadsDirectory(uploadDir));
    router.post("/upload").handler(this::handleFileUpload);

    // обработчик для скачивания файлов
    router.get("/download/:fileName").handler(this::handleFileDownload);

    vertx.createHttpServer().requestHandler(router).listen(8888, http -> {
      if (http.succeeded()) {
        startPromise.complete();
        System.out.println("HTTP server started on port 8888");
      } else {
        startPromise.fail(http.cause());
      }
    });
  }

  private void handleFileUpload(RoutingContext context) {
    for (FileUpload fileUpload : context.fileUploads()) {
      System.out.println("Received file: " + fileUpload.fileName());
      // логика обработки файла
    }
    context.response().setStatusCode(200).end("File uploaded");
  }

  private void handleFileDownload(RoutingContext context) {
    String fileName = context.pathParam("fileName");
    String fileLocation = "uploads/" + fileName;
    context.response().sendFile(fileLocation, result -> {
      if (result.failed()) {
        context.response().setStatusCode(404).end("File not found");
      }
    });
  }
}

Создаем главный класс для запуска вертикала:

import io.vertx.core.Vertx;

public class Main {

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    vertx.deployVerticle(new FileServerVerticle(), res -> {
      if (res.succeeded()) {
        System.out.println("FileServerVerticle deployed successfully.");
      } else {
        System.err.println("Failed to deploy FileServerVerticle: " + res.cause());
      }
    });
  }
}

Запускаем сервер с помощью curl:

curl -F "file=@path/to/your/file.txt" http://localhost:8888/upload

В преддверии старта специализации Java-разработчик хочу порекомендовать вам несколько бесплатных вебинаров по следующим темам:

© Habrahabr.ru