Создание телеграмм-бота (Spring Boot, Kafka, PostgreSQL), часть первая

8c049e961b4edfcde48ca540202c318a.pngИванов Максим

Младший Java программист

Рецепт по приготовлению своего «Telegram-Франкенштейна»

Даже человек средних способностей, упорно занимаясь одним предметом, непременно достигнет в нем глубоких познаний. - «Франкенштейн» Мэри ШеллиДаже человек средних способностей, упорно занимаясь одним предметом, непременно достигнет в нем глубоких познаний. — «Франкенштейн» Мэри Шелли

Всем привет, данная статья является, своего рода моей первой, но все же постараюсь максимально просто рассказать вам о том, как создать бота, прикрутив к нему все обещанные выше свистелки-тарахтелки.

Статьи будут разделены на 2 части, первая часть — создание основного бота с оправкой логов (Kafka Producer) и записью их в БД, вторая часть — обработка всех логов (Kafka Consumer).

Ингредиенты:

  1. Регистрация бота

  2. Создание Spring Boot проект, проще всего это сделать через встроенный конфигуратор в IntelliJ IDEA, либо используя Spring Initializr. (в качестве системы сборки будет использоваться Gradle)

  3. Kafka (для отслеживания топиков я использую Conductor)

  4. PostgreSQL (для комфортной работы я использую DBeaver)

Если возникнут сложности с воссозданием туториала

Прошу пишите в коментариях возникшие проблемы, на всякий случай — вот мой git

Начинаем с нарезки:

Первостепенно нужно настроить build.grable со всеми зависимостями

build.grable

buildscript {
    repositories {
        mavenCentral()
    }
}

