Справочник по синхронизаторам java.util.concurrent.*
Целью данной публикации не является полный анализ синхронизаторов из пакета java.util.concurrent. Пишу её, прежде всего, как справочник, который облегчит вхождение в тему и покажет возможности практического применения классов для синхронизации нитей.
В java.util.concurrent много различных классов, которые по функционалу можно поделить на группы: Concurrent Collections, Executors, Atomics и т.д. Одной из этих групп будет Synchronizers (синхронизаторы).
Синхронизаторы — вспомогательные утилиты для синхронизации нитей, которые дают возможность разработчику регулировать и/или ограничивать работу нитей и предоставляют более высокий уровень абстракции, чем основные примитивы языка (мониторы).
Semaphore
Синхронизатор Semaphore реализует шаблон синхронизации Семафор. Чаще всего, семафоры необходимы, когда нужно ограничить доступ к некоторому общему ресурсу. В конструктор этого класса (Semaphore(int permits)
или Semaphore(int permits, boolean fair)
) обязательно передается количество нитей, которому семафор будет разрешать одновременно использовать заданный ресурс.
Доступ управляется с помощью счётчика: изначально значение счётчика равно int permits
, когда нить заходит в заданный блок кода, то значение счётчика уменьшается на единицу, когда нить его покидает, то увеличивается. Если значение счётчика равно нулю, то текущая нить блокируется, пока кто-нибудь не выйдет из блока (в качестве примера из жизни с permits = 1
, можно привести очередь в кабинет в поликлинике: когда пациент покидает кабинет, мигает лампа, и заходит следующий пациент).
Официальная документация по Semaphore.
import java.util.concurrent.Semaphore;
public class Parking {
//Парковочное место занято - true, свободно - false
private static final boolean[] PARKING_PLACES = new boolean[5];
//Устанавливаем флаг "справедливый", в таком случае метод
//aсquire() будет раздавать разрешения в порядке очереди
private static final Semaphore SEMAPHORE = new Semaphore(5, true);
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= 7; i++) {
new Thread(new Car(i)).start();
Thread.sleep(400);
}
}
public static class Car implements Runnable {
private int carNumber;
public Car(int carNumber) {
this.carNumber = carNumber;
}
@Override
public void run() {
System.out.printf("Автомобиль №%d подъехал к парковке.\n", carNumber);
try {
//acquire() запрашивает доступ к следующему за вызовом этого метода блоку кода,
//если доступ не разрешен, нить вызвавшая этот метод блокируется до тех пор,
//пока семафор не разрешит доступ
SEMAPHORE.acquire();
//Ищем свободное место и паркуемся
for (int i = 0; i < 5; i++)
if (!PARKING_PLACES[i]) { //Если место свободно
PARKING_PLACES[i] = true; //занимаем его
System.out.printf("Автомобиль №%d припарковался на месте %d.\n", carNumber, i);
Thread.sleep(5000); //Уходим за покупками, к примеру
PARKING_PLACES[i] = false;//Освобождаем место
break;
}
//release(), напротив, освобождает ресурс
SEMAPHORE.release();
System.out.printf("Автомобиль №%d покинул парковку.\n", carNumber);
} catch (InterruptedException e) {
}
}
}
}
Автомобиль №1 подъехал к парковке.
Автомобиль №1 припарковался на месте 0.
Автомобиль №2 подъехал к парковке.
Автомобиль №2 припарковался на месте 1.
Автомобиль №3 подъехал к парковке.
Автомобиль №3 припарковался на месте 2.
Автомобиль №4 подъехал к парковке.
Автомобиль №4 припарковался на месте 3.
Автомобиль №5 подъехал к парковке.
Автомобиль №5 припарковался на месте 4.
Автомобиль №6 подъехал к парковке.
Автомобиль №7 подъехал к парковке.
Автомобиль №1 покинул парковку.
Автомобиль №6 припарковался на месте 0.
Автомобиль №2 покинул парковку.
Автомобиль №7 припарковался на месте 1.
Автомобиль №3 покинул парковку.
Автомобиль №4 покинул парковку.
Автомобиль №5 покинул парковку.
Автомобиль №6 покинул парковку.
Автомобиль №7 покинул парковку.
Семафор отлично подходит для решения такой задачи: он не дает автомобилю (нити) припарковаться (зайти в заданный блок кода и воспользоваться общим ресурсом) если мест на парковке нет (счётчик равен 0) Стоит отметить, что класс Semaphore поддерживает захват и освобождение более чем одного разрешения за раз, но в данном задаче это не нужно.
CountDownLatch
CountDownLatch (замок с обратным отсчетом) предоставляет возможность любому количеству нитей в блоке кода ожидать до тех пор, пока не завершится определенное количество операций, выполняющихся в других нитях, перед тем как они будут «отпущены», чтобы продолжить свою деятельность. В конструктор CountDownLatch (CountDownLatch(int count)
) обязательно передается количество операций, которое должно быть выполнено, чтобы замок «отпустил» заблокированные нити.
Блокировка нитей снимается с помощью счётчика: любая действующая нить, при выполнении определенной операции уменьшает значение счётчика. Когда счётчик достигает 0, все ожидающие нити разблокируются и продолжают выполняться (примером CountDownLatch из жизни может служить сбор экскурсионной группы: пока не наберется определенное количество человек, экскурсия не начнется).
Официальная документация по CountDownLatch.
- Каждый из пяти автомобилей подъехал к стартовой прямой;
- Была дана команда «На старт!»;
- Была дана команда «Внимание!»;
- Была дана команда «Марш!».
import java.util.concurrent.CountDownLatch;
public class Race {
//Создаем CountDownLatch на 8 "условий"
private static final CountDownLatch START = new CountDownLatch(8);
//Условная длина гоночной трассы
private static final int trackLength = 500000;
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= 5; i++) {
new Thread(new Car(i, (int) (Math.random() * 100 + 50))).start();
Thread.sleep(1000);
}
Thread.sleep(1000);
System.out.println("На старт!");
START.countDown();//Команда дана, уменьшаем счетчик на 1
Thread.sleep(1000);
System.out.println("Внимание!");
START.countDown();//Команда дана, уменьшаем счетчик на 1
Thread.sleep(1000);
System.out.println("Марш!");
START.countDown();//Команда дана, уменьшаем счетчик на 1
//счетчик становится равным нулю, и все ожидающие нити
//одновременно разблокируются
}
public static class Car implements Runnable {
private int carNumber;
private int carSpeed;//считаем, что скорость автомобиля постоянная
public Car(int carNumber, int carSpeed) {
this.carNumber = carNumber;
this.carSpeed = carSpeed;
}
@Override
public void run() {
try {
System.out.printf("Автомобиль №%d подъехал к стартовой прямой.\n", carNumber);
//Автомобиль подъехал к стартовой прямой - условие выполнено
//уменьшаем счетчик на 1
START.countDown();
//метод await() блокирует нить, вызвавшую его, до тех пор, пока
//счетчик CountDownLatch не станет равен 0
START.await();
Thread.sleep(trackLength / carSpeed);//ждем пока проедет трассу
System.out.printf("Автомобиль №%d финишировал!\n", carNumber);
} catch (InterruptedException e) {
}
}
}
}
Автомобиль №1 подъехал к стартовой прямой.
Автомобиль №2 подъехал к стартовой прямой.
Автомобиль №3 подъехал к стартовой прямой.
Автомобиль №4 подъехал к стартовой прямой.
Автомобиль №5 подъехал к стартовой прямой.
На старт!
Внимание!
Марш!
Автомобиль №4 финишировал!
Автомобиль №1 финишировал!
Автомобиль №3 финишировал!
Автомобиль №5 финишировал!
Автомобиль №2 финишировал!
CountDownLatch может быть использован в самых разных схемах синхронизации: к примеру, чтобы пока одна нить выполняет работу, заставить другие нити ждать или, наоборот, чтобы заставить нить ждать других, чтобы выполнить работу.
CyclicBarrier
CyclicBarrier реализует шаблон синхронизации Барьер. Циклический барьер является точкой синхронизации, в которой указанное количество параллельных нитей встречается и блокируется. Как только все нити прибыли, выполняется опционное действие (или не выполняется, если барьер был инициализирован без него), и, после того, как оно выполнено, барьер ломается и ожидающие нити «освобождаются». В конструктор барьера (CyclicBarrier(int parties)
и CyclicBarrier(int parties, Runnable barrierAction)
) обязательно передается количество сторон, которые должны «встретиться», и, опционально, действие, которое должно произойти, когда стороны встретились, но перед тем когда они будут «отпущены».
Барьер похож на CountDownLatch, но главное различие между ними в том, что вы не можете заново использовать «замок» после того, как его счётчик достигнет нуля, а барьер вы можете использовать снова, даже после того, как он сломается. CyclicBarrier является альтернативой метода join()
, который «собирает» нити только после того, как они выполнились.
Официальная документация по CyclicBarrier.
import java.util.concurrent.CyclicBarrier;
public class Ferry {
private static final CyclicBarrier BARRIER = new CyclicBarrier(3, new FerryBoat());
//Инициализируем барьер на три нити и таском, который будет выполняться, когда
//у барьера соберется три нити. После этого, они будут освобождены.
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 9; i++) {
new Thread(new Car(i)).start();
Thread.sleep(400);
}
}
//Таск, который будет выполняться при достижении сторонами барьера
public static class FerryBoat implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println("Паром переправил автомобили!");
} catch (InterruptedException e) {
}
}
}
//Стороны, которые будут достигать барьера
public static class Car implements Runnable {
private int carNumber;
public Car(int carNumber) {
this.carNumber = carNumber;
}
@Override
public void run() {
try {
System.out.printf("Автомобиль №%d подъехал к паромной переправе.\n", carNumber);
//Для указания нити о том что она достигла барьера, нужно вызвать метот await()
//После этого данная нить блокируется, и ждет пока остальные стороны достигнут барьера
BARRIER.await();
System.out.printf("Автомобиль №%d продолжил движение.\n", carNumber);
} catch (Exception e) {
}
}
}
}
Автомобиль №0 подъехал к паромной переправе.
Автомобиль №1 подъехал к паромной переправе.
Автомобиль №2 подъехал к паромной переправе.
Автомобиль №3 подъехал к паромной переправе.
Паром переправил автомобили!
Автомобиль №2 продолжил движение.
Автомобиль №1 продолжил движение.
Автомобиль №0 продолжил движение.
Автомобиль №4 подъехал к паромной переправе.
Автомобиль №5 подъехал к паромной переправе.
Автомобиль №6 подъехал к паромной переправе.
Паром переправил автомобили!
Автомобиль №5 продолжил движение.
Автомобиль №4 продолжил движение.
Автомобиль №3 продолжил движение.
Автомобиль №7 подъехал к паромной переправе.
Автомобиль №8 подъехал к паромной переправе.
Паром переправил автомобили!
Автомобиль №8 продолжил движение.
Автомобиль №6 продолжил движение.
Автомобиль №7 продолжил движение.
Когда три нити достигают метода
await()
, барьерное действие запускается, и паром переправляет три автомобиля из скопившихся. После этого начинается новый цикл.Exchanger
Exchanger (обменник) может понадобиться, для того, чтобы обменяться данными между двумя нитями в определенной точки работы обоих нитей. Обменник — обобщенный класс, он параметризируется типом объекта для передачи.
Обменник является точкой синхронизации пары нитей: нить, вызывающая у обменника метод exchange()
блокируется и ждет другую нить. Когда другая нить вызовет тот же метод, произойдет обмен объектами: каждая из них получит аргумент другой в методе exchange()
. Стоит отметить, что обменник поддерживает передачу null
значения. Это дает возможность использовать его для передачи объекта в одну сторону, или, просто как точку синхронизации двух нитей.
Официальная документация по Exchanger.
import java.util.concurrent.Exchanger;
public class Delivery {
//Создаем обменник, который будет обмениваться типом String
private static final Exchanger EXCHANGER = new Exchanger<>();
public static void main(String[] args) throws InterruptedException {
String[] p1 = new String[]{"{посылка A->D}", "{посылка A->C}"};//Формируем груз для 1-го грузовика
String[] p2 = new String[]{"{посылка B->C}", "{посылка B->D}"};//Формируем груз для 2-го грузовика
new Thread(new Truck(1, "A", "D", p1)).start();//Отправляем 1-й грузовик из А в D
Thread.sleep(100);
new Thread(new Truck(2, "B", "C", p2)).start();//Отправляем 2-й грузовик из В в С
}
public static class Truck implements Runnable {
private int number;
private String dep;
private String dest;
private String[] parcels;
public Truck(int number, String departure, String destination, String[] parcels) {
this.number = number;
this.dep = departure;
this.dest = destination;
this.parcels = parcels;
}
@Override
public void run() {
try {
System.out.printf("В грузовик №%d погрузили: %s и %s.\n", number, parcels[0], parcels[1]);
System.out.printf("Грузовик №%d выехал из пункта %s в пункт %s.\n", number, dep, dest);
Thread.sleep(1000 + (long) Math.random() * 5000);
System.out.printf("Грузовик №%d приехал в пункт Е.\n", number);
parcels[1] = EXCHANGER.exchange(parcels[1]);//При вызове exchange() нить блокируется и ждет
//пока другая нить вызовет exchange(), после этого произойдет обмен посылками
System.out.printf("В грузовик №%d переместили посылку для пункта %s.\n", number, dest);
Thread.sleep(1000 + (long) Math.random() * 5000);
System.out.printf("Грузовик №%d приехал в %s и доставил: %s и %s.\n", number, dest, parcels[0], parcels[1]);
} catch (InterruptedException e) {
}
}
}
}
В грузовик №1 погрузили: {посылка A→D} и {посылка A→C}.
Грузовик №1 выехал из пункта A в пункт D.
В грузовик №2 погрузили: {посылка B→C} и {посылка B→D}.
Грузовик №2 выехал из пункта B в пункт C.
Грузовик №1 приехал в пункт Е.
Грузовик №2 приехал в пункт Е.
В грузовик №2 переместили посылку для пункта C.
В грузовик №1 переместили посылку для пункта D.
Грузовик №2 приехал в C и доставил: {посылка B→C} и {посылка A→C}.
Грузовик №1 приехал в D и доставил: {посылка A→D} и {посылка B→D}.
Как мы видим, когда один грузовик (одна нить) приезжает в пункт Е (достигает точки синхронизации), он ждет пока другой грузовик (другая нить) приедет в пункт Е (достигнет точки синхронизации). После этого происходит обмен посылками (String) и оба грузовика (нити) продолжают свой путь (работу).
Phaser
Phaser (фазер), как и CyclicBarrier, является реализацией шаблона синхронизации Барьер, но, в отличии от CyclicBarrier, предоставляет больше гибкости. Этот класс позволяет синхронизировать нити, представляющие отдельную фазу или стадию выполнения общего действия. Как и CyclicBarrier, Phaser является точкой синхронизации, в которой встречаются нити-участницы. Когда все стороны прибыли, Phaser переходит к следующей фазе и снова ожидает ее завершения.
Если сравнить Phaser и CyclicBarrier, то можно выделить следующие важные особенности Phaser:
- Каждая фаза (цикл синхронизации) имеет номер;
- Количество сторон-участников жестко не задано и может меняться: нить может регистрироваться в качестве участника и отменять свое участие;
- Участник не обязан ожидать, пока все остальные участники соберутся на барьере. Чтобы продолжить свою работу достаточно сообщить о своем прибытии;
- Случайные свидетели могут следить за активностью в барьере;
- Нить может и не быть стороной-участником барьера, чтобы ожидать его преодоления;
- У фазера нет опционального действия.
Объект Phaser создается с помощью одного из конструкторов:
Phaser()
Phaser(int parties)
Параметр parties указывает на количество сторон-участников, которые будут выполнять фазы действия. Первый конструктор создает объект Phaser без каких-либо сторон, при этом барьер в этом случае тоже «закрыт». Второй конструктор регистрирует передаваемое в конструктор количество сторон. Барьер открывается когда все стороны прибыли, или, если снимается последний участник. (У класса Phaser еще есть конструкторы, в которые передается родительский объект Phaser, но мы их рассматривать не будем.)
Основные методы:
- int register () — регистрирует сторону, которая выполняет фазы. Вызывается из нити, которая хочет зарегистрироваться. Возвращает номер текущей фазы;
- int getPhase () — возвращает номер текущей фазы;
- int arriveAndAwaitAdvance () — указывает что нить завершила выполнение фазы. Нить приостанавливается до момента, пока все остальные стороны не закончат выполнять данную фазу. Точный аналог
CyclicBarrier.await()
. Возвращает номер текущей фазы; - int arrive () — сообщает, что сторона завершила фазу, и возвращает номер фазы. При вызове данного метода нить не приостанавливается, а продолжает выполнятся;
- int arriveAndDeregister () — сообщает о завершении всех фаз стороной и снимает ее с регистрации. Возвращает номер текущей фазы;
- awaitAdvance (int phase) — если phase равно номеру текущей фазы, приостанавливает вызвавшую его нить до её окончания.
Официальная документация по Phaser.
import java.util.concurrent.Phaser;
public class Bus {
private static final Phaser PHASER = new Phaser(1);//Сразу регистрируем главную нить
//Фазы 0 и 6 - это автобусный парк, 1 - 5 остановки
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i < 5; i++) { //Генерируем пассажиров на остановках
if ((int) (Math.random() * 3) > 0)
new Passenger(PHASER, i, i + 1);//Этот пассажир выходит на следующей
if ((int) (Math.random() * 3) == 2)
new Passenger(PHASER, i, 5); //Этот пассажир выходит на конечной
}
System.out.println("Автобус выехал из парка.");
PHASER.arrive();//В фазе 0 всего 1 участник
for (int i = 1; i < 6; i++) {
System.out.println("Остановка № " + PHASER.getPhase());
Thread.sleep(100);//Ждем пока пассажиры сядут и сойдут
PHASER.arrive();//Кто не успел, тот опоздал)
}
System.out.println("Автобус уехал в парк.");
PHASER.arriveAndDeregister();//Снимаем главную нить, ломаем барьер
}
public static class Passenger extends Thread {
private Phaser phaser;
private int dep;
private int dest;
public Passenger(Phaser phaser, int departure, int destination) {
this.phaser = phaser;
this.dep = departure;
this.dest = destination;
System.out.println(this + " ждёт на остановке № " + dep);
this.start();
}
@Override
public void run() {
try {
while (dep != phaser.getPhase())//Пока автобус не приедет на нужную остановку
phaser.awaitAdvance(phaser.getPhase());//ждем его
Thread.sleep(10);
phaser.register();//Регистрируем нить, которая будет участвовать в фазах
System.out.println(this + " сел в автобус.");
while (dest != phaser.getPhase()) //Пока автобус не приедет на нужную остановку(фазу)
phaser.arriveAndAwaitAdvance(); //Заявляем в каждой фазе о готовности и ждем
Thread.sleep(10);
phaser.arriveAndDeregister(); //Отменяем регистрацию на нужной фазе
System.out.println(this + " покинул автобус.");
} catch (InterruptedException e) {
}
}
@Override
public String toString() {
return "Пассажир{" + dep + " -> " + dest + '}';
}
}
}
Пассажир{1 → 2} ждёт на остановке № 1
Пассажир{1 → 5} ждёт на остановке № 1
Пассажир{2 → 3} ждёт на остановке № 2
Пассажир{2 → 5} ждёт на остановке № 2
Пассажир{3 → 4} ждёт на остановке № 3
Пассажир{4 → 5} ждёт на остановке № 4
Пассажир{4 → 5} ждёт на остановке № 4
Автобус выехал из парка.
Остановка № 1
Пассажир{1 → 5} сел в автобус.
Пассажир{1 → 2} сел в автобус.
Остановка № 2
Пассажир{2 → 3} сел в автобус.
Пассажир{1 → 2} покинул автобус.
Пассажир{2 → 5} сел в автобус.
Остановка № 3
Пассажир{2 → 3} покинул автобус.
Пассажир{3 → 4} сел в автобус.
Остановка № 4
Пассажир{4 → 5} сел в автобус.
Пассажир{3 → 4} покинул автобус.
Пассажир{4 → 5} сел в автобус.
Остановка № 5
Пассажир{1 → 5} покинул автобус.
Пассажир{2 → 5} покинул автобус.
Пассажир{4 → 5} покинул автобус.
Пассажир{4 → 5} покинул автобус.
Автобус уехал в парк.
Кстати, функционалом фазера можно воспроизвести работу CountDownLatch.
import java.util.concurrent.Phaser;
public class NewRace {
private static final Phaser START = new Phaser(8);
private static final int trackLength = 500000;
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= 5; i++) {
new Thread(new Car(i, (int) (Math.random() * 100 + 50))).start();
Thread.sleep(1000);
}
Thread.sleep(1000);
System.out.println("На старт!");
START.arriveAndDeregister();
Thread.sleep(1000);
System.out.println("Внимание!");
START.arriveAndDeregister();
Thread.sleep(1000);
System.out.println("Марш!");
START.arriveAndDeregister();
}
public static class Car implements Runnable {
private int carNumber;
private int carSpeed;
public Car(int carNumber, int carSpeed) {
this.carNumber = carNumber;
this.carSpeed = carSpeed;
}
@Override
public void run() {
try {
System.out.printf("Автомобиль №%d подъехал к стартовой прямой.\n", carNumber);
START.arriveAndDeregister();
START.awaitAdvance(START.getPhase());
Thread.sleep(trackLength / carSpeed);
System.out.printf("Автомобиль №%d финишировал!\n", carNumber);
} catch (InterruptedException e) {
}
}
}
}
Если кому-нибудь пригодилось, то я очень рад=)
Более подробно о Phaser здесь.
Почитать ещё о синхронизаторах и посмотреть примеры можно здесь.
Отличный обзор java.util.concurrent смотрите здесь.