[Перевод] Симфония асинхронии: задачи JavaFX и сокеты Netty

Всем доброй пятницы!

У нас наконец-то дошли руки до книги о Netty, которую нам рекомендовали в том числе благодарные читатели нашего хаброблога.

f2bddf398ffb440495287edbf4d9f01c.jpg

Признаться, у нас давно не выходило ничего узкотематического по Java. Но тема Netty вызывает на Хабре самый живой интерес, поэтому мы решили разместить обзорный материал по ней (автор почерпнул идею поста из этой книги) и устроить самый ориентировочный опрос. Заходите, высказывайтесь!

В этой статье рассказано, как интегрировать клиент/серверный фреймворк Netty в приложение JavaFX. Хотя и можно написать распределенное приложение, воспользовавшись простыми протоколами, работающими по модели запрос/отклик, например, HTTP или RMI, эти протоколы зачастую неэффективны и недостаточно функциональны для приложений, требующих постоянных серверных обновлений, push-уведомлений, выполняющих долгосрочные операции. Netty использует эффективную сетевую реализацию, в основе которой лежит асинхронная обработка и соединения, зависящие от состояний. Такая структура позволяет обойтись без дополнительных уловок, например, не требует делать опрос для обновления клиентского кода.

Интегрируя Netty с JavaFX, нужно гарантировать, что взаимодействия с UI у вас реализуются в потоке interaction FX, не блокируя UI. Таким образом, нужно обернуть вызовы Netty в класс Task из FX. Класс FX Task предоставляет поток для долгосрочных операций, и в большинстве случаев можно позволить Netty просто ожидать отклика (wait()). Это делается при помощи вызова sync (), который обеспечивает блокировку, но не приводит к подвисанию приложения.

Этот пример сделан на основе программы для обмена эхо-запросами между клиентом и сервером, которую я нашел в книге «Netty in Action» Нормана Маурера и Марвина Аллена Вольфталя. После того как соединение будет установлено, клиент собирает строкуjava.lang.String и отправляет ее на сервер. Сервер преобразует эту строку при помощи toUpperCase() и отправляет полученную строку обратно к клиенту. Клиент отображает строку в пользовательском интерфейсе.

Весь код к этому посту лежит на GitHub.

Проект

Для удобства я упаковал весь серверный и клиентский код в один проект Maven. Следующая UML-диаграмма классов демонстрирует, какие классы есть в нашей программе.

4778211a35554a37a9625125a91565eb.png

Диаграмма классов эхо-клиента на FX

В EchoServer и EchoClient содержатся функции main(), являющиеся входными точками для серверных и клиентских процессов. В EchoServer содержится код Netty для начальной загрузки, связывания и создания конвейера со специальным обработчиком EchoServerHandler. EchoClient создает объект пользовательского интерфейса EchoClientController, в котором содержится код Netty для создания соединения, разрыва соединения, отправки и получения. Контроллер EchoClientController также создает клиентский конвейер при помощи EchoClientHandler.

На диаграмме показана последовательность соединение/отправка/получение/разрыв соединения. Она не нормализована, поэтому некоторые операции («Enter Text», «Netty Connect») номинальны и в коде отсутствуют. Обмен данными в программе в основном реализован при помощи стандартного связывания JavaFX и Netty Futures.

84f7a4e504d34a3aa935296f020439ad.png

Итак, вот как схематически выглядит наша последовательность.

  1. Пользователь нажимает кнопку Connect.
  2. Контроллер EchoClientController выполняет начальную загрузку и подключается к EchoServer.
  3. Пользователь вводит текст и нажимает кнопку Send.
  4. В канале вызывается операция writeAndFlush(). Вызываются методы channelRead() и channelReadComplete() обработчика EchoServerHandler.
  5. Метод channelRead() обработчика EchoServerHandler выполняет собственный метод write(), а метод channelReadComplete() выполняет flush().
  6. EchoClientHandler получает данные
  7. EchoClientHandler устанавливает свойство StringProperty, связанное с UI. Автоматически обновляется поле TextField в UI.
  8. Пользователь нажимает кнопку Disconnect.
  9. Контроллер EchoClientController закрывает свой канал Channel и отключает группу EventGroup (на схеме отсутствует).

Клиентский код

Поскольку весь код есть на GitHub, основное внимание в этой статье я уделю взаимодействию клиентской JavaFX и Netty interaction. Опускаю тривиальный подкласс EchoClient JavaFX Application, создающий сцену (Stage) и загружающий файл EchoClient.fxml. Интересующий нас клиентский код находится в классе EchoClientController.

connect ()

Метод connect() берет из UI хост и порт и создает канал Netty, который затем сохраняется как поле EchoClientController.

Из EchoClientController.java

@FXML
HBox hboxStatus;
 
@FXML
ProgressIndicator piStatus;
 
@FXML
Label lblStatus;
 
private BooleanProperty connected = new SimpleBooleanProperty(false);
private StringProperty receivingMessageModel = new SimpleStringProperty("");
private Channel channel;

