Рыночные данные в кармане: как получать их быстро и просто с помощью Java/Kotlin и Spring Boot + TinkoffApi

8540def7373433b7a1730d1e139cb103

Если вы хотите написать проект, связанный с рыночными данными или торговлей на бирже, и вы знакомы с Java или Kotlin и слышали про Spring Boot, то эта статья для вас.

Я являюсь автором Spring Boot стартера с помощью которого можно легко интегрировать TinkoffInvestApi в свои Spring Boot приложения. В стартере упор сделан на стриминг различных рыночных данных — вам остается написать только логику их обработки.

В этой статье я хочу лишь познакомить вас с моим стартером, показать идеи и часть возможностей. Без деталей реализации, они будут в серии следующих статей.

Пререквезиты

  • Нужно быть клиентом Тинькофф, так как мы будем использовать токен Тинькофф Инвестиций

  • jdk17+

  • SpringBoot 3.0+

Для тех кто не знаком с TinkoffInvestApi — можете ознакомиться на их страничке в GitHub.
Если коротко: Tinkoff Invest API — это REST/gRPC интерфейс для взаимодействия с торговой платформой «Тинькофф Инвестиции».

Какие задачи можно решать:

  • Анализ котировок бумаг

  • Сигналы на покупку или продажу (Например оповещения о входе в позицию в Telegram)

  • Прогнозы движения акций

  • Анализ портфеля

  • Автоматизация торговли и создание торговых роботов

  • Ведение собственной системы статистики

  • Тестирование стратегий на истории

Какую проблему я вижу в использовании Tinkoff Invest API напрямую?

  1. Сложность написания кода + много boilerplate

  2. Как следствие высокий порог входа

  3. Будут появлятся одинаковые решения однотипных проблем, эти решения лучше вынести и поделится с сообществом

Теперь давайте попробуем решить следующую проблему

Я хочу отправлять notification в telegram когда доллар на бирже приближается к 100 рублям за штуку. Если я использую Tinkoff Invest API мне для этого придеться, помимо написания самой логики отправки notification, написать кучу всего большого и страшного, что никак не связано с моей задачей. Такой пример приводят сами разработчики по получению рыночных данных

Пример на java

Взял его отсюда