plugins {
    id 'org.springframework.boot' version '2.4.2'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

apply from: 'build-test.gradle'

group 'com.sercetary.bot'
sourceCompatibility = '14'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

configurations.all {
    exclude module: 'slf4j-log4j12'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web:2.5.6'
    implementation 'org.springframework.boot:spring-boot-starter-jdbc:2.5.6'
    implementation 'org.springframework.data:spring-data-commons:2.6.0'
    implementation 'org.springframework.kafka:spring-kafka:2.7.6'
    implementation 'org.postgresql:postgresql:42.3.1'
    implementation 'com.h2database:h2:1.4.200'

    implementation group: 'org.telegram', name: 'telegrambots-abilities', version: '5.3.0'
    implementation group: 'org.telegram', name: 'telegrambots', version: '5.3.0'

    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.29'
    compileOnly 'org.projectlombok:lombok:1.18.22'
    annotationProcessor 'org.projectlombok:lombok:1.18.22'
}

Далее сразу для работы Kafka опишем application.yml, в котором находятся настройки нашего kafka producer

application.yml

server:
  port: 9000
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Теперь настройки application.properties

application.properties

# HTTP port for incoming requests
server.port=8081

app.http.bot=change-me
telegram-bot.name=change-me
telegram-bot.token=change-me

# Bot db
app.db.bot-db.url=jdbc:postgresql://localhost:5432/change-me
app.db.bot-db.driver=org.postgresql.Driver
app.db.bot-db.user=change-me
app.db.bot-db.password=change-me
app.db.bot-db.pool-size=10

# logging
logging.level.root=INFO
logging.level.org.springframework.web=DEBUG
logging.level.ru.centerinform.webhook=TRACE
logging.file.name=change-me

Хорошо, после настроек нашего проекта, давайте обговорим его структуру:

Структура проектаСтруктура проекта

Пакеты:

  • config — описание бинов и конфигурации проекта

  • controller — обрабатывает запрос пользователя

  • dto — хранит данные, а так же описывает модель таблицы БД

  • exceptions — кастомный пакет обработчика ошибок

  • repository — логика работа с БД

  • service — основная бизнес логика проекта

Сейчас мы собираем игредиенты и маринуем:

Настройки бинов:

— Первым делом прописываем конфигурация бинов нашего приложения в пакете config, тут настройки инициализации TelegramBotsApi и ObjectMapper

AppConfig

@Configuration
public class AppConfig {

    @Bean
    ObjectMapper customObjectMapper() {
        return new ObjectMapper();
    }

    @Bean
    TelegramBotsApi telegramBotsApi() throws TelegramApiException{
        return new TelegramBotsApi(DefaultBotSession.class);
    }
}

— Внутри нашего класса DbConfig, есть класс SpringDataJdbcProperties, который описывает настройки SpringDataJdbc

DbConfig

@Configuration
public class DbConfig extends DefaultDbConfig {

    @Bean
    @Qualifier("bot-db")
    @ConfigurationProperties(prefix = "app.db.bot-db")
    SpringDataJdbcProperties gitlabJdbcProperties() {
        return new SpringDataJdbcProperties();
    }

    @Bean
    @Qualifier("bot-db")
    public DataSource gitlabDataSource(@Qualifier("bot-db") SpringDataJdbcProperties properties) {
        return hikariDataSource("db", properties);
    }

    @Bean
    @Qualifier("bot-db")
    JdbcTemplate gitlabJdbcTemplate(@Qualifier("bot-db") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Data
    @NoArgsConstructor
    public static class SpringDataJdbcProperties {

        // constants
        private static final String H2_DATABASE_DRIVER = "org.h2.Driver";

        /**
         * JDBC URL property
         */
        String url;
        /**
         * JDBC driver class name property
         */
        String driver;
        /**
         * JDBC username property
         */
        String user;
        /**
         * JDBC password property
         */
        String password;
        /**
         * Hikari / Vertica maxPoolSize property
         */
        String poolSize;
        /**
         * Minimum pool size
         */
        int minPoolSize = 4;
        /**
         * Maximum pool size
         */
        int maxPoolSize = 10;
        /**
         * This property controls the maximum amount of time (in milliseconds) that a connection is allowed to
         * sit idle in the pool. A value of 0 means that idle connections are never removed from the pool.
         */
        long idleTimeout;
        /**
         * This property controls the maximum lifetime of a connection in the pool. When a connection
         * reaches this timeout, even if recently used, it will be retired from the pool.
         * An in-use connection will never be retired, only when it is idle will it be removed
         */
        long maxLifetime;
        /**
         * Bulk insert size
         */
        Integer bulkSize;


        /**
         * All-args constructor for {@link SpringDataJdbcProperties#toString()} (logging)
         *
         * @param url JDBC driver class name property
         * @param driver JDBC driver class name property
         * @param user JDBC username property
         * @param password JDBC password property
         * @param poolSize Hikari / Vertica maxPoolSize property
         * @param bulkSize bulk insert size
         */
        public SpringDataJdbcProperties(
                String url, String driver, String user, String password, String poolSize, Integer bulkSize) {
            this.url = url;
            this.driver = driver;
            this.user = user;
            this.password = password;
            this.poolSize = poolSize;
            this.bulkSize = bulkSize;
        }


        /**
         * Возвращает истину, если экземпляр описывает in-memory H2 database
         *
         * @return истина, если экземпляр описывает in-memory H2 database
         */
        public boolean isH2Database() {
            return driver.equals(H2_DATABASE_DRIVER);
        }

        /**
         * Возвращает строковое представление экземпляра объекта в формате JSON
         *
         * @return строковое представление экземпляра объекта в формате JSON
         */
        @Override
        public String toString() {
            var props = new SpringDataJdbcProperties(
                    url, driver, user, ((password == null) || password.isEmpty()) ? "" : "*****", poolSize, bulkSize);
            return Json.encode(props);
        }

    }

}

— Создадим базовый класс для уменьшения дублирования кода инициализации бинов

DefaultDbConfig

@Slf4j
class DefaultDbConfig {

    protected DataSource hikariDataSource(String tag, DbConfig.SpringDataJdbcProperties properties) {
        log.info("[{}] настройки БД: [{}]", tag, properties.toString());

        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl(properties.getUrl());
        ds.setDriverClassName(properties.getDriver());
        ds.setUsername(properties.getUser());
        ds.setPassword(properties.getPassword());
        ds.setMaximumPoolSize(Integer.parseInt(properties.getPoolSize()));
        return ds;
    }
}

— После напишем утилитный класс для логирования

Json

public class Json {
    static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Encode instance as JSON
     *
     * @param obj instance
     * @return JSON
     */
    public static String encode(Object obj) {
        try {
            return mapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            return obj.toString();
        }
    }

    public static  T decode(String json, Class clazz) throws JsonProcessingException {
        return mapper.readValue(json, clazz);
    }

Далее мы напишем контроллер, для доступа к сервису из вне

 — Создаем простенький контроллер, для получения списка записей из БД

UsersController

@Slf4j
@RestController
@RequestMapping("${app.http.bot")
@RequiredArgsConstructor
@SuppressWarnings("unused")
public class UsersController {

    private final UserService userService;

    /**
     * Возвращает список пользователей и связанных с ними планами
     */
    @RequestMapping(path = "/users_idea", method = RequestMethod.GET)
    public List getIdeaList() {
        log.debug("Method - getIdeaList was called");
        return userService.getUserList();
    }
}

После переходим к созданию модели

— Создаем модель пользователя User, а так же его маппер UserMapper, который понадобиться для работы с БД и маппинга полей в таблице

User

@Data
@RequiredArgsConstructor
public class User {
    /**
     * user's id
     */
    @JsonProperty("id")
    private final int id;
    /**
     * user's name
     */
    @JsonProperty("name")
    private final String name;
    /**
     * description
     */
    @JsonProperty("description")
    private final String description;

    private String startWord = "";

    @Override
    public String toString() { return startWord + description; }
}

UserMapper

@Slf4j
public class UserMapper implements RowMapper {

    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        var entity = new User(
                rs.getInt("id"),
                rs.getString("user_name"),
                rs.getString("description")
                );
        log.trace("mapRow(): entity = [{}]", entity);
        return entity;
    }
}

Переходим к созданию кастомных exception

Для чего они нужны

Их мы используем для обработки ошибок, которые могут произойти в процессе работы приложения, чтобы бот не сломался и продолжил свою работу.

— BaseException — класс, который наследуется от RuntimeException, в конструкторе принимает 2 параметра — сообщение и тело ошибки

BaseException

@Slf4j
public class BaseException extends RuntimeException{

    public BaseException(String msg, Throwable t) {
        super(msg, t);
        log.error(msg, t);
    }

    public BaseException(String msg) {
        super(msg);
        log.error(msg);
    }

}

— NotFoundException — класс, который вывзывается, когда ответ не найден, наследуется от BaseException

NotFoundException

@ResponseStatus(HttpStatus.NOT_FOUND)
public class NotFoundException extends BaseException {

    private final static String MESSAGE = "Not Found";

    public NotFoundException(Throwable t) {
        super(MESSAGE, t);
    }

    public NotFoundException() {
        super(MESSAGE);
    }
}

— DbException — класс, который обрабатыевает ошибки связанные с БД, наследуется от RuntimeException

DbException

@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public class DbException extends RuntimeException {

    private static final String MESSAGE = "Ошибка БД";

    public DbException(String message) {
        super(message);
    }

    public DbException(Throwable cause) {
        super(MESSAGE, cause);
    }
}

Теперь для работы с БД, создаем repository

— Создадим интерфейс, который описывает методы, для работы с записями в БД

IUserRepository

public interface IUserRepository {

    /**
     * Возвращает список записей по id
     *
     * @return запрашиваемая запись
     * @throws DbException в случае ошибки БД
     */
    User getById(int id);

    /**
     * Возвращает список записей
     *
     * @return список всех записей
     * @throws DbException в случае ошибки БД
     */
    List getUserList();

    /**
     * Вставка новой записи
     *
     * @param entity новая запись
     * @throws DbException в случае ошибки БД
     */
    void insert(User entity);

    /**
     * Удаление записи
     *
     * @param entity удаляемая запись
     * @throws DbException в случае ошибки БД
     */
    void delete(User entity);
}

— Теперь напишем класс, который реализует методы интерфейса

UserRepository

@Slf4j
@Repository
public class UserRepository implements IUserRepository {

    // constants
    private static final String SQL_SELECT_BY_NAME = "" +
            "SELECT id, user_name, description FROM user_table WHERE id=?";
    private static final String SQL_SELECT_LIST = "" +
            "SELECT id, user_name, description FROM user_table";
    private static final String SQL_INSERT = "" +
            "INSERT INTO user_table (user_name, description) VALUES (?, ?)";
    private static final String SQL_DELETE = "" +
            "DELETE FROM user_table WHERE id = ?";

    protected final static UserMapper USER_MAPPER = new UserMapper();

    // beans
    protected final JdbcTemplate template;


    /**
     * Req-args constructor for Spring DI
     */
    public UserRepository(@Qualifier("bot-db") JdbcTemplate template) {
        this.template = template;
    }

    /**
     * Возвращает список записей по id
     *
     * @return запрашиваемая запись
     * @throws DbException в случае ошибки БД
     */
    @Override
    public User getById(int id) throws DbException {
        try {
            return DataAccessUtils.singleResult(
                    template.query(SQL_SELECT_BY_NAME, USER_MAPPER, id));
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }

    /**
     * Возвращает список записей
     *
     * @return запрашиваемая запись
     * @throws DbException в случае ошибки БД
     */
    @Override
    public List getUserList() throws DbException {
        try {
            return template.query(SQL_SELECT_LIST, USER_MAPPER);
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }

    /**
     * Вставка новой записи
     *
     * @param entity новая запись
     * @throws DbException в случае ошибки БД
     */
    @Override
    public void insert(User entity) throws DbException {
        try {
            // В параметры запроса все поля сущности кроме идентификатора, т.к. он serial и генерируется автоматом
            var result = template.update(SQL_INSERT,
                    entity.getName(),
                    entity.getDescription());
            if (result != 1) log.trace("UserRepository.update() with {} rows inserted", entity);
            log.info("insert({}) result={}", entity, result);
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }

    /**
     * Удаление записи
     *
     * @param entity удаляемая запись
     * @throws DbException в случае ошибки БД
     */
    @Override
    public void delete(User entity) throws DbException {
        try {
            var result = template.update(SQL_DELETE, entity.getId());
            if (result != 1) log.trace("UserRepository.delete() with {} rows inserted", entity);
            log.info("delete({}) result={}", entity, result);
        } catch (DataAccessException exception) {
            throw new DbException(exception);
        }
    }
}

— Далее у нас идет логика бота, тут все тривиально, в отнаследованном onUpdateReceived методе от класса родителя TelegramLongPollingBot мы пишем поведение, которое происходит при обновлении чата с пользователем, подробнее об этом здесь, так же в методе обработки сообщений есть вызов нашего producer и запись данных в БД

TelegramBot

@Slf4j
@Getter
@Component
public class TelegramBot extends TelegramLongPollingBot {

    private Message requestMessage = new Message();
    private final SendMessage response = new SendMessage();
    private final Producer producerService;
    private final UserService userService;

    private final String botUsername;
    private final String botToken;

    public TelegramBot(
            TelegramBotsApi telegramBotsApi,
            @Value("${telegram-bot.name}") String botUsername,
            @Value("${telegram-bot.token}") String botToken,
            Producer producerService, UserService userService) throws TelegramApiException {
        this.botUsername = botUsername;
        this.botToken = botToken;
        this.producerService = producerService;
        this.userService = userService;

        telegramBotsApi.registerBot(this);
    }

    /**
     * Этот метод вызывается при получении обновлений через метод GetUpdates.
     *
     * @param request Получено обновление
     */
    @SneakyThrows
    @Override
    public void onUpdateReceived(Update request) {
        requestMessage = request.getMessage();
        response.setChatId(requestMessage.getChatId().toString());

        var entity = new User(
                0, requestMessage.getChat().getUserName(),
                requestMessage.getText());

        if (request.hasMessage() && requestMessage.hasText())
            log.info("Working onUpdateReceived, request text[{}]", request.getMessage().getText());

        if (requestMessage.getText().equals("/start"))
            defaultMsg(response, "Напишите команду для показа списка мыслей: \n " + "/idea - показать мысли");
        else if (requestMessage.getText().equals("/idea"))
            onIdea(response);
        else
            defaultMsg(response, "Я записал вашу мысль :) \n ");

        log.info("Working, text[{}]", requestMessage.getText());

        if (requestMessage.getText().startsWith("/")) {
            entity.setStartWord("команда: ");
            producerService.sendMessage( entity);
        } else {
            entity.setStartWord("мысль: ");
            producerService.sendMessage( entity);
            userService.insert(entity);
        }
    }

    /**
     * Метод отправки сообщения со списком мыслей - по команде "/idea"
     *
     * @param response - метод обработки сообщения
     */
    private void onIdea(SendMessage response) throws TelegramApiException {
        if (userService.getUserList().isEmpty()) {
            defaultMsg(response, "В списке нет мыслей. \n");
        } else {
            defaultMsg(response, "Вот список ваших мыслей: \n");
            for (User txt : userService.getUserList()) {
                response.setText(txt.toString());
                execute(response);
            }
        }
    }

    /**
     * Шабонный метод отправки сообщения пользователю
     *
     * @param response - метод обработки сообщения
     * @param msg - сообщение
     */
    private void defaultMsg(SendMessage response, String msg) throws TelegramApiException {
        response.setText(msg);
        execute(response);
    }
}

Фрагмент кода с отправкой в Kafka и записью в БД

        if (requestMessage.getText().startsWith("/")) {
            entity.setStartWord("команда: ");
            producerService.sendMessage( entity);
        } else {
            entity.setStartWord("мысль: ");
            producerService.sendMessage( entity);
            userService.insert(entity);
        }

Переходим к созданию бизнес логики приложения

— BaseService — реализует базовые методы сервисов проекта

BaseService

public class BaseService {

    /**
     * Обёртка результата
     *
     * @param result результат
     * @return результат
     * @throws NotFoundException если результат null
     */
    public  T wrapResult(T result) {
        if(result == null)
            throw new NotFoundException();
        return result;
    }

    /**
     * Обёртка результата
     *
     * @param result результат
     * @return результат
     * @throws NotFoundException если результат null или пустой
     */
    public  List wrapResults(List result) {
        if(result == null || result.size() == 0)
            throw new NotFoundException();
        return result;
    }

}

— Класс UserService работает с нашим репозиторием IUserRepository и содержит в себе бизнес-логику работы с записями о событиях в БД

UserService

@Service
@Slf4j
@RequiredArgsConstructor
public class UserService extends BaseService {

    //beans
    protected final IUserRepository repo;

    /**
     * Возвращает список записей
     *
     * @return список записей
     * @throws DbException в случае ошибки БД
     */
    public List getUserList() {
        log.trace("#### getUserList() - working");
        return wrapResults(repo.getUserList());
    }

    /**
     * Возвращает список записей по id
     *
     * @throws DbException в случае ошибки БД
     */
    public User getById(int id) {
        log.trace("#### getById() [id={}]", id);
        return wrapResult(repo.getById(id));
    }

    /**
     * Вставка новой записи
     *
     * @param entity новая запись
     * @throws DbException в случае ошибки БД
     */
    public void insert(User entity) {
        log.trace("#### insert() [entity={}]", entity);
        repo.insert(entity);
    }

    /**
     * Удаление записи
     *
     * @param entity удаляемая запись
     * @throws DbException в случае ошибки БД
     */
    public void delete(User entity) {
        log.trace("#### delete() [entity={}]", entity);
        repo.delete(entity);
    }

}

— Класс Producer, как раз тот класс, который шлет сообщения в топик users, а так же здесь мы можем изменять формат самого сообщения и данные, которые он отправляет

Producer

@Service
@Slf4j
public class Producer {

    private static final String TOPIC = "users";
    protected final IUserRepository repo;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public Producer(IUserRepository repo) {
        this.repo = repo;
    }

    public void sendMessage(User user) {
        if (user.getName() == null || user.getDescription().isEmpty()) log.info("#### Empty name/description message");
        log.info("#### Producing message [user={}]", user);
        kafkaTemplate.send(TOPIC, "Writing in log -> " + user);
    }
}

В конце класс, который собственно и запускает все наше приложене

WebHookApp

@Slf4j
@SpringBootApplication
public class WebHookApp {
    public static void main(String[] args) {
        SpringApplication.run(WebHookApp.class, args);
    }
}

Теперь мы замариновали все ингридиенты и подготовили блюдо к запеканию:

— Сначала проверим, запущена ли Kafka

запуск по команде - sudo su systemctl start kafkaзапуск по команде — sudo su systemctl start kafka

— После, запускаем Conductor и видим, что у нас работет брокер сообщений, после запуска нашего приложения, тут появится топик users, в который будут лететь сообщения отправленные нашим producer

Запущенный брокерЗапущенный брокер

— Далее запускаем DBeaver и создаем 2 таблицы (log и user_table), вот схема создания таблиц:

CREATE TABLE public.log (
	id serial4 NOT NULL,
	message varchar(500) NOT NULL,
	date_time date NOT NULL,
	topic varchar(100) NOT NULL,
	CONSTRAINT log_pkey PRIMARY KEY (id)
);
CREATE TABLE public.user_table (
	id serial4 NOT NULL,
	user_name varchar(100) NOT NULL,
	description varchar(500) NULL,
	CONSTRAINT user_table_pkey PRIMARY KEY (id)
);

Схема БД publicСхема БД publicВот как выглядят таблица logВот как выглядят таблица logВот как выглядит таблица user_tableВот как выглядит таблица user_table

Отлично, блюдо запеклось и готово к подаче:

— Запускаем проект, проверяем, что все настроено и корректно работает

Spring logsSpring logs

— Открываем телеграмм и пробуем на вкус нашего «Франкенштейна»

Общение с ботом в ТелеграммОбщение с ботом в Телеграмм

— Давайте посмотрим, что же нам написал Spring в логах и записались ли данные в Kafka и БД ?

Логи нашего бота, ошибок не наблюдается

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.2)
2022-01-15 16:46:19.248  INFO 412498 --- [           main] com.secretary.bot.WebHookApp             : The following profiles are active: bot
2022-01-15 16:46:19.291  WARN 412498 --- [kground-preinit] o.s.h.c.j.Jackson2ObjectMapperBuilder    : For Jackson Kotlin classes support please add "com.fasterxml.jackson.module:jackson-module-kotlin" to the classpath
2022-01-15 16:46:19.882  INFO 412498 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8081 (http)
2022-01-15 16:46:19.887  INFO 412498 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-01-15 16:46:19.887  INFO 412498 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.41]
2022-01-15 16:46:19.956  INFO 412498 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2022-01-15 16:46:19.957  INFO 412498 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 678 ms
2022-01-15 16:46:20.013  INFO 412498 --- [           main] c.secretary.bot.config.DefaultDbConfig   : [db] настройки БД: [{"url":"jdbc:postgresql://localhost:5432/postgres","driver":"org.postgresql.Driver","user":"*****","password":"*****","poolSize":"10","minPoolSize":4,"maxPoolSize":10,"idleTimeout":0,"maxLifetime":0,"bulkSize":null,"h2Database":false}]
2022-01-15 16:46:20.565  INFO 412498 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2022-01-15 16:46:20.574 DEBUG 412498 --- [           main] s.w.s.m.m.a.RequestMappingHandlerAdapter : ControllerAdvice beans: 0 @ModelAttribute, 0 @InitBinder, 1 RequestBodyAdvice, 1 ResponseBodyAdvice
2022-01-15 16:46:20.598 DEBUG 412498 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : 3 mappings in 'requestMappingHandlerMapping'
2022-01-15 16:46:20.619 DEBUG 412498 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Patterns [/webjars/**, /**] in 'resourceHandlerMapping'
2022-01-15 16:46:20.627 DEBUG 412498 --- [           main] .m.m.a.ExceptionHandlerExceptionResolver : ControllerAdvice beans: 0 @ExceptionHandler, 1 ResponseBodyAdvice
2022-01-15 16:46:20.702  INFO 412498 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
2022-01-15 16:46:20.709  INFO 412498 --- [           main] com.secretary.bot.WebHookApp             : Started WebHookApp in 1.65 seconds (JVM running for 1.962)
SSS2022-01-15 16:52:33.916  INFO 412498 --- [legram Executor] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = true
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-01-15 16:52:33.947  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2022-01-15 16:52:33.948  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2022-01-15 16:52:33.948  INFO 412498 --- [legram Executor] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1642254753947
2022-01-15 16:52:34.056  INFO 412498 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: faKjxP6CTvGFeeVKJw
2022-01-15 16:54:01.115  INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2022-01-15 16:54:01.188  INFO 412498 --- [legram Executor] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.

— Как мы видим, сообщения отправленные Боту появились в БД

Записи в БДЗаписи в БД

— Открыв кондуктор, перейдите во вкладку topics, после нажимаем на наш топик users

Вкладка topicsВкладка topics

— Далее во вкладке нашего топика нажимаем на кнопку CONSUME DATA

Информация о топике usersИнформация о топике users

— В открывшемся окне, ставим такие же настройки (самая важная из них это Start From — указывает, с какого момента показывать сообщения в Kafka, наша настройка — показывает все сообщения, включая отправленые ранее)

Настройки просмотра сообщенийНастройки просмотра сообщений

— Вот и все, теперь мы убедились, что сообщения благополучно прилетели в Kafka, записались в БД и не вызвали ошибок в приложении

Прилетевшие в Kafka сообщенияПрилетевшие в Kafka сообщения

Ну что же, большое всем спасибо за время, потраченное на прочтение данной статьи, жду вас во второй части этого туториала, где мы используем Consumer Kafka, с помощью которого будем обрабатывать прилетающие сообщения.

© Habrahabr.ru