Реализация RPC во внутреннем взаимодействии модулей с Spring Boot
В какой-то момент я настолько устал настраивать все эти мелочи для REST-взаимодействия в микросервисах, что реализовал механизм, который сильно упрощает эту работу.
И я уверен, что вам стоит прочитать эту статью, если:
Вы используете Java и Spring Boot в своей работе.
Ваш проект строится на микросервисной архитектуре.
Вас раздражает бесконечное описание REST-контрактов и их поддержку.
А если у вас остались шрамы после попыток внедрения gRPC, то тем более рекомендую дочитать до конца.
В статье мы рассмотрим альтернативный подход к взаимодействию между сервисами — легковесное RPC-решение, которое удобно использовать внутри одного продукта. Оно не претендует на замену публичных API, но значительно упрощает жизнь внутри команды, где «правила игры» можно обсудить за кружкой пива.
Зачем это нужно?
Такой подход позволяет:
Упростить код, избавившись от ручного создания HTTP-запросов и парсинга ответов.
Ускорить разработку, так как взаимодействие между сервисами становится похожим на работу с локальными интерфейсами.
Сделать код более читаемым и поддерживаемым.
Итак, давайте начнем.
Что такое RPC?
RPC (Remote Procedure Call) — это механизм, позволяющий вызывать методы или функции на удаленном сервере так, как если бы они выполнялись локально. Вместо того чтобы отправлять HTTP-запросы и парсить ответы, разработчик просто вызывает метод интерфейса, а вся сложность взаимодействия с удаленным сервисом скрыта под капотом. Это упрощает код и ускоряет разработку, особенно в микросервисной архитектуре, где сервисы активно взаимодействуют друг с другом.
Что нам нужно сделать?
Поддержка клиент-серверного взаимодействия.
Использование HTTP в качестве транспорта.
Передача сериализованных объектов в теле запросов и ответов.
Настройка автоконфигураций для Spring Boot для удобства использования.
Реализация RPC клиента
Первым шагом я создал аннотацию-маркер @ModuRpcApi
, которой мы будем помечать все интерфейсы участвующие в RPC взаимодействии:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ModuRpcApi {
}
«Название ModuRpc
— это сокращение от «Module RPC», предложенное ChatGPT.»
Затем я добавил интерфейс для тестирования:
@ModuRpcApi
public interface ClientApi {
String ping();
}
Пример, для того чтобы показать, что этот интерфейс планируется использовать как обычный спринговый бин:
@Component
@Slf4j
public class TestRunner implements ApplicationRunner {
private final ClientApi clientApi;
@Autowired
public TestRunner(ClientApi clientApi) {
this.clientApi = clientApi;
}
@Override
public void run(ApplicationArguments args) throws Exception {
String result = clientApi.ping();
log.info(result);
}
}
На данном этапе ничего не работает — и это логично, так как нет реализации ClientApi
. Нам нужен механизм, который создаст динамический прокси и обеспечит вызовы RPC.
Реализация RPC-клиента
Spring AOP и MethodInterceptor
помогут нам перехватывать вызовы интерфейсов и реализовывать нашу затею.
public interface MethodInterceptor extends Callback
{
/**
* All generated proxied methods call this method instead of the original method.
* The original method may either be invoked by normal reflection using the Method object,
* or by using the MethodProxy (faster).
* @param obj "this", the enhanced object
* @param method intercepted Method
* @param args argument array; primitive types are wrapped
* @param proxy used to invoke super (non-intercepted method); may be called
* as many times as needed
* @throws Throwable any exception may be thrown; if so, super method will not be invoked
* @return any value compatible with the signature of the proxied method. Method returning void will ignore this value.
* @see MethodProxy
*/
public Object intercept(Object obj, java.lang.reflect.Method method, Object[] args,
MethodProxy proxy) throws Throwable;
}
На основе этого интерфейса создаем класс перехватчика:
@Component
public class ModuRpcClientMethodInterceptor implements MethodInterceptor {
private final HttpClient httpClient;
private final ModuRpcSerializer javaSerializer;
private final ConcurrentMap, String> apiUrls = new ConcurrentHashMap<>();
@Autowired
public ModuRpcClientMethodInterceptor(HttpClient httpClient, ModuRpcSerializer javaSerializer) {
this.httpClient = httpClient;
this.javaSerializer = javaSerializer;
}
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
if (isObjectMethod(method)) {
return handleObjectMethod(obj, method, args);
}
String apiUrl = apiUrls.get(method.getDeclaringClass());
if (apiUrl == null) {
throw new IllegalArgumentException("No URL configured for API [%s]".formatted(method.getDeclaringClass().getName()));
}
String url = "%s/rpc/%s/%s".formatted(apiUrl, method.getDeclaringClass().getName(), method.getName());
ModuRpcRequest moduRpcRequest = new ModuRpcRequest(args, method.getParameterTypes());
try {
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(new URI(url))
.header("Content-Type", "application/octet-stream")
.method("POST", HttpRequest.BodyPublishers.ofByteArray(javaSerializer.serialize(moduRpcRequest)));
HttpResponse response = httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray());
if (response.statusCode() != 200) {
throw new ModuRpcTransportException("RPC request failed with status: " + response.statusCode());
}
ModuRpcResponse moduRpcResponse = javaSerializer.deserialize(response.body(), ModuRpcResponse.class);
if (moduRpcResponse == null) {
throw new ModuRpcTransportException("RPC response is null");
}
if (moduRpcResponse.getResult() instanceof Throwable) {
throw (Throwable) moduRpcResponse.getResult();
}
if (method.getReturnType().isInstance(moduRpcResponse.getResult())) {
return method.getReturnType().cast(moduRpcResponse.getResult());
}
return moduRpcResponse.getResult();
} catch (URISyntaxException | IOException | InterruptedException e) {
throw new ModuRpcTransportException("RPC transport error: " + e.getMessage(), e);
}
}
public T createProxy(Class clazz, String url) {
apiUrls.put(clazz, url);
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(clazz);
enhancer.setCallback(this);
return (T) enhancer.create();
}
private boolean isObjectMethod(Method method) {
return method.getDeclaringClass() == Object.class;
}
private Object handleObjectMethod(Object obj, Method method, Object[] args) {
return switch (method.getName()) {
case "hashCode" -> System.identityHashCode(obj);
case "equals" -> obj == args[0];
case "toString" -> obj.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(obj));
default -> throw new IllegalArgumentException("Unexpected method: " + method);
};
}
}
Комментарии к коду:
ConcurrentMap
хранит URL-адреса обработчиков для интерфейсов., String> apiUrls Метод
intercept
перехватывает все запросы, включаяhashCode
иequals
, которые нужно обрабатывать отдельно.Метод
createProxy
создает прокси-класс для RPC клиента.
Сериализация объектов
Для сериализации\десериализации решил использовать библиотеку Kryo.
@Component
public class KryoModuRpcSerializer implements ModuRpcSerializer {
private static final Logger logger = LoggerFactory.getLogger(KryoModuRpcSerializer.class);
private final Kryo kryo;
public KryoModuRpcSerializer(Kryo kryo) {
this.kryo = kryo;
}
@Override
public byte[] serialize(Object object) {
long startTime = System.nanoTime();
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
kryo.writeObject(output, object);
output.close();
return byteArrayOutputStream.toByteArray();
} catch (Exception e) {
throw new ModuRpcTransportException("Failed to serialize object", e);
} finally {
long endTime = System.nanoTime();
long durationMicros = TimeUnit.NANOSECONDS.toMicros(endTime - startTime);
logger.debug("Serialization time: {} microseconds", durationMicros);
}
}
@Override
public T deserialize(byte[] data, Class type) {
long startTime = System.nanoTime();
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
Input input = new Input(byteArrayInputStream)) {
return kryo.readObject(input, type);
} catch (Exception e) {
throw new ModuRpcTransportException("Failed to deserialize object", e);
} finally {
long endTime = System.nanoTime();
long durationMicros = TimeUnit.NANOSECONDS.toMicros(endTime - startTime);
logger.debug("Deserialization time: {} microseconds", durationMicros);
}
}
}
Важно настроить Kryo на работу с авторегистрацией объектов, так как если этого не сделать, то получим ошибку сериализации\десериализации:
@Bean
public Kryo kryo() {
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.setRegistrationRequired(false);
kryo.register(UUID.class, new DefaultSerializers.UUIDSerializer());
kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
kryo.register(Throwable.class, new JavaSerializer());
return kryo;
}
На текущий момент можно добавить бин в контекст спринга:
@Bean
public ClientApi clientApi(ModuRpcClientMethodInterceptor interceptor) {
return interceptor.createProxy(ClientApi.class, "http://localhost:8080");
}
На этом с клиентом всё. При вызове любого объявленного метода, будет вызов в сторону RPC-сервера по http.
Реализация RPC-сервера
Для реализации RPC-сервера достаточно будет использовать стандартный RestController. Нужно вести список обработчиков, который может принять сервер. Сериализатор же будем использовать, который описывали ранее.
@RestController
@RequestMapping("/rpc")
public class ModuRpcServer {
private final HandlerRegistry handlerRegistry;
private final ModuRpcSerializer javaSerializer;
private static final Logger logger = LoggerFactory.getLogger(ModuRpcServer.class);
@Autowired
public ModuRpcServer(HandlerRegistry handlerRegistry, ModuRpcSerializer javaSerializer) {
this.handlerRegistry = handlerRegistry;
this.javaSerializer = javaSerializer;
}
@PostMapping("/{className}/{methodName}")
public byte[] handleRpcRequest(@PathVariable String className, @PathVariable String methodName, @RequestBody byte[] requestBody) throws Throwable {
logger.debug("call {}.{} with body length [{}]", className, methodName, requestBody.length);
try {
Object handler = handlerRegistry.getHandler(className);
if (handler == null) {
throw new IllegalArgumentException("No handler found for api [%s]".formatted(className));
}
ModuRpcRequest moduRpcRequest = javaSerializer.deserialize(requestBody, ModuRpcRequest.class);
Method method = findMethod(handler.getClass(), methodName, moduRpcRequest.getParameterTypes());
if (method == null) {
throw new IllegalArgumentException("No method found [%s] in class [%s]".formatted(methodName, className));
}
Object result = method.invoke(handler, moduRpcRequest.getParameters());
return javaSerializer.serialize(new ModuRpcResponse(result));
} catch (InvocationTargetException e) {
return javaSerializer.serialize(new ModuRpcResponse(e.getCause()));
} catch (Throwable e) {
return javaSerializer.serialize(new ModuRpcResponse(e));
}
}
private Method findMethod(Class handlerClass, String methodName, Class[] parameterTypes) {
try {
return handlerClass.getMethod(methodName, parameterTypes);
} catch (NoSuchMethodException e) {
return null;
}
}
}
Основная сложность — правильно определить, какой метод класса вызывается. Для этого RPC-клиент передает информацию с типами вызываемых параметров, что позволяет однозначно определить вызываемый метод.
Ниже представлен класс для ведения списка обработчиков:
public class HandlerRegistry {
private final Map handlers = new ConcurrentHashMap<>();
public void registerHandler(Class clazz, Object handler) {
handlers.put(clazz.getName(), handler);
}
public Object getHandler(String className) {
return handlers.get(className);
}
}
Осталось только вручную описать конфигурацию:
@Configuration
public class ModuRpcServerConf {
private final HandlerRegistry handlerRegistry;
private final ClientApiHandler clientApiHandler;
public ModuRpcServerConf(HandlerRegistry handlerRegistry, ClientApiHandler clientApiHandler) {
this.handlerRegistry = handlerRegistry;
this.clientApiHandler = clientApiHandler;
}
@PostConstruct
public void registerHandlers() {
handlerRegistry.registerHandler(ClientApi.class, clientApiHandler);
}
}
На текущий момент мы имеем работающий клиент и сервер в RPC взаимодействии.
Реализация автоконфигураций
Самое время перейти к заключительной части, а именно к реализации автоконфигураций (по сути начинаем делать спринговый стартер), чтобы можно было удобно пользоваться результатами наших трудов.
Клиент
С клиентом придется повозиться.
Во-первых, необходимо определить host, на который будут отправляться RPC-запросы. Информацию об этом можно хранить в application.properties
.
Я решил использовать следующую структуру:
rpc.clients.com.example.client.api.url=http://localhost:8080
rpc.clients.com.example.payment.api.url=http://localhost:9090
...
где rpc.clients.[].url
— постоянная часть, а []
— пакет в котором необходимо искать API для клиента.
Во-вторых, наше приложение может работать одновременно в двух режимах, клиентом и сервером.
Например, если у нас следующая зависимость между приложениями: сервис1
→ сервис2
→ сервис3
. То сервис2
будет сервером для сервис1
и при этом будет клиентом для сервис3
. А это значит, что надо пропустить все интерфейсы, которые уже имеют реализацию.
public class ModuRpcClientBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(ModuRpcClientBeanDefinitionRegistryPostProcessor.class);
private final Environment environment;
private ApplicationContext applicationContext;
public ModuRpcClientBeanDefinitionRegistryPostProcessor(Environment environment) {
this.environment = environment;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
if (applicationContext != null) {
// Получаем доступ к свойствам rpc.clients
Map urlMap = getPropertiesByPattern("rpc.clients", ".url");
String basePackage = getBasePackage(applicationContext);
List extraPackages = getExtraScanPackages();
List scanPackages = new ArrayList<>();
if (basePackage != null) {
scanPackages.add(basePackage);
}
scanPackages.addAll(extraPackages);
logger.info("Scanning RPC clients in packages: {}", scanPackages);
Reflections reflections = new Reflections(new ConfigurationBuilder()
.forPackages(scanPackages.toArray(String[]::new))
.setScanners(Scanners.TypesAnnotated, Scanners.SubTypes));
Set> rpcApiClasses = reflections.getTypesAnnotatedWith(ModuRpcApi.class);
Set> rpcApiInterfaces = rpcApiClasses.stream()
.filter(Class::isInterface)
.collect(Collectors.toSet());
ModuRpcClientMethodInterceptor interceptor = applicationContext.getBean(ModuRpcClientMethodInterceptor.class);
for (Class apiInterface : rpcApiInterfaces) {
if (hasImplementation(registry, apiInterface)) {
continue;
}
if (!urlMap.containsKey(apiInterface.getPackageName())) {
throw new IllegalArgumentException("Url for interface [%s] not found. Please set [rpc.clients.%s.url] in application.properties"
.formatted(apiInterface.getName(), apiInterface.getPackageName()));
}
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClass(ModuRpcClientFactoryBean.class);
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(apiInterface);
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(urlMap.get(apiInterface.getPackageName()));
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(interceptor);
// Помечаем информацией, что это сгенерированный бин
beanDefinition.setAttribute("rpc.client", true);
registry.registerBeanDefinition(apiInterface.getSimpleName(), beanDefinition);
logger.info("registered rpc client for [{}]", apiInterface.getName());
}
}
}
// Определение пакета от класса аннотированный SpringBootApplication
private String getBasePackage(ApplicationContext context) {
if (context != null) {
for (String beanName : context.getBeanDefinitionNames()) {
Class beanClass = context.getType(beanName);
if (beanClass != null && beanClass.isAnnotationPresent(SpringBootApplication.class)) {
return beanClass.getPackageName();
}
}
}
logger.warn("Cannot determine base package. Please specify 'rpc.scan.packages' manually.");
return null;
}
// Пользователь может указать дополнительные пакеты для сканирования
private List getExtraScanPackages() {
String property = environment.getProperty("rpc.scan.packages");
if (property == null) {
return List.of();
}
return Arrays.stream(property.split(","))
.map(String::trim)
.filter(pkg -> !pkg.isEmpty())
.distinct()
.toList();
}
// Добываем из переменных окружений URL назначения запроса
private Map getPropertiesByPattern(String propertyBaseName, String suffix) {
Binder binder = Binder.get(environment);
return binder.bind(propertyBaseName, Bindable.mapOf(String.class, String.class))
.orElse(Collections.emptyMap())
.entrySet()
.stream()
.filter(entry -> entry.getKey().endsWith(suffix))
.collect(Collectors.toMap(
entry -> entry.getKey().substring(0, entry.getKey().length() - suffix.length()),
Map.Entry::getValue
));
}
private boolean hasImplementation(BeanDefinitionRegistry registry, Class apiInterface) {
String[] beanNames = ((ConfigurableListableBeanFactory) registry).getBeanNamesForType(apiInterface);
return beanNames.length > 0;
}
}
Комментарии по коду:
Мы реализуем
BeanDefinitionRegistryPostProcessor
чтобы зарегистрировать бин для нашего клиента.Так как в этот момент никаких бинов еще нет, то нам надо вручную сходить в
Environment
за необходимыми параметрами.Чтобы найти наши аннотированные интерфейсы, необходимо поискать. Логично! Сложность в том, что не понятно где их искать. Я решил по умолчанию сканировать от класса с аннотацией SpringBootApplication. И при этом дать возможность ручного уточнения где искать через проперти
rpc.scan.packages
.Из найденных интерфейсов, надо пропустить все интерфейсы, которые уже имеют реализацию.
Так же важно пометить BeanDefinition, что это нами сгенерированный бин. Пригодится позже.
Но этого всё ещё недостаточно. Мы зарегистрировали бины, но они пока без реализации. Поэтому добавим свою фабрику бин!
public class ModuRpcClientFactoryBean implements FactoryBean {
private final Class apiInterface;
private final String url;
private final ModuRpcClientMethodInterceptor interceptor;
public ModuRpcClientFactoryBean(Class apiInterface, String url, ModuRpcClientMethodInterceptor interceptor) {
this.apiInterface = apiInterface;
this.url = url;
this.interceptor = interceptor;
}
@Override
public T getObject() {
return interceptor.createProxy(apiInterface, url);
}
@Override
public Class getObjectType() {
return apiInterface;
}
}
Сервер
Тут, как завещал Евгений Борисов, сразу можно начать с реализации BeanPostProcessor. Сначала я реализовал поиск всех интерфейсов, которые реализует бин, и если найденный интерфейс аннотирован нашим маркером, то это обработчик RPC-сервера и его надо зарегистрировать в нашем реестре.
public class ModuRpcServerBeanPostProcessor implements BeanPostProcessor {
private static final Logger logger = LoggerFactory.getLogger(ModuRpcServerBeanPostProcessor.class);
private final HandlerRegistry handlerRegistry;
public ModuRpcServerBeanPostProcessor(HandlerRegistry handlerRegistry) {
this.handlerRegistry = handlerRegistry;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class beanClass = bean.getClass();
for (Class iface : beanClass.getInterfaces()) {
if (iface.isAnnotationPresent(ModuRpcApi.class)) {
handlerRegistry.registerHandler(iface, bean);
logger.info("Registered RPC handler: {}", iface.getName());
}
}
return bean;
}
}
Но внимательный читатель заметит, что RPC-клиенты тоже уже будут иметь реализацию. А значит такие бины необходимо пропустить и не регистрировать как серверный обработчик. Таким образом улучшенная реализация выглядит следующим образом:
public class ModuRpcServerBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(ModuRpcServerBeanPostProcessor.class);
private final HandlerRegistry handlerRegistry;
private ApplicationContext applicationContext;
public ModuRpcServerBeanPostProcessor(HandlerRegistry handlerRegistry) {
this.handlerRegistry = handlerRegistry;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof FactoryBean) {
return bean; // Пропускаем фабрики, так как они создают прокси
}
ConfigurableApplicationContext ctx = (ConfigurableApplicationContext) applicationContext;
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) ctx.getBeanFactory();
// Пропускаем сгенерированные клиенты для RPC
if (registry.containsBeanDefinition(beanName)) {
BeanDefinition beanDefinition = registry.getBeanDefinition(beanName);
Object isRpcClient = beanDefinition.getAttribute("rpc.client");
if (Boolean.TRUE.equals(isRpcClient)) {
return bean;
}
}
// Регистрируем обработчики для сервера
Class beanClass = bean.getClass();
for (Class iface : beanClass.getInterfaces()) {
if (iface.isAnnotationPresent(ModuRpcApi.class)) {
handlerRegistry.registerHandler(iface, bean);
logger.info("Registered RPC handler: {}", iface.getName());
}
}
return bean;
}
}
Самую сложную часть мы преодолели. Осталось описать конфигурацию, но этого в статье не будет (можно найти в исходниках на github). И использовать maven зависимость в интересующих нас приложениях.
Заключение
В этой статье я показал, как реализовал легковесное RPC-взаимодействие между модулями или микросервисами с использованием Spring Boot 3.
Еще раз пропишу ключевые этапы реализации:
Создание аннотации-маркера для RPC API.
Реализацию динамического прокси для перехвата вызовов методов.
Сериализацию объектов с использованием библиотеки Kryo.
Настройку RPC сервера на основе Spring RestController.
Автоконфигурацию для удобного подключения RPC клиента и сервера в Spring Boot приложениях.
Основная цель — упростить взаимодействие между сервисами, сделав его более интуитивным и похожим на вызов локальных методов. Такой подход особенно полезен в микросервисной архитектуре, где сервисы активно обмениваются данными.
Конечно, предложенное решение не претендует на идеальность и может быть доработано. Например, можно добавить поддержку других протоколов, улучшить обработку ошибок или добавить метрики для мониторинга вызовов. Однако даже в текущем виде этот подход может стать полезным инструментом в арсенале разработчика, особенно при работе с большим количеством взаимодействующих сервисов.
Надеюсь, статья окажется полезной и вдохновит вас на эксперименты с RPC и Spring Boot. Если у вас есть вопросы или предложения по улучшению, не стесняйтесь делиться ими в комментариях. Удачи в ваших проектах!
Исходники доступны на github.