[Из песочницы] Потоковая передача данных из REST сервиса в MQ очередь

Привет, Хабр!

В статье я опишу способ разработки REST сервиса, позволяющего принимать файлы и сохранять их в систему обмена сообщениями в потоковом режиме без необходимости хранения всего файла на стороне сервиса. Также будет описан обратный сценарий, при котором клиент будет получать в качестве ответа файл, размещенный в системе обмена сообщениями.

Для наглядности я приведу примеры кода разработанного сервиса на JEE7 под сервер приложений IBM WebSphere Liberty Server, а в качестве системы обмена сообщениями будет выступать IBM MQ.
Тем не менее, описанный метод подходит и для других аналогичных платформ, т.е. в качестве системы обмена сообщений может выступать любой поставщик JMS API, а в качестве сервера приложений любой JEE сервер (например, Apache Tomcat).

Постановка задачи


Возникла потребность в реализации решения, которое бы позволяло как получать от клиента файлы большого размера (> 100 Mb) и передавать их в другую территориально удаленную систему, так и в обратную сторону — передавать клиенту в качестве ответа файлы из этой системы. В виду ненадежного сетевого канала между сетью клиента и сетью приложения используется система обмена сообщениями, обеспечивающая гарантированную доставку между ними.

Верхнеуровневое решение включает в себя три компонента:

  1. REST сервис — задача которого предоставить клиенту возможность передать файл (или запросить).
  2. MQ — отвечает за передачу сообщений между различными сетями.
  3. Application — приложение, отвечающее за хранение файлов и выдачу их по запросу.


image


В этой статье я описываю способ реализации REST сервиса, в задачи которого входит:

  • Получение файла от клиента.
  • Передача полученного файла в MQ.
  • Передача файла из MQ клиенту в качестве ответа.


Метод решения


В виду большого размера передаваемого файла отсутствует возможность размещения его полностью в оперативной памяти, более того, со стороны MQ также накладывается ограничение — максимальный размер одного сообщения в MQ не может превышать 100 Mb. Таким образом мое решение будет основываться на следующих принципах:

  • Получение файла и сохранение его в MQ очереди должно выполняться в потоковом режиме, без помещения в память полностью файла.
  • В очереди MQ файл будет размещаться в виде набора небольших сообщений.


Графически размещение файла на стороне клиента, REST сервиса и MQ показано ниже:

image


На стороне клиента файл полностью размещается на файловой системе, в REST-сервисе в оперативной памяти хранится лишь порция файла, а на стороне MQ — каждая порция файла размещается в виде отдельного сообщения.

Разработка REST сервиса


Для наглядности предлагаемого метода решения будет разработан демонстрационный REST сервис, содержащий два метода:

  • upload — получает от клиента файл и записывает его в MQ очередь, в качестве ответа возвращает идентификатор группы сообщений (в base64 формате).
  • download — получает от клиента идентификатор группы сообщений (в base64 формате) и возвращает файл, хранящийся в MQ очереди.


Метод получения файла от клиента (upload)


В задачу метода входит получение потока входящего файла и последующая запись его в MQ очередь.

Получение потока входящего файла


Для получения входящего файла от клиента, метод ожидает в качестве входящего параметра объект с интерфейсом com.ibm.websphere.jaxrs20.multipart.IMultipartBody, который предоставляет возможность получить ссылку на поток входящего файла

@PUT
@Path("upload")
public Response upload(IMultipartBody body) {
        ...
        IAttachment attachment = body.getAttachment("file");
        InputStream inputStream = attachment.getDataHandler().getInputStream();
        ...
}


Данный интерфейс (IMultipartBody) находится в JAR-архиве com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, входит в поставку к IBM Liberty Server и размещается в папке: WLP_INSTALLATION_PATH>/dev/api/ibm.

Примечание:

  • WLP_INSTALLATION_PATH — путь к директории WebSphere Liberty Profile.
  • Ожидается, что клиент будет передавать файл в параметре с именем «file».
  • Если используется другой сервер приложений, то можно воспользоваться альтернативной библиотекой от Apache CXF.


Потоковое сохранение файла в MQ


Метод получает на вход поток входящего файла, название MQ очереди, куда следует записать файл, и идентификатор группы сообщений, который будут использоваться для связывания сообщений. Идентификатор группы генерируется на стороне сервиса, например, утилитой org.apache.commons.lang3.RandomStringUtils:

String groupId = RandomStringUtils.randomAscii(24);


Алгоритм сохранения входящего файла в MQ состоит из следующих этапов:

  1. Инициализация объектов подключения к MQ.
  2. Цикличное считывание порции входящего файла пока файл не будет полностью считан:
    1. Порция данных файла записывается в виде отдельного сообщения в MQ.
    2. Каждое сообщение файла имеет свой порядковый номер (свойство «JMSXGroupSeq»).
    3. Все сообщения файла имеет одинаковое значение группы (свойство «JMSXGroupID»).
    4. Последнее сообщение имеет признак, означающий, что это сообщение является завершающим (свойство «JMS_IBM_Last_Msg_In_Group»).
    5. Константа SEGMENT_SIZE содержит размер порции. Например, 1Mb.