private static void marketdataStreamExample(InvestApi api) {
  var randomFigi = randomFigi(api, 5);

  //Описываем, что делать с приходящими в стриме данными
  StreamProcessor processor = response -> {
    if (response.hasTradingStatus()) {
      log.info("Новые данные по статусам: {}", response);
    } else if (response.hasPing()) {
      log.info("пинг сообщение");
    } else if (response.hasCandle()) {
      log.info("Новые данные по свечам: {}", response);
    } else if (response.hasOrderbook()) {
      log.info("Новые данные по стакану: {}", response);
    } else if (response.hasTrade()) {
      log.info("Новые данные по сделкам: {}", response);
    } else if (response.hasSubscribeCandlesResponse()) {
      var subscribeResult = response.getSubscribeCandlesResponse().getCandlesSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("свечи", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    } else if (response.hasSubscribeInfoResponse()) {
      var subscribeResult = response.getSubscribeInfoResponse().getInfoSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("статусы", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    } else if (response.hasSubscribeOrderBookResponse()) {
      var subscribeResult = response.getSubscribeOrderBookResponse().getOrderBookSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("стакан", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    } else if (response.hasSubscribeTradesResponse()) {
      var subscribeResult = response.getSubscribeTradesResponse().getTradeSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("сделки", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    } else if (response.hasSubscribeLastPriceResponse()) {
      var subscribeResult = response.getSubscribeLastPriceResponse().getLastPriceSubscriptionsList().stream()
        .collect(Collectors.groupingBy(el -> el.getSubscriptionStatus().equals(SubscriptionStatus.SUBSCRIPTION_STATUS_SUCCESS), Collectors.counting()));
      logSubscribeStatus("последние цены", subscribeResult.getOrDefault(true, 0L), subscribeResult.getOrDefault(false, 0L));
    }
  };
  Consumer onErrorCallback = error -> log.error(error.toString());

  //Подписка на список инструментов. Не блокирующий вызов
  //При необходимости обработки ошибок (реконнект по вине сервера или клиента), рекомендуется сделать onErrorCallback
  api.getMarketDataStreamService().newStream("trades_stream", processor, onErrorCallback).subscribeTrades(randomFigi);
  api.getMarketDataStreamService().newStream("candles_stream", processor, onErrorCallback).subscribeCandles(randomFigi);
  api.getMarketDataStreamService().newStream("info_stream", processor, onErrorCallback).subscribeInfo(randomFigi);
  api.getMarketDataStreamService().newStream("orderbook_stream", processor, onErrorCallback).subscribeOrderbook(randomFigi);
  api.getMarketDataStreamService().newStream("last_prices_stream", processor, onErrorCallback).subscribeLastPrices(randomFigi);


  //Для стримов стаканов и свечей есть перегруженные методы с дефолтными значениями
  //глубина стакана = 10, интервал свечи = 1 минута
  api.getMarketDataStreamService().getStreamById("trades_stream").subscribeOrderbook(randomFigi);
  api.getMarketDataStreamService().getStreamById("candles_stream").subscribeCandles(randomFigi);
  api.getMarketDataStreamService().getStreamById("candles_stream").cancel();
  //отписываемся от стримов с задержкой
  CompletableFuture.runAsync(()->{

    //Отписка на список инструментов. Не блокирующий вызов
    api.getMarketDataStreamService().getStreamById("trades_stream").unsubscribeTrades(randomFigi);
    api.getMarketDataStreamService().getStreamById("candles_stream").unsubscribeCandles(randomFigi);
    api.getMarketDataStreamService().getStreamById("info_stream").unsubscribeInfo(randomFigi);
    api.getMarketDataStreamService().getStreamById("orderbook_stream").unsubscribeOrderbook(randomFigi);
    api.getMarketDataStreamService().getStreamById("last_prices_stream").unsubscribeLastPrices(randomFigi);

    //закрытие стрима
    api.getMarketDataStreamService().getStreamById("candles_stream").cancel();

  }, delayedExecutor)
    .thenRun(()->log.info("market data unsubscribe done"));


  //Каждый marketdata стрим может отдавать информацию максимум по 300 инструментам
  //Если нужно подписаться на большее количество, есть 2 варианта:
  // - открыть новый стрим
  api.getMarketDataStreamService().newStream("new_stream", processor, onErrorCallback).subscribeCandles(randomFigi);
  // - отписаться от инструментов в существующем стриме, освободив место под новые
  api.getMarketDataStreamService().getStreamById("new_stream").unsubscribeCandles(randomFigi);

  //При вызове newStream с id уже подписаного приведет к пересозданию стрима с версии 1.4
  api.getMarketDataStreamService().newStream("candles_stream", processor, onErrorCallback)
    .subscribeCandles(randomFigi);
}

Выглядит непонятным — какие-то стримы, процессоры и куча лямбд. Как новичку, который просто хочет поиграться с рыночными данными, во всем этом разобраться? Я предлагаю использовать мой стартер.

Вот так будет выглядеть получение события об изменении цены доллара:

@HandleLastPrice(ticker = "USDRUB")
class DollarLastPriceHandler implements BlockingLastPriceHandler {

    @Override
    public void handleBlocking(LastPrice lastPrice) {
       //отправляем notification в telegram когда цена == 100
    }
}

Вся внутренняя реализация скрыта, просто напишите то, что вы будете делать каждый раз когда изменяется цена, handleBlocking будет вызываться каждый раз когда цена доллара изменится на бирже. Кстати если вы используете jdk21+ BlockingLastPriceHandler будет исполнен на виртуальном потоке. Если jdk ниже 21 версии рекомендую использовать способ с CompletableFuture

@HandleLastPrice(ticker = "USDRUB")
class DollarLastPriceAsyncHandler implements AsyncLastPriceHandler {

    @Override
    public CompletableFuture handleAsync(LastPrice lastPrice) {
        return CompletableFuture.runAsync(() ->  //отправляем notification в telegram когда цена == 100);
    }
}

А если вы захотите написать тоже самое, например, для акции Сбербанка, неужели создавать еще один класс и копипастить обработку?
Нет, общую логику, например, логирование можно выносить следующим образом

@HandleAllLastPrices(tickers = {"USDRUB", "SBER"})
class CommonLastPriceHandler implements AsyncLastPriceHandler {

    @Override
    public CompletableFuture handleAsync(LastPrice lastPrice) {
        return CompletableFuture.runAsync(() -> System.out.println("CommonLastPriceHandler: " + lastPrice));
    }
}

Или в стиле configuration


    @Bean
    public BlockingLastPriceStreamProcessorAdapter coroutineLastPriceStreamProcessorAdapter() {
        return LastPriceStreamProcessorAdapterFactory
//            .runAfterEachLastPriceHandler(true) опционально
//            .runBeforeEachLastPriceHandler(true) опционально
                .withTickers(List.of("USDRUB", "SBER"))
                .createBlockingHandler(lastPrice -> System.out.println("LastPriceStreamProcessorAdapterFactory" + lastPrice)); // для jdk 21+ BlockingHandler будет исполнен в виртуальном потоке
    }

Все хендлеры будут созданы как компоненты spring, поэтому можно отдельно написать условный TelegramService и инжектить его в любой хендлер

@HandleLastPrice(ticker = "USDRUB")
class DollarLastPriceHandler implements BlockingLastPriceHandler {
    private TelegramService telegramService;

    public void DollarLastPriceHandler(TelegramService telegramService) {
       this.telegramService = telegramService;
    }

    @Override
    public void handleBlocking(LastPrice lastPrice) {
       //отправляем notification в telegram когда цена == 100
       //telegramService.sendNotification()
    }
}

Обрабатывать сделки, стаканы, последние цены, обновление портфеля и т.д. можно аналогично. Отличие будет в названии аннотаций и интерфейсов. Подробнее можно ознакомиться в README. Также есть два демо проекта:

На kotlin + gradle.kts
На java + maven

Чтобы это все заработало:

  1. Я бы рекомендовал почитать про возможности, понятия и определения в официальной документации Tinkoff Invest API. Если вы уже с ней знакомы — можно пропустить этот пункт

  2. Создаем любым удобным способом spring boot проект и подключаем зависимости:

Для build.gradle.kts

implementation("io.github.dankosik:invest-api-java-sdk-starter:1.6.0-RC1")

И понадобится одна из:

implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-webflux")

Для build.gradle

implementation 'io.github.dankosik:invest-api-java-sdk-starter:1.6.0-RC1'

И понадобится одна из:

implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'

Для Maven


    io.github.dankosik
    invest-api-java-sdk-starter
    1.6.0-RC1
    plain

И понадобится одна из:


    org.springframework.boot
    spring-boot-starter-web


    org.springframework.boot
    spring-boot-starter-webflux

  1. Всталяем это в application.yml заменяя токен на реальный (как получить токен)

tinkoff:
  starter:
    apiToken:
      fullAccess:
        "ваш токен"
  1. Берем любой понравившийся пример из демо проектов или как вариант печатаем в консоль все сделки, которые исполняются на бирже по акциям Сбербанка

@HandleTrade(ticker = "SBER")
class BlockingSberTradeHandler implements BlockingTradeHandler {

    @Override
    public void handleBlocking(Trade trade) {
        System.out.println(trade);
    }
}

Дальнейшие планы:

  • Написание доки/wiki по стартеру.

  • Поддержка Tinkoff Invest API версий 1.7 и 1.8 (так как сейчас стартер работает на версии 1.6)

  • После получения фидбека буду пилить новые фичи. Поэтому обязательно пишите интересен ли проект, предлагайте улучшения.

Полезные ссылки:

© Habrahabr.ru