@FXML
public void connect() {
 
 if( connected.get() ) {
  return;  // соединение уже установлено; предотвратить и отключить
 }
 
 String host = tfHost.getText();
 int port = Integer.parseInt(tfPort.getText());

 group = new NioEventLoopGroup();
 
 Task task = new Task() {
  @Override
  protected Channel call() throws Exception {
   
   updateMessage("Bootstrapping");
   updateProgress(0.1d, 1.0d);
   
   Bootstrap b = new Bootstrap();
   b
   .group(group)
   .channel(NioSocketChannel.class)
   .remoteAddress( new InetSocketAddress(host, port) )
   .handler( new ChannelInitializer() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
     ch.pipeline().addLast(new EchoClientHandler(receivingMessageModel));
    }
   });
   
   ChannelFuture f = b.connect();
   Channel chn = f.channel();
   
   updateMessage("Connecting");
   updateProgress(0.2d, 1.0d);

   f.sync();

   return chn;
  }

  @Override
  protected void succeeded() {
   
   channel = getValue();
   connected.set(true);
  }

  @Override
  protected void failed() {
   
   Throwable exc = getException();
   logger.error( "client connect error", exc );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( exc.getClass().getName() );
   alert.setContentText( exc.getMessage() );
   alert.showAndWait();
   
   connected.set(false);
  }
 };
 
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());
 
 new Thread(task).start();
}

Вызовы Netty для начальной загрузки и соединения обернуты в задачу JavaFX. Задача — ключевая концепция при программировании на JavaFX, и у меня есть правило: класть в задачу любой код, который потенциально может выполняться дольше секунды. Таким образом, в задачи у меня попадает практически все, за исключением манипуляций с Java-объектами в оперативной памяти.

Задача предоставляет несколько свойств: runningProperty, messageProperty, progressProperty. Я связываю их с элементами UI: контейнером HBox, меткой Label, индикатором ProgressIndicator. Благодаря связыванию JavaFX отпадает необходимость регистрировать слушатели и вызывать методы setter () в элементах управления пользовательского интерфейса.

Метод call() возвращает канал. В этой реализации меня не волнует асинхронное поведение Netty — ведь я уже работаю в новом Thread() — поэтому могу дождаться, пока вернется вызов sync(). Возвращенное значение канала устанавливается в поле метода succeeded(). Если Netty выдаст исключение, то вызывается метод failed(), сообщение логируется и выводится пользователю в диалоговом окне.

Методы succeeded(), failed(), updateMessage() и updateProgress() выполняются в потоке FX, а call() — нет. Метод call() никоим образом не должен обновлять UI. Метод call () должен заниматься только лишь долгосрочной эксплуатацией Netty.

send ()

Метод send() использует сохраненный объект Channel, чтобы вызвать метод writeAndFlush(). Этот writeAndFlush() будет запускаться при помощи делегата EchoClientHandler посредством фреймворка Netty.

Также из EchoClientController.java

@FXML
public void send() {

 if( !connected.get() ) {
  return;
 }
 
 final String toSend = tfSend.getText();
 
 Task task = new Task() {

  @Override
  protected Void call() throws Exception {
   
   ChannelFuture f = channel.writeAndFlush( Unpooled.copiedBuffer(toSend, CharsetUtil.UTF_8) );
   f.sync();

   return null;
  }
  
  @Override
  protected void failed() {
   
   Throwable exc = getException();
   logger.error( "client send error", exc );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( exc.getClass().getName() );
   alert.setContentText( exc.getMessage() );
   alert.showAndWait();
   
   connected.set(false);
  }

 };
 
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());
 
 new Thread(task).start();
}

Обратите внимание на сходство с connect(). Новоиспеченная задача связывается все с теми же тремя progress-объектами. Методаsucceeded() нет, а метод failed() содержит ту же логику, что и обработчик ошибок в реализации connect().

Задача ничего не возвращает (возвращаемый тип Void). В оптимистичном сценарии вызов должен сработать сразу, но если бы он не сработал, то следовало бы дождаться ошибку. Поскольку метод call() у меня уже в новом потоке, я могу позволить себе ожидать в методе sync().

disconnect ()

Метод disconnect() работает с задачей Task по тому же принципу, что и два предыдущих метода. Два других метода используют одну пару updateMessage/Progress. В этом методе обертывание соединения с Netty происходит за два отдельных этапа. На выполнение sync() в close () требуется не так много времени. Метод shutdownGracefully() выполняется существенно дольше. Однако, в в моих экспериментах UI ни разу не подвисал.

@FXML
public void disconnect() {
 
 if( !connected.get() ) {
  return;
 }
 
 Task() {

  @Override
  protected Void call() throws Exception {
   
   updateMessage("Disconnecting");
   updateProgress(0.1d, 1.0d);
   
   channel.close().sync();     

   updateMessage("Closing group");
   updateProgress(0.5d, 1.0d);
   group.shutdownGracefully().sync();

   return null;
  }

  @Override
  protected void succeeded() {
   
   connected.set(false);
  }

  @Override
  protected void failed() {
   
   connected.set(false);

   Throwable t = getException();
   logger.error( "client disconnect error", t );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( t.getClass().getName() );
   alert.setContentText( t.getMessage() );
   alert.showAndWait();

  }
  
 };
 
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());

 new Thread(task).start();
}

