Создание телеграмм-бота (Spring Boot, Kafka, PostgreSQL), часть первая
Младший Java программист
Рецепт по приготовлению своего «Telegram-Франкенштейна»
Даже человек средних способностей, упорно занимаясь одним предметом, непременно достигнет в нем глубоких познаний. — «Франкенштейн» Мэри Шелли
Всем привет, данная статья является, своего рода моей первой, но все же постараюсь максимально просто рассказать вам о том, как создать бота, прикрутив к нему все обещанные выше свистелки-тарахтелки.
Статьи будут разделены на 2 части, первая часть — создание основного бота с оправкой логов (Kafka Producer) и записью их в БД, вторая часть — обработка всех логов (Kafka Consumer).
Ингредиенты:
Регистрация бота
Создание Spring Boot проект, проще всего это сделать через встроенный конфигуратор в IntelliJ IDEA, либо используя Spring Initializr. (в качестве системы сборки будет использоваться Gradle)
Kafka (для отслеживания топиков я использую Conductor)
PostgreSQL (для комфортной работы я использую DBeaver)
Если возникнут сложности с воссозданием туториала
Прошу пишите в коментариях возникшие проблемы, на всякий случай — вот мой git
Начинаем с нарезки:
Первостепенно нужно настроить build.grable со всеми зависимостями
build.grable
buildscript {
repositories {
mavenCentral()
}
}
plugins {
id 'org.springframework.boot' version '2.4.2'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
}
apply from: 'build-test.gradle'
group 'com.sercetary.bot'
sourceCompatibility = '14'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
configurations.all {
exclude module: 'slf4j-log4j12'
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'
implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'
implementation 'org.springframework.data:spring-data-commons:2.6.0'
implementation 'org.springframework.kafka:spring-kafka:2.7.6'
implementation 'org.postgresql:postgresql:42.3.1'
implementation 'com.h2database:h2:1.4.200'
implementation group: 'org.telegram', name: 'telegrambots-abilities', version: '5.3.0'
implementation group: 'org.telegram', name: 'telegrambots', version: '5.3.0'
compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.29'
compileOnly 'org.projectlombok:lombok:1.18.22'
annotationProcessor 'org.projectlombok:lombok:1.18.22'
}
Далее сразу для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka producer
application.yml
server:
port: 9000
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
Теперь настройки application.properties
application.properties
# HTTP port for incoming requests
server.port=8081
app.http.bot=change-me
telegram-bot.name=change-me
telegram-bot.token=change-me
# Bot db
app.db.bot-db.url=jdbc:postgresql://localhost:5432/change-me
app.db.bot-db.driver=org.postgresql.Driver
app.db.bot-db.user=change-me
app.db.bot-db.password=change-me
app.db.bot-db.pool-size=10
# logging
logging.level.root=INFO
logging.level.org.springframework.web=DEBUG
logging.level.ru.centerinform.webhook=TRACE
logging.file.name=change-me
Хорошо, после настроек нашего проекта, давайте обговорим его структуру:
Структура проекта
Пакеты:
config — описание бинов и конфигурации проекта
controller — обрабатывает запрос пользователя
dto — хранит данные, а так же описывает модель таблицы БД
exceptions — кастомный пакет обработчика ошибок
repository — логика работа с БД
service — основная бизнес логика проекта
Сейчас мы собираем игредиенты и маринуем:
Настройки бинов:
— Первым делом прописываем конфигурация бинов нашего приложения в пакете config, тут настройки инициализации TelegramBotsApi и ObjectMapper
AppConfig
@Configuration
public class AppConfig {
@Bean
ObjectMapper customObjectMapper() {
return new ObjectMapper();
}
@Bean
TelegramBotsApi telegramBotsApi() throws TelegramApiException{
return new TelegramBotsApi(DefaultBotSession.class);
}
}
— Внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc
DbConfig
@Configuration
public class DbConfig extends DefaultDbConfig {
@Bean
@Qualifier("bot-db")
@ConfigurationProperties(prefix = "app.db.bot-db")
SpringDataJdbcProperties gitlabJdbcProperties() {
return new SpringDataJdbcProperties();
}
@Bean
@Qualifier("bot-db")
public DataSource gitlabDataSource(@Qualifier("bot-db") SpringDataJdbcProperties properties) {
return hikariDataSource("db", properties);
}
@Bean
@Qualifier("bot-db")
JdbcTemplate gitlabJdbcTemplate(@Qualifier("bot-db") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Data
@NoArgsConstructor
public static class SpringDataJdbcProperties {
// constants
private static final String H2_DATABASE_DRIVER = "org.h2.Driver";
/**
* JDBC URL property
*/
String url;
/**
* JDBC driver class name property
*/
String driver;
/**
* JDBC username property
*/
String user;
/**
* JDBC password property
*/
String password;
/**
* Hikari / Vertica maxPoolSize property
*/
String poolSize;
/**
* Minimum pool size
*/
int minPoolSize = 4;
/**
* Maximum pool size
*/
int maxPoolSize = 10;
/**
* This property controls the maximum amount of time (in milliseconds) that a connection is allowed to
* sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.
*/
long idleTimeout;
/**
* This property controls the maximum lifetime of a connection in the pool. When a connection
* reaches this timeout, even if recently used, it will be retired from the pool.
* An in-use connection will never be retired, only when it is idle will it be removed
*/
long maxLifetime;
/**
* Bulk insert size
*/
Integer bulkSize;
/**
* All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)
*
* @param url JDBC driver class name property
* @param driver JDBC driver class name property
* @param user JDBC username property
* @param password JDBC password property
* @param poolSize Hikari / Vertica maxPoolSize property
* @param bulkSize bulk insert size
*/
public SpringDataJdbcProperties(
String url, String driver, String user, String password, String poolSize, Integer bulkSize) {
this.url = url;
this.driver = driver;
this.user = user;
this.password = password;
this.poolSize = poolSize;
this.bulkSize = bulkSize;
}
/**
* Возвращает истину, если экземпляр описывает in-memory H2 database
*
* @return истина, если экземпляр описывает in-memory H2 database
*/
public boolean isH2Database() {
return driver.equals(H2_DATABASE_DRIVER);
}
/**
* Возвращает строковое представление экземпляра объекта в формате JSON
*
* @return строковое представление экземпляра объекта в формате JSON
*/
@Override
public String toString() {
var props = new SpringDataJdbcProperties(
url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);
return Json.encode(props);
}
}
}
— Создадим базовый класс для уменьшения дублирования кода инициализации бинов
DefaultDbConfig
@Slf4j
class DefaultDbConfig {
protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {
log.info("[{}] настройки БД: [{}]", tag, properties.toString());
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl(properties.getUrl());
ds.setDriverClassName(properties.getDriver());
ds.setUsername(properties.getUser());
ds.setPassword(properties.getPassword());
ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));
return ds;
}
}
— После напишем утилитный класс для логирования
Json
public class Json {
static final ObjectMapper mapper = new ObjectMapper();
/**
* Encode instance as JSON
*
* @param obj instance
* @return JSON
*/
public static String encode(Object obj) {
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
return obj.toString();
}
}
public static T decode(String json, Class clazz) throws JsonProcessingException {
return mapper.readValue(json, clazz);
}
Далее мы напишем контроллер, для доступа к сервису из вне
— Создаем простенький контроллер, для получения списка записей из БД
UsersController
@Slf4j
@RestController
@RequestMapping("${app.http.bot")
@RequiredArgsConstructor
@SuppressWarnings("unused")
public class UsersController {
private final UserService userService;
/**
* Возвращает список пользователей и связанных с ними планами
*/
@RequestMapping(path = "/users_idea", method = RequestMethod.GET)
public List getIdeaList() {
log.debug("Method - getIdeaList was called");
return userService.getUserList();
}
}
После переходим к созданию модели
— Создаем модель пользователя User, а так же его маппер UserMapper, который понадобиться для работы с БД и маппинга полей в таблице
User
@Data
@RequiredArgsConstructor
public class User {
/**
* user's id
*/
@JsonProperty("id")
private final int id;
/**
* user's name
*/
@JsonProperty("name")
private final String name;
/**
* description
*/
@JsonProperty("description")
private final String description;
private String startWord = "";
@Override
public String toString() { return startWord + description; }
}
UserMapper
@Slf4j
public class UserMapper implements RowMapper {
@Override
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
var entity = new User(
rs.getInt("id"),
rs.getString("user_name"),
rs.getString("description")
);
log.trace("mapRow(): entity = [{}]", entity);
return entity;
}
}
Переходим к созданию кастомных exception
Для чего они нужны
Их мы используем для обработки ошибок, которые могут произойти в процессе работы приложения, чтобы бот не сломался и продолжил свою работу.
— BaseException — класс, который наследуется от RuntimeException, в конструкторе принимает 2 параметра — сообщение и тело ошибки
BaseException
@Slf4j
public class BaseException extends RuntimeException{
public BaseException(String msg, Throwable t) {
super(msg, t);
log.error(msg, t);
}
public BaseException(String msg) {
super(msg);
log.error(msg);
}
}
— NotFoundException — класс, который вывзывается, когда ответ не найден, наследуется от BaseException
NotFoundException
@ResponseStatus(HttpStatus.NOT_FOUND)
public class NotFoundException extends BaseException {
private final static String MESSAGE = "Not Found";
public NotFoundException(Throwable t) {
super(MESSAGE, t);
}
public NotFoundException() {
super(MESSAGE);
}
}
— DbException — класс, который обрабатыевает ошибки связанные с БД, наследуется от RuntimeException
DbException
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class DbException extends RuntimeException {
private static final String MESSAGE = "Ошибка БД";
public DbException(String message) {
super(message);
}
public DbException(Throwable cause) {
super(MESSAGE, cause);
}
}
Теперь для работы с БД, создаем repository
— Создадим интерфейс, который описывает методы, для работы с записями в БД
IUserRepository
public interface IUserRepository {
/**
* Возвращает список записей по id
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
User getById(int id);
/**
* Возвращает список записей
*
* @return список всех записей
* @throws DbException в случае ошибки БД
*/
List getUserList();
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
void insert(User entity);
/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
void delete(User entity);
}
— Теперь напишем класс, который реализует методы интерфейса
UserRepository
@Slf4j
@Repository
public class UserRepository implements IUserRepository {
// constants
private static final String SQL_SELECT_BY_NAME = "" +
"SELECT id, user_name, description FROM user_table WHERE id=?";
private static final String SQL_SELECT_LIST = "" +
"SELECT id, user_name, description FROM user_table";
private static final String SQL_INSERT = "" +
"INSERT INTO user_table (user_name, description) VALUES (?, ?)";
private static final String SQL_DELETE = "" +
"DELETE FROM user_table WHERE id = ?";
protected final static UserMapper USER_MAPPER = new UserMapper();
// beans
protected final JdbcTemplate template;
/**
* Req-args constructor for Spring DI
*/
public UserRepository(@Qualifier("bot-db") JdbcTemplate template) {
this.template = template;
}
/**
* Возвращает список записей по id
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public User getById(int id) throws DbException {
try {
return DataAccessUtils.singleResult(
template.query(SQL_SELECT_BY_NAME, USER_MAPPER, id));
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
/**
* Возвращает список записей
*
* @return запрашиваемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public List getUserList() throws DbException {
try {
return template.query(SQL_SELECT_LIST, USER_MAPPER);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
@Override
public void insert(User entity) throws DbException {
try {
// В параметры запроса все поля сущности кроме идентификатора, т.к. он serial и генерируется автоматом
var result = template.update(SQL_INSERT,
entity.getName(),
entity.getDescription());
if (result != 1) log.trace("UserRepository.update() with {} rows inserted", entity);
log.info("insert({}) result={}", entity, result);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
@Override
public void delete(User entity) throws DbException {
try {
var result = template.update(SQL_DELETE, entity.getId());
if (result != 1) log.trace("UserRepository.delete() with {} rows inserted", entity);
log.info("delete({}) result={}", entity, result);
} catch (DataAccessException exception) {
throw new DbException(exception);
}
}
}
— Далее у нас идет логика бота, тут все тривиально, в отнаследованном onUpdateReceived методе от класса родителя TelegramLongPollingBot мы пишем поведение, которое происходит при обновлении чата с пользователем, подробнее об этом здесь, так же в методе обработки сообщений есть вызов нашего producer и запись данных в БД
TelegramBot
@Slf4j
@Getter
@Component
public class TelegramBot extends TelegramLongPollingBot {
private Message requestMessage = new Message();
private final SendMessage response = new SendMessage();
private final Producer producerService;
private final UserService userService;
private final String botUsername;
private final String botToken;
public TelegramBot(
TelegramBotsApi telegramBotsApi,
@Value("${telegram-bot.name}") String botUsername,
@Value("${telegram-bot.token}") String botToken,
Producer producerService, UserService userService) throws TelegramApiException {
this.botUsername = botUsername;
this.botToken = botToken;
this.producerService = producerService;
this.userService = userService;
telegramBotsApi.registerBot(this);
}
/**
* Этот метод вызывается при получении обновлений через метод GetUpdates.
*
* @param request Получено обновление
*/
@SneakyThrows
@Override
public void onUpdateReceived(Update request) {
requestMessage = request.getMessage();
response.setChatId(requestMessage.getChatId().toString());
var entity = new User(
0, requestMessage.getChat().getUserName(),
requestMessage.getText());
if (request.hasMessage() && requestMessage.hasText())
log.info("Working onUpdateReceived, request text[{}]", request.getMessage().getText());
if (requestMessage.getText().equals("/start"))
defaultMsg(response, "Напишите команду для показа списка мыслей: \n " + "/idea - показать мысли");
else if (requestMessage.getText().equals("/idea"))
onIdea(response);
else
defaultMsg(response, "Я записал вашу мысль :) \n ");
log.info("Working, text[{}]", requestMessage.getText());
if (requestMessage.getText().startsWith("/")) {
entity.setStartWord("команда: ");
producerService.sendMessage( entity);
} else {
entity.setStartWord("мысль: ");
producerService.sendMessage( entity);
userService.insert(entity);
}
}
/**
* Метод отправки сообщения со списком мыслей - по команде "/idea"
*
* @param response - метод обработки сообщения
*/
private void onIdea(SendMessage response) throws TelegramApiException {
if (userService.getUserList().isEmpty()) {
defaultMsg(response, "В списке нет мыслей. \n");
} else {
defaultMsg(response, "Вот список ваших мыслей: \n");
for (User txt : userService.getUserList()) {
response.setText(txt.toString());
execute(response);
}
}
}
/**
* Шабонный метод отправки сообщения пользователю
*
* @param response - метод обработки сообщения
* @param msg - сообщение
*/
private void defaultMsg(SendMessage response, String msg) throws TelegramApiException {
response.setText(msg);
execute(response);
}
}
Фрагмент кода с отправкой в Kafka и записью в БД
if (requestMessage.getText().startsWith("/")) {
entity.setStartWord("команда: ");
producerService.sendMessage( entity);
} else {
entity.setStartWord("мысль: ");
producerService.sendMessage( entity);
userService.insert(entity);
}
Переходим к созданию бизнес логики приложения
— BaseService — реализует базовые методы сервисов проекта
BaseService
public class BaseService {
/**
* Обёртка результата
*
* @param result результат
* @return результат
* @throws NotFoundException если результат null
*/
public T wrapResult(T result) {
if(result == null)
throw new NotFoundException();
return result;
}
/**
* Обёртка результата
*
* @param result результат
* @return результат
* @throws NotFoundException если результат null или пустой
*/
public List wrapResults(List result) {
if(result == null || result.size() == 0)
throw new NotFoundException();
return result;
}
}
— Класс UserService работает с нашим репозиторием IUserRepository и содержит в себе бизнес-логику работы с записями о событиях в БД
UserService
@Service
@Slf4j
@RequiredArgsConstructor
public class UserService extends BaseService {
//beans
protected final IUserRepository repo;
/**
* Возвращает список записей
*
* @return список записей
* @throws DbException в случае ошибки БД
*/
public List getUserList() {
log.trace("#### getUserList() - working");
return wrapResults(repo.getUserList());
}
/**
* Возвращает список записей по id
*
* @throws DbException в случае ошибки БД
*/
public User getById(int id) {
log.trace("#### getById() [id={}]", id);
return wrapResult(repo.getById(id));
}
/**
* Вставка новой записи
*
* @param entity новая запись
* @throws DbException в случае ошибки БД
*/
public void insert(User entity) {
log.trace("#### insert() [entity={}]", entity);
repo.insert(entity);
}
/**
* Удаление записи
*
* @param entity удаляемая запись
* @throws DbException в случае ошибки БД
*/
public void delete(User entity) {
log.trace("#### delete() [entity={}]", entity);
repo.delete(entity);
}
}
— Класс Producer, как раз тот класс, который шлет сообщения в топик users, а так же здесь мы можем изменять формат самого сообщения и данные, которые он отправляет
Producer
@Service
@Slf4j
public class Producer {
private static final String TOPIC = "users";
protected final IUserRepository repo;
@Autowired
private KafkaTemplate kafkaTemplate;
public Producer(IUserRepository repo) {
this.repo = repo;
}
public void sendMessage(User user) {
if (user.getName() == null || user.getDescription().isEmpty()) log.info("#### Empty name/description message");
log.info("#### Producing message [user={}]", user);
kafkaTemplate.send(TOPIC, "Writing in log -> " + user);
}
}
В конце класс, который собственно и запускает все наше приложене
WebHookApp
@Slf4j
@SpringBootApplication
public class WebHookApp {
public static void main(String[] args) {
SpringApplication.run(WebHookApp.class, args);
}
}
Теперь мы замариновали все ингридиенты и подготовили блюдо к запеканию:
— Сначала проверим, запущена ли Kafka
запуск по команде — sudo su systemctl start kafka
— После, запускаем Conductor и видим, что у нас работет брокер сообщений, после запуска нашего приложения, тут появится топик users, в который будут лететь сообщения отправленные нашим producer
Запущенный брокер
— Далее запускаем DBeaver и создаем 2 таблицы (log и user_table), вот схема создания таблиц:
CREATE TABLE public.log (
id serial4 NOT NULL,
message varchar(500) NOT NULL,
date_time date NOT NULL,
topic varchar(100) NOT NULL,
CONSTRAINT log_pkey PRIMARY KEY (id)
);
CREATE TABLE public.user_table (
id serial4 NOT NULL,
user_name varchar(100) NOT NULL,
description varchar(500) NULL,
CONSTRAINT user_table_pkey PRIMARY KEY (id)
);
Схема БД publicВот как выглядят таблица logВот как выглядит таблица user_table
Отлично, блюдо запеклось и готово к подаче:
— Запускаем проект, проверяем, что все настроено и корректно работает
Spring logs
— Открываем телеграмм и пробуем на вкус нашего «Франкенштейна»
Общение с ботом в Телеграмм
— Давайте посмотрим, что же нам написал Spring в логах и записались ли данные в Kafka и БД ?
Логи нашего бота, ошибок не наблюдается
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.4.2)
2022-01-15 16:46:19.248 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : The following profiles are active: bot
2022-01-15 16:46:19.291 WARN 412498 --- [kground-preinit] o.s.h.c.j.Jackson2ObjectMapperBuilder : For Jackson Kotlin classes support please add "com.fasterxml.jackson.module:jackson-module-kotlin" to the classpath
2022-01-15 16:46:19.882 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8081 (http)
2022-01-15 16:46:19.887 INFO 412498 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-01-15 16:46:19.887 INFO 412498 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.41]
2022-01-15 16:46:19.956 INFO 412498 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-01-15 16:46:19.957 INFO 412498 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 678 ms
2022-01-15 16:46:20.013 INFO 412498 --- [ main] c.secretary.bot.config.DefaultDbConfig : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"*****","password":"*****","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-15 16:46:20.565 INFO 412498 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2022-01-15 16:46:20.574 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-15 16:46:20.598 DEBUG 412498 --- [ main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-15 16:46:20.619 DEBUG 412498 --- [ main] o.s.w.s.handler.SimpleUrlHandlerMapping : Patterns [/webjars/**, /**] in 'resourceHandlerMapping'
2022-01-15 16:46:20.627 DEBUG 412498 --- [ main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-15 16:46:20.702 INFO 412498 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8081 (http) with context path ''
2022-01-15 16:46:20.709 INFO 412498 --- [ main] com.secretary.bot.WebHookApp : Started WebHookApp in 1.65 seconds (JVM running for 1.962)
SSS2022-01-15 16:52:33.916 INFO 412498 --- [legram Executor] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = true
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-01-15 16:52:33.947 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2022-01-15 16:52:33.948 INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1642254753947
2022-01-15 16:52:34.056 INFO 412498 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: faKjxP6CTvGFeeVKJw
2022-01-15 16:54:01.115 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2022-01-15 16:54:01.188 INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
— Как мы видим, сообщения отправленные Боту появились в БД
Записи в БД
— Открыв кондуктор, перейдите во вкладку topics, после нажимаем на наш топик users
Вкладка topics
— Далее во вкладке нашего топика нажимаем на кнопку CONSUME DATA
Информация о топике users
— В открывшемся окне, ставим такие же настройки (самая важная из них это Start From — указывает, с какого момента показывать сообщения в Kafka, наша настройка — показывает все сообщения, включая отправленые ранее)
Настройки просмотра сообщений
— Вот и все, теперь мы убедились, что сообщения благополучно прилетели в Kafka, записались в БД и не вызвали ошибок в приложении
Прилетевшие в Kafka сообщения
Ну что же, большое всем спасибо за время, потраченное на прочтение данной статьи, жду вас во второй части этого туториала, где мы используем Consumer Kafka, с помощью которого будем обрабатывать прилетающие сообщения.