Создание компонента Apache Camel
Приветствую, сообщество!
Меня зовут Александр, я java разработчик в компании БФТ-Холдинг. Тружусь я на проекте СМЭВ-адаптера, где мы занимаемся транзитивной обработкой сообщений. В нашу зону ответственности входит несколько микросервисов, которые обрабатывают очень много сообщений, почти ничего не пишут в БД, но часто обращаются в сторонние прикладные сервисы.
Для отслеживания пути сообщения через наши микросервисы мы используем Zipkin. Помимо этого в проекте задействован Apache Camel, с помощью которого мы выстраиваем цепочку обработки сообщения в одном конкретном микросервисе. Стандартные средства для работы с Zipkin обычно позволяют легко добавить к трассе вход, выход в сервис и запись в БД, но,
т.к. к нас не совсем стандартное поведение у сервисов, нам хотелось выделять в Zipkin и обращение в сторонние сервисы.
Хотелось эту логику как-то элегантно встроить в роут Camel, но существующие средства такой возможности не предоставляли.
Было принято решение написать свой компонент для Apache Camel. Делали мы это впервые и, к сожалению, полноценного гайда в интернетах найти не удалось…
Встречайте! Гайд по написанию собственного Camel-компонента!
Основные составляющие camel-компонента:
* Component — отвечает за создание Endpointa и является входной точкой в ваш компонент.
* Endpoint — отвечает за создание Producer и Consumer. Также хранит в себе параметры из урла.
* Producer — принимает запросы к вашему компоненту (`to («ref»)`).
* Consumer — отправляет сообщения для слушателей вашего компонента (`from («ref»)`).
Теперь подробнее про каждого.
Component
Класс компонента необходимо аннотировать @Component
и передать имя вашего camel-компонента. Также отнаследовать его от абстрактного класса DefaultComponent
и переопределить метод createEndpoint
. Как можно было догадаться этот метод отвечает за создание Endpoint и важно позаботиться о том, чтобы все необходимые зависимости попали в него, если вы не желаете их получать потом обходными путями. Кроме этого, в этом методе определяются параметры из урла.
@Component("zipkintrace")
public class ZipkinTraceComponent extends DefaultComponent {
// Зависимости
private final ZipkinTraceProperties zipkinTraceProperties;
private final ZipkinTraceCache zipkinTraceCache;
public ZipkinTraceComponent(
CamelContext context, ZipkinService zipkinService, ZipkinTraceProperties zipkinTraceProperties,
ZipkinTraceCache zipkinTraceCache
) {
super(context);
this.zipkinService = zipkinService;
this.zipkinTraceProperties = zipkinTraceProperties;
this.zipkinTraceCache = zipkinTraceCache;
}
@Override
protected ZipkinTraceEndpoint createEndpoint(
String uri, String remaining, Map parameters
) throws Exception {
ZipkinTraceEndpoint endpoint = new ZipkinTraceEndpoint(
uri, this, zipkinTraceProperties, zipkinTraceCache
);
// Сохранения параметров из урла
setProperties(endpoint, parameters);
endpoint.setAction(remaining);
return endpoint;
}
}
Endpoint
Этот класс поинтереснее. Тут описывается вся необходимая информация для Apache Camel.
Прежде всего аннотируем его @UriEndpoint
. Аннотация принимает множество параметров, описание которых вы найдёте в javaDoc её файла.
Если вы не хотите полностью настраивать Endpoint для сamel, наследуемся от DefaultEndpoint
и имплементируем AsyncEndpoint
, чтобы дать понять фреймворку, что Endpoint поддерживает асинхронную обработку сообщений.
В полях класса определяем все возможные параметры, которые можно передать в урле и помечаем их соответствующими аннотациями.
Важно! У каждого такого поля должен быть геттер и сеттер с описанным JavaDoc для них. Иначе camel-компонент не собрать.
В этом же классе переопределяем методы создания Producer и Consumer
@UriEndpoint(
firstVersion = "3.21.0",
scheme = "zipkintrace",
syntax = "zipkintrace:action",
title = "zipkintrace",
category = Category.LOG,
producerOnly = true,
headersClass = ZipkinTraceConstants.class
)
public class ZipkinTraceEndpoint extends DefaultEndpoint implements AsyncEndpoint {
private final ZipkinTraceProperties zipkinTraceProperties;
private final ZipkinTraceCache zipkinTraceCache;
@UriPath
@Metadata(required = true)
private String action;
@UriParam
private String route;
@UriParam
private String processor;
@UriParam
private String messageId;
@UriParam
private String originalMessageId;
@UriParam
private String iisId;
@UriParam
private boolean buildTraceContext;
@UriParam(description = "Трасса, которую необходимо продолжить")
private String traceContext;
public ZipkinTraceEndpoint(String endpointUri, Component component,
ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache) {
super(endpointUri, component);
this.zipkinTraceProperties = zipkinTraceProperties;
this.zipkinTraceCache = zipkinTraceCache;
}
@Override
public Producer createProducer() throws Exception {
return new ZipkinTraceProduces(this, zipkinTraceProperties, zipkinTraceCache);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
throw new IllegalArgumentException("zipkintraser has no consumer, so you cannot use get any data from him");
}
/**
* Действие относительно трассы zipkin.
* Перечень в ZipkinTraceAction
*/
public String getAction() {
return action;
}
/**
* Действие относительно трассы zipkin.
* Перечень в ZipkinTraceAction
*/
public void setAction(String action) {
this.action = action;
}
// остальные геттеры и сеттеры
}
Producer
Тут всё проще. Как и раньше, если не хотим полностью настраивать Producer, используем стандартный абстрактный класс — DefaultProducer
или DefaultAsyncProducer
. Переопределяем getEndpoint
, чтобы не получать стандартный интерфейс, и метод полезный работы process
. В асинхронном варианте последний метод будет иметь в параметрах callback
для завершения потока.
public class ZipkinTraceProduces extends DefaultAsyncProducer {
private final ZipkinTraceProperties zipkinTraceProperties;
private final ZipkinTraceCache zipkinTraceCache;
public ZipkinTraceProduces(ZipkinTraceEndpoint endpoint,
ZipkinTraceProperties zipkinTraceProperties, ZipkinTraceCache zipkinTraceCache
) {
super(endpoint);
this.zipkinTraceProperties = zipkinTraceProperties;
this.zipkinTraceCache = zipkinTraceCache;
}
@Override
public ZipkinTraceEndpoint getEndpoint() {
return (ZipkinTraceEndpoint) super.getEndpoint();
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (!isRunAllowed()) {
return shutDownWithException(exchange, callback);
}
try {
// полезная работа
callback.done(true);
return true;
} catch (Throwable e) {
exchange.setException(e);
callback.done(true);
return true;
}
}
private boolean shutDownWithException(Exchange exchange, AsyncCallback callback) {
if (isNull(exchange.getException())) {
exchange.setException(new RejectedExecutionException());
}
callback.done(true);
return true;
}
}
Consumer
В этом классе определяется логика, которая будет отправлять сообщения слушателям. Например, может запускаться слушатель очереди или какая-то крон-задача.
Для этого используем класс DefaultConsumer
в качестве родительского и переопределяем методы doStart
, doStop
.
Если поток сообщений может быть приостановлен (не полное отключение), нужно пометить класс интерфейсом Suspendable
. Методы для обработки этого поведения doSuspend
и doResume
public class ZipkinTraceConsumer extends DefaultConsumer {
public ZipkinTraceConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
@Override
protected void doStart() throws Exception {
super.doStart();
}
@Override
protected void doStop() throws Exception {
super.doStop();
}
@Override
protected void doSuspend() {
}
@Override
protected void doResume() throws Exception {
}
}
Теперь неочевидное
К сожалению, чтобы фреймворк заметил ваш компонент и позволил его использовать, действий, описанных выше, недостаточно. Помимо всего этого нужно добавить плагин, который сгенерирует метаинформацию по вашему компоненту во время компиляции кода. И вот тогда camel признает все ваши труды.
org.apache.camel
camel-component-maven-plugin
${camel-version}
generate
generate
process-classes
И создать файл мета информации в пакете
resources/META-INF/services/org/apache/camel/component/
Файл назвать по имени компонента.
Содержимое файла
class=ru.gov.pfr.ecp.iis.smev.adapter.zipkin.camel.component.ZipkinTraceComponent
Что получилось у нас
from("direct:" + FSSP_REPORT_ARREST_PROCESSING_ROUTE)
.routeId(FSSP_REPORT_ARREST_PROCESSING_ROUTE)
.log(LoggingLevel.INFO, FSSP_REPORT_ARREST_PROCESSING_ROUTE + ".start")
.to("zipkintrace:scoped?processor=ReportArrestXmlEACreateProcessor") <-- Обращение в условный S3
.process(reportArrestXmlEACreateProcessor)
.to("zipkintrace:scoped?processor=ReportArrestXmlSignProcessor") <-- Обращение в прикладной сервис
.process(reportArrestXmlSignProcessor)
.to("zipkintrace:scoped?processor=ReportArrestArchiveCreateProcessor") <-- Сохранение результата в S3
.process(reportArrestArchiveCreateProcessor)
.to("zipkintrace:end")
.process(convertProcessor);
Надеюсь, эта статья будет полезна и убережёт вас от подводных каменей Apache Camel.
Полезные материалы
Код примера
Документация фреймворка
Другие компоненты в открытом доступе
Stackoverflow