public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException {
        try (
                Connection connection = connectionFactory.createConnection();
                Session session = connection.createSession();
                MessageProducer producer = session.createProducer(session.createQueue(queueName));
        ) {
                byte[] buffer = new byte[SEGMENT_SIZE];
                BytesMessage message = null;
                for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) {
                        readBytesSize = inputStream.read(buffer);
                        if (message != null) {
                                if (readBytesSize < 1) {
                                        message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
                                }       producer.send(message);
                        }
                        if (readBytesSize > 0) {
                                message = session.createBytesMessage();
                                message.setStringProperty("JMSXGroupID", groupId);
                                message.setIntProperty("JMSXGroupSeq", sequenceNumber);
                                if (readBytesSize == SEGMENT_SIZE) {
                                        message.writeBytes(buffer);
                                } else {
                                        message.writeBytes(Arrays.copyOf(buffer, readBytesSize));
                                }
                        }
                }
        }
}


Метод отправки файла клиенту (download)


Метод получает идентификатор группы сообщений в формате base64, по которому считывает сообщения из MQ очереди и отправляет в качестве ответа в потоковом режиме.

Получение идентификатора группы сообщений


В качестве входящего параметра метод получает идентификатор группы сообщений.

@PUT
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
        ...
}


Потоковая передача ответа клиенту


Для передачи клиенту файла, хранящемуся в виде набора отдельных сообщений в MQ, в потоковом режиме следует создать класс с интерфейсом javax.ws.rs.core.StreamingOutput:

public class MQStreamingOutput implements StreamingOutput {

        private String groupId;
        private String queueName;
        
        public MQStreamingOutput(String groupId, String queueName) {
                super();
                this.groupId = groupId;
                this.queueName = queueName;
        }

        @Override
        public void write(OutputStream outputStream) throws IOException, WebApplicationException {
                try {
                        MQWorker().read(outputStream, queueName, groupId);
                } catch(NamingException | JMSException e) {
                        e.printStackTrace();
                        new IOException(e);
                } finally {
                        outputStream.flush();
                        outputStream.close();
                }

        }
}


В классе реализуем метод write, который получает на вход ссылку на исходящий поток, в который будут записываться сообщения из MQ. Я добавил в класс еще название очереди и идентификатор группы, сообщения которой будут считываться.

Объект этого класса будет передан в качестве параметра для создания ответа клиенту:

@GET
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
        ResponseBuilder responseBuilder = null;
        try {
                MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME);
                responseBuilder = Response.ok(streamingOutput); 
        } catch(Exception e) {
                e.printStackTrace();
        responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage());
        }
        return responseBuilder.build();
}


Потоковое считывание файла из MQ


Алгоритм считывания сообщений из MQ в исходящий поток состоит из следующих этапов:

  1. Инициализация объектов подключения к MQ.
  2. Цикличное считывание сообщений из MQ пока не будет считано сообщение с признаком завершающего в группе (свойство «JMS_IBM_Last_Msg_In_Group»):
    1. Перед каждым считыванием сообщения из очереди устанавливается фильтр (messageSelector), в котором задается идентификатор группы сообщений и порядковый номер сообщения в группе.
    2. Содержимое считанного сообщения записывается в исходящий поток.

public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException {
        try(
                Connection connection = connectionFactory.createConnection();
                Session session = connection.createSession();
        ) {
                connection.start();
                Queue queue = session.createQueue(queueName);
                int sequenceNumber = 1;
                for(boolean isMessageExist = true; isMessageExist == true; ) {
                        String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++;
                        try(
                                MessageConsumer consumer = session.createConsumer(queue, messageSelector);
                                        ) {
                                BytesMessage message = (BytesMessage) consumer.receiveNoWait();
                                if (message == null) {
                                        isMessageExist = false;
                                } else {
                                        byte[] buffer = new byte[(int) message.getBodyLength()];
                                        message.readBytes(buffer);
                                        outputStream.write(buffer);
                                        if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) {
                                                isMessageExist = false;
                                        }
                                }
                        }
                }
        }
}


Вызов REST сервиса


Для проверки работы сервиса я воспользуюсь инструментом curl.

Отправка файла

curl -X PUT -F file=@<путь_к_файлу> http://localhost:9080/Demo/rest/service/upload


В ответ будет получена base64 строка, содержащая идентификатор группы сообщений, которую мы укажем в следующем методе для получения файла.

Получение файла

curl -X GET http://localhost:9080/Demo/rest/service/download?groupId= -o <путь_к_файлу_куда_запишется_ответ>


Заключение


В статье был рассмотрен подход к разработке REST сервиса, позволяющему в потоковом режиме как получать и сохранять большие данные в очередь системы обмена сообщениями, так и считывать их из очереди для возвращения в виде ответа. Такой способ позволяет сократить использование ресурсов, и тем самым увеличить пропускную способность решения.

Дополнительные материалы


Подробнее об интерфейсе IMultipartBody, используемый для получения входящего потока файла — ссылка.

Альтернативная библиотека для получения файлов в потоковом режиме в REST сервисах — Apache CXF.

Интерфейс StreamingOutput для потокового возвращения REST ответа клиенту — ссылка.

© Habrahabr.ru