Считывание

Считывание с сервера опосредуется через объект EchoClientHandler. При создании этого объекта ставится ссылка на свойство StringProperty, являющееся элементом модели, с которым также связывается пользовательский интерфейс. Я мог бы передавать элементы UI непосредственно обработчику, но при этом нарушается принцип разделения ответственности и становится сложнее применять это уведомление сразу к нескольким представлениям. Таким образом, свойство StringProperty может связываться с любым количеством элементов UI, и когда поступит обновление от обработчика, обновятся все эти UI-элементы.

Вот код EchoClientHandler.java. Обратите внимание на защиту FX Thread в методе channelRead0().

@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler {

 private Logger logger = LoggerFactory.getLogger( EchoClientHandler.class );

 private final StringProperty receivingMessageModel;
 
 public EchoClientHandler(StringProperty receivingMessageModel) {
  this.receivingMessageModel = receivingMessageModel;
 }
 
 @Override
 protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) 
              throws Exception {
  final String cm = in.toString(CharsetUtil.UTF_8);
  Platform.runLater( () -> receivingMessageModel.set(cm) );
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  logger.error( "error in echo client", cause );
  ctx.close();
 } 
}

Последнее замечание о последовательности связывания… мы не знаем, когда будет вызываться
channelRead0() (в данном случае мы полагаемся на асинхронность Netty), но когда такой вызов произойдет, мы установим объект модели. Я заканчиваю обновление объекта модели, обеспечивая некоторую защиту FX Thread. FX — поскольку это фреймворк для связывания — обновит все элементы UI, например, TextField.

Заключительные замечания о клиентском коде

При интеграции Netty с JavaFX самое важное — использовать задачи. Благодаря задачам UI не подвисает, благодаря предоставляемым свойствам всю работу можно отслеживать визуально. Благодаря задачам частично отпадает необходимость в асинхронной обработке со стороны Netty (как минимум, на прикладном уровне), поэтому задачи могут сколько угодно блокироваться, не блокируя пользовательский интерфейс. Получая уведомления о новых данных, попробуйте использовать связывание JavaFX, опосредованное через выделенный объект модели и обновлять таким образом UI, а не выполнять отдельные вызовы к конкретным объектам.

Серверный код

Просто привожу здесь весь серверный код без комментариев, так как статья посвящена клиентским аспектам Netty. Очень похожий пример есть в книге Manning

Из EchoServer.java

public class EchoServer {

 private final int port;
 
 public EchoServer(int port) {
  this.port = port;
 }
 
 public static void main(String[] args) throws Exception {
  if( args.length != 1 ) {
   System.err.println("usage: java EchoServer port");
   System.exit(1);
  }
  
  int port = Integer.parseInt(args[0]);
  new EchoServer(port).start();
 }
 
 public void start() throws Exception {
  
  final EchoServerHandler echoServerHandler = new EchoServerHandler();
  
  EventLoopGroup group = new NioEventLoopGroup();
  
  try {
   ServerBootstrap b = new ServerBootstrap();
   b
    .group(group)
    .channel(NioServerSocketChannel.class)
    .localAddress(new InetSocketAddress(port))
    .childHandler(new ChannelInitializer() {
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast( echoServerHandler );
     }     
    });
    
   ChannelFuture f = b.bind().sync();
   
   f.channel().closeFuture().sync();
   
  } finally {
   group.shutdownGracefully().sync();
  }
 }
}

Из EchoServerHandler.java
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

 private Logger logger = LoggerFactory.getLogger( EchoServerHandler.class );
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  
  ByteBuf in = (ByteBuf)msg;
  String in_s = in.toString(CharsetUtil.UTF_8);
  String uc = in_s.toUpperCase();
  if( logger.isInfoEnabled() ) {
   logger.info("[READ] read " + in_s + ", writing " + uc);
  }
  in.setBytes(0,  uc.getBytes(CharsetUtil.UTF_8));
  ctx.write(in);  // записывает байты обратно к адресату (не сбрасывает)
 }

 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  if( logger.isDebugEnabled() ) {
   logger.debug("[READ COMPLETE]");
  }
  ctx.flush();
 }
 
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
  super.channelActive(ctx);
  if(logger.isDebugEnabled() ) {
   logger.debug("[CHANNEL ACTIVE]");
  }
  ctx.channel().closeFuture().addListener(f -> logger.debug("[CLOSE]"));
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  logger.error( "error in echo server", cause);
  ctx.close();
 }
}

Хотя современные скоростные компьютеры вполне могут тратить часть циклов на опрос, благодаря эффективному сетевому уровню ваше приложение будет быстро реагировать и получится динамичным, а также избавит сервер от лишней работы.

© Habrahabr.ru