[Из песочницы] Потоковая передача данных из REST сервиса в MQ очередь
Привет, Хабр!
В статье я опишу способ разработки REST сервиса, позволяющего принимать файлы и сохранять их в систему обмена сообщениями в потоковом режиме без необходимости хранения всего файла на стороне сервиса. Также будет описан обратный сценарий, при котором клиент будет получать в качестве ответа файл, размещенный в системе обмена сообщениями.
Для наглядности я приведу примеры кода разработанного сервиса на JEE7 под сервер приложений IBM WebSphere Liberty Server, а в качестве системы обмена сообщениями будет выступать IBM MQ.
Тем не менее, описанный метод подходит и для других аналогичных платформ, т.е. в качестве системы обмена сообщений может выступать любой поставщик JMS API, а в качестве сервера приложений любой JEE сервер (например, Apache Tomcat).
Постановка задачи
Возникла потребность в реализации решения, которое бы позволяло как получать от клиента файлы большого размера (> 100 Mb) и передавать их в другую территориально удаленную систему, так и в обратную сторону — передавать клиенту в качестве ответа файлы из этой системы. В виду ненадежного сетевого канала между сетью клиента и сетью приложения используется система обмена сообщениями, обеспечивающая гарантированную доставку между ними.
Верхнеуровневое решение включает в себя три компонента:
- REST сервис — задача которого предоставить клиенту возможность передать файл (или запросить).
- MQ — отвечает за передачу сообщений между различными сетями.
- Application — приложение, отвечающее за хранение файлов и выдачу их по запросу.
В этой статье я описываю способ реализации REST сервиса, в задачи которого входит:
- Получение файла от клиента.
- Передача полученного файла в MQ.
- Передача файла из MQ клиенту в качестве ответа.
Метод решения
В виду большого размера передаваемого файла отсутствует возможность размещения его полностью в оперативной памяти, более того, со стороны MQ также накладывается ограничение — максимальный размер одного сообщения в MQ не может превышать 100 Mb. Таким образом мое решение будет основываться на следующих принципах:
- Получение файла и сохранение его в MQ очереди должно выполняться в потоковом режиме, без помещения в память полностью файла.
- В очереди MQ файл будет размещаться в виде набора небольших сообщений.
Графически размещение файла на стороне клиента, REST сервиса и MQ показано ниже:
На стороне клиента файл полностью размещается на файловой системе, в REST-сервисе в оперативной памяти хранится лишь порция файла, а на стороне MQ — каждая порция файла размещается в виде отдельного сообщения.
Разработка REST сервиса
Для наглядности предлагаемого метода решения будет разработан демонстрационный REST сервис, содержащий два метода:
- upload — получает от клиента файл и записывает его в MQ очередь, в качестве ответа возвращает идентификатор группы сообщений (в base64 формате).
- download — получает от клиента идентификатор группы сообщений (в base64 формате) и возвращает файл, хранящийся в MQ очереди.
Метод получения файла от клиента (upload)
В задачу метода входит получение потока входящего файла и последующая запись его в MQ очередь.
Получение потока входящего файла
Для получения входящего файла от клиента, метод ожидает в качестве входящего параметра объект с интерфейсом com.ibm.websphere.jaxrs20.multipart.IMultipartBody, который предоставляет возможность получить ссылку на поток входящего файла
@PUT
@Path("upload")
public Response upload(IMultipartBody body) {
...
IAttachment attachment = body.getAttachment("file");
InputStream inputStream = attachment.getDataHandler().getInputStream();
...
}
Данный интерфейс (IMultipartBody) находится в JAR-архиве com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, входит в поставку к IBM Liberty Server и размещается в папке: WLP_INSTALLATION_PATH>/dev/api/ibm.
Примечание:
- WLP_INSTALLATION_PATH — путь к директории WebSphere Liberty Profile.
- Ожидается, что клиент будет передавать файл в параметре с именем «file».
- Если используется другой сервер приложений, то можно воспользоваться альтернативной библиотекой от Apache CXF.
Потоковое сохранение файла в MQ
Метод получает на вход поток входящего файла, название MQ очереди, куда следует записать файл, и идентификатор группы сообщений, который будут использоваться для связывания сообщений. Идентификатор группы генерируется на стороне сервиса, например, утилитой org.apache.commons.lang3.RandomStringUtils:
String groupId = RandomStringUtils.randomAscii(24);
Алгоритм сохранения входящего файла в MQ состоит из следующих этапов:
- Инициализация объектов подключения к MQ.
- Цикличное считывание порции входящего файла пока файл не будет полностью считан:
- Порция данных файла записывается в виде отдельного сообщения в MQ.
- Каждое сообщение файла имеет свой порядковый номер (свойство «JMSXGroupSeq»).
- Все сообщения файла имеет одинаковое значение группы (свойство «JMSXGroupID»).
- Последнее сообщение имеет признак, означающий, что это сообщение является завершающим (свойство «JMS_IBM_Last_Msg_In_Group»).
- Константа SEGMENT_SIZE содержит размер порции. Например, 1Mb.
public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException {
try (
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue(queueName));
) {
byte[] buffer = new byte[SEGMENT_SIZE];
BytesMessage message = null;
for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) {
readBytesSize = inputStream.read(buffer);
if (message != null) {
if (readBytesSize < 1) {
message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
} producer.send(message);
}
if (readBytesSize > 0) {
message = session.createBytesMessage();
message.setStringProperty("JMSXGroupID", groupId);
message.setIntProperty("JMSXGroupSeq", sequenceNumber);
if (readBytesSize == SEGMENT_SIZE) {
message.writeBytes(buffer);
} else {
message.writeBytes(Arrays.copyOf(buffer, readBytesSize));
}
}
}
}
}
Метод отправки файла клиенту (download)
Метод получает идентификатор группы сообщений в формате base64, по которому считывает сообщения из MQ очереди и отправляет в качестве ответа в потоковом режиме.
Получение идентификатора группы сообщений
В качестве входящего параметра метод получает идентификатор группы сообщений.
@PUT
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
...
}
Потоковая передача ответа клиенту
Для передачи клиенту файла, хранящемуся в виде набора отдельных сообщений в MQ, в потоковом режиме следует создать класс с интерфейсом javax.ws.rs.core.StreamingOutput:
public class MQStreamingOutput implements StreamingOutput {
private String groupId;
private String queueName;
public MQStreamingOutput(String groupId, String queueName) {
super();
this.groupId = groupId;
this.queueName = queueName;
}
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException {
try {
MQWorker().read(outputStream, queueName, groupId);
} catch(NamingException | JMSException e) {
e.printStackTrace();
new IOException(e);
} finally {
outputStream.flush();
outputStream.close();
}
}
}
В классе реализуем метод write, который получает на вход ссылку на исходящий поток, в который будут записываться сообщения из MQ. Я добавил в класс еще название очереди и идентификатор группы, сообщения которой будут считываться.
Объект этого класса будет передан в качестве параметра для создания ответа клиенту:
@GET
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
ResponseBuilder responseBuilder = null;
try {
MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME);
responseBuilder = Response.ok(streamingOutput);
} catch(Exception e) {
e.printStackTrace();
responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage());
}
return responseBuilder.build();
}
Потоковое считывание файла из MQ
Алгоритм считывания сообщений из MQ в исходящий поток состоит из следующих этапов:
- Инициализация объектов подключения к MQ.
- Цикличное считывание сообщений из MQ пока не будет считано сообщение с признаком завершающего в группе (свойство «JMS_IBM_Last_Msg_In_Group»):
- Перед каждым считыванием сообщения из очереди устанавливается фильтр (messageSelector), в котором задается идентификатор группы сообщений и порядковый номер сообщения в группе.
- Содержимое считанного сообщения записывается в исходящий поток.
public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException {
try(
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
) {
connection.start();
Queue queue = session.createQueue(queueName);
int sequenceNumber = 1;
for(boolean isMessageExist = true; isMessageExist == true; ) {
String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++;
try(
MessageConsumer consumer = session.createConsumer(queue, messageSelector);
) {
BytesMessage message = (BytesMessage) consumer.receiveNoWait();
if (message == null) {
isMessageExist = false;
} else {
byte[] buffer = new byte[(int) message.getBodyLength()];
message.readBytes(buffer);
outputStream.write(buffer);
if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) {
isMessageExist = false;
}
}
}
}
}
}
Вызов REST сервиса
Для проверки работы сервиса я воспользуюсь инструментом curl.
Отправка файла
curl -X PUT -F file=@<путь_к_файлу> http://localhost:9080/Demo/rest/service/upload
В ответ будет получена base64 строка, содержащая идентификатор группы сообщений, которую мы укажем в следующем методе для получения файла.
Получение файла
curl -X GET http://localhost:9080/Demo/rest/service/download?groupId= -o <путь_к_файлу_куда_запишется_ответ>
Заключение
В статье был рассмотрен подход к разработке REST сервиса, позволяющему в потоковом режиме как получать и сохранять большие данные в очередь системы обмена сообщениями, так и считывать их из очереди для возвращения в виде ответа. Такой способ позволяет сократить использование ресурсов, и тем самым увеличить пропускную способность решения.
Дополнительные материалы
Подробнее об интерфейсе IMultipartBody, используемый для получения входящего потока файла — ссылка.
Альтернативная библиотека для получения файлов в потоковом режиме в REST сервисах — Apache CXF.
Интерфейс StreamingOutput для потокового возвращения REST ответа клиенту — ссылка.