Проблема использования CompletableFuture в нескольких потоках и её решение

image
В Java 8 появился новый класс CompletableFuture, который позволяет удобно писать асинхронный код.
При использовании CompletableFuture из нескольких потоков я столкнулся с его неочевидным поведением, а именно с тем, что callbacks на нём могут выполнятся совсем не в тех потоках, как ожидалось. Об этом и о том, как мне удалось решить проблему — я и расскажу в этой статье.

Мною разрабатывался асинхронный, неблокирующийся однопоточный клиент к серверу, который использовал потоконебезопасные структуры данных. Тесты проходили без проблем, но benchmarks иногда падали c ConcurrentModificationException на внутренних структурах однопоточного клиента.

Асинхронность в клиенте реализовывалась с использованием CompletableFuture, все операции внутри клиента производились в одном потоке (далее в коде — singleThreadExecutor).

Фрагмент кода клиента с методом get, который доступен пользователям:

//ожидающие завершения запросы
private final Set pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>());
    
public CompletableFuture get(String key) {
     CompletableFuture future = new CompletableFuture<>();
        
     //передаём задачу на выполнение в поток клиента
     singleThreadExecutor.execute(() -> {
             //добавляем future в список ожидающих завершения
             pendingFutures.add(future);
             future.whenComplete((v, e) -> {
                     //когда future завершится удаляем его из списка ожидающих
                     pendingFutures.remove(future);
             });
             //тут был код передающий запрос на сервер и получающий ответ от сервера
             //в конечном итоге код вызвал future.complete(data); в потоке этого singleThreadExecutor
     });

     return future;
}

Оказалось, что так делать нельзя.

Возможно, я узнал бы об этом раньше, если бы внимательно прочитал javadoc для CompletableFuture.

Посмотреть javadoc
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

При использовании такой архитектуры необходимо, чтобы все callbacks у CompletableFuture вызывались в том же потоке, который делает CompletableFuture.complete.
По указанному выше коду, вроде бы, так и происходит.
Но benchmarks иногда завершались с ConcurrentModificationException в коде, который перебирал pendingFutures в том же потоке клиента (singleThreadExecutor).

Дело в том, что callback, передаваемый в future.whenComplete (который вызывает pendingFutures.remove), иногда выполняется совсем в другом потоке.
А точнее в потоке приложения, которое пользуется моим клиентом:

Client client = new Client("127.0.0.1", 8080);
CompletableFuture result = client.get(key);
result.thenAccept(data -> {
    System.out.println(data);
});

Вызов result.thenAccept в этом приложении приводит иногда к вызову остальных callbacks на future, которые были добавлены внутри самого кода клиента.

Рассмотрим проблему на простых примерах


Thread mainThread = Thread.currentThread();
CompletableFuture future = new CompletableFuture<>();
future.thenRun(() -> {
    System.out.println(Thread.currentThread() == mainThread);
});
future.complete(null);

Такой код всегда выводин на экран true, так как callback выполняется в том же потоке, что и метод complete.

Но если к CompletableFuture будет хотя бы одно обращение из другого потока, то поведение может измениться:

//основной поток
Thread mainThread = Thread.currentThread();

//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();

CompletableFuture future = new CompletableFuture<>();
future.thenRun(() -> {
    System.out.println(Thread.currentThread() == mainThread)
});

//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
    future.thenRun(() -> {
        //nop
    });
});

//завершаем future
future.complete(null);

Такой код может иногда выдавать false.

Дело в том, что вызов thenRun у того же future, но во втором потоке, может привести к срабатыванию callback в первом thenRun. При этом callback первого thenRun будет вызван во втором потоке.
Это происходит в тот момент, когда future.complete(null) начало выполняться, но ещё не успело вызвать callbacks, а во втором потоке вызвался thenRun, который и выполнит все остальные callbacks на этом future, но уже в своём потоке.

Проблем решается просто:

//основной поток
Thread mainThread = Thread.currentThread();

//создаём второй поток
Executor executor = Executors.newSingleThreadExecutor();

CompletableFuture future = new CompletableFuture<>();
CompletableFuture secondThreadFuture = future.thenRun(() -> {
    System.out.println(Thread.currentThread() == mainThread);
});

//просто добавляем callback к тому же future в другом потоке
executor.execute(() -> {
    secondThreadFuture.thenRun(() -> {
        //nop
    });
});

//завершаем future
future.complete(null);

Мы просто добавили secondThreadFuture, которая зависит от результата исходной future. И вызов на ней thenRun во втором потоке не приводит к возможному срабатыванию callbacks на исходной future.

Для гарантированного вызова callbacks в заданных пользователем потоках у CompletableFuture существуют async реализации методов, например — thenRunAsync, которым нужно передавать Executor. Но async-версии методов могут работать медленней, чем обычные. Поэтому, я не хотел лишний раз их использовать.

Вывод


Вывод, который я сделал для себя: не использовать один объект CompletableFuture в нескольких потоках, если необходимо быть уверенным, что все callbacks на нём выполняются в заданном потоке. А если использовать несколько потоков с одним CompletableFuture необходимо — то достаточно передавать в другой поток не оригинальный CompletableFuture, а новый, который будет зависеть от исходного. Например, вот так:
CompletableFuture secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> {
    //nop
});

Комментарии (7)

  • 8 апреля 2017 в 19:47

    0

    А вызывать продолжения на та же экзекуторе не пробывали?
    • 8 апреля 2017 в 19:51

      0

      В конце статьи написал, что можно использовать async-варианты методов.
      Если вы имеете в виду callbacks, которые устанавливает пользовательское приложение, то оно не имеет доступа к потоку клиентской библиотеки. Поэтому добавляться callbacks могут из неизвестных заранее потоков.
      • 8 апреля 2017 в 20:01

        0

        А типа синглтон, которым by its very nature является единственный поток религия не велит?
        • 8 апреля 2017 в 20:02

          0

          Я не могу контролировать из каких потоков будут обращаться к моей клиентской библиотеки. Она однопоточная только внутри себя.
          • 8 апреля 2017 в 20:07

            +1

            Каюсь, не вник… Но идея библиотеки, завязанной на выполнение в одном потоке и не контролирующей это кажется странной. Мне кажется, что функциональность можно было бы оформить так, чтобы вызывающий код предоставил поток, в котором все будет выполнятся. Короче, если вызывается обычное продолжение, то, если склероз мне не изменяет, он будет выполнятся в некоем произвольном потоке на дефолтном тред-пуле. Если это так, то чему удивляться?
            • 8 апреля 2017 в 20:35 (комментарий был изменён)

              0

              Вы не поверите, но такие проблемы иногда встречаются в других клиентских либах.
              Например: https://github.com/atomix/copycat/issues/75
              Дело в том, что это общепринятая практика, что клиентская либа поддерживает какой-то внутренний пул потоков, в которых производит сетевые операции. Например java-клиент Kafka так работает.
              Конечно, если внимательно читать документацию, то многих проблем бы удалось избежать.
              • 8 апреля 2017 в 21:57

                0

                Верю, но это кажется мне просчетом дизайна (

© Habrahabr.ru