Как синхронизировать потоки в Java
Привет, Хабр!
Многопоточность — это не просто возможность приложения выполнять несколько задач одновременно, это его способность делать это эффективно и безопасно. В Java многопоточность неотделима от синхронизации, ведь именно она помогает управлять состоянием разделяемых ресурсов между потоками.
Всё начинается с потребности в быстродействии и масштабируемости. C несколькими потоками можно обрабатывать больше операций одновременно.
В этой статье мы рассмотрим, как синхронизировать потоки в Java.
Синхронизированные блоки
В Java слово synchronized
может использоваться для методов и блоков кода. Это некая »база» для корректной работы с общими ресурсами в многопоточной среде.
Пример:
public class Counter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
Методы increment()
и getCount()
синхронизированы, что гарантирует, что только один поток может изменять или читать значение count
в один момент времени.
Пример синхронизированного блока:
public class Counter {
private int count = 0;
private final Object lock = new Object();
public void increment() {
synchronized(lock) {
count++;
}
}
public int getCount() {
synchronized(lock) {
return count;
}
}
}
Здесь используем синхронизированный блок с явным объектом блокировки lock
. Так можно управлять синхронизацией, ограничивая её только нужными частями кода.
Каждый объект в Java имеет связанный с ним монитор. Когда поток входит в синхронизированный блок или метод, он захватывает монитор этого объекта. Если монитор уже занят другим потоком, текущий поток будет ждать его освобождения:
public void addToQueue(String item) {
synchronized(queue) {
queue.add(item);
}
}
В примере, если один поток уже добавляет элемент в очередь queue
, другой поток будет ожидать, пока первый не закончит, прежде чем сможет выполнить добавление.
Методы wait (), notify () и notifyAll ()
Методы wait()
, notify()
, и notifyAll()
— это инструменты для координации работы между потоками, которые используют общие ресурсы. Они вызываются на объекте класса, который реализует интерфейс Object
, и должны использоваться только в синхронизированных блоках или методах.
wait()
заставляет текущий поток ожидать до тех пор, пока другой поток не вызоветnotify()
илиnotifyAll()
на том же объекте.notify()
пробуждает один случайно выбранный поток, который ожидает на этом объекте.notifyAll()
пробуждает все потоки, которые ожидают на этом объекте.
Рассмотрим классический пример производитель-потребитель:
public class Buffer {
private int contents;
private boolean available = false;
public synchronized void put(int value) {
while (available) {
try {
wait();
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
contents = value;
available = true;
notifyAll();
}
public synchronized int get() {
while (!available) {
try {
wait();
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
available = false;
notifyAll();
return contents;
}
}
ЗдесьBuffer
класс хранит одно целочисленное значение. Метод put()
ожидает, пока Buffer
не будет доступен для записи, и затем добавляет значение. Метод get()
ожидает, пока Buffer
не будет доступен для чтения, затем возвращает значение и освобождает буфер. wait()
вызывается, когда поток должен ждать доступности буфера, а notifyAll()
пробуждает все потоки после изменения состояния буфера.
InterruptedException
— это исключение, которое выбрасывается, когда другой поток прерывает текущий поток, пока тот находится в состоянии ожидания. Правильный способ обработки этого исключения — это выставить флаг прерывания обратно и завершить выполнение, если это уместно:
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // восстановить статус прерывания
return; // и выйти, если поток не может продолжить выполнение
}
Пару советов:
Минимизируйте время блокировки: используйте синхронизированные блоки как можно более короткими.
Избегайте вложенных блокировок: это может привести к взаимоблокировкам.
Используйте отдельные объекты блокировки: для управления доступом к различным частям данных.
Будьте внимательны с условиями ожидания: всегда используйте циклы
while
, а не условияif
, чтобы проверять условие ожидания, потому что поток может проснуться без изменений в условиях.
Присоединение и блокировка потоков
Метод join()
позволяет одному потоку ожидать завершения работы другого. Мастхев, когда нужно, чтобы данные, обрабатываемые в одном потоке, были полностью готовы перед выполнением каких-то действий в другом.
Пример использования join()
:
public class ThreadJoinExample {
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
System.out.println("Thread 1 running");
try {
Thread.sleep(1000); // Подождем 1 секунду
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Thread 1 finished");
});
Thread thread2 = new Thread(() -> {
System.out.println("Thread 2 running");
try {
thread1.join(); // Ожидаем завершения thread1
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Thread 2 finished");
});
thread1.start();
thread2.start();
}
}
Здесь thread2
ждет, пока thread1
не завершит свою работу, благодаря вызову join()
. Таким образом, сообщение «Thread 2 finished» всегда будет выводиться после »Thread 1 finished».
Тайм-ауты хорошидля предотвращения того, чтобы поток оставался заблокированным вечно, если ожидаемое событие никогда не произойдет. Это важный элемент управления исполнением, который помогает избежать фризов программы.
Пример с тайм-аутом в join()
:
Thread thread3 = new Thread(() -> {
try {
thread1.join(500); // Ожидаем максимум 500 мс
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Thread 3 may or may not have waited for Thread 1 to finish");
});
thread3.start();
thread3
ждет завершения thread1
не более 500 миллисекунд. Если thread1
не завершится за это время, thread3
продолжит выполнение.
Метод sleep()
применяется для приостановки работы текущего потока на заданный период времени. Он используется для имитации длительных операций, управления частотой выполнения и создания задержек.
Пример использования sleep()
:
Thread sleeperThread = new Thread(() -> {
try {
System.out.println("Sleeper thread going to sleep");
Thread.sleep(2000); // Спим 2 секунды
} catch (InterruptedException e) {
System.out.println("Sleeper thread was interrupted");
Thread.currentThread().interrupt();
}
System.out.println("Sleeper thread woke up");
});
sleeperThread.start();
Здесь поток sleeperThread
»спит» 2 секунды, что хорошо, например, для ограничения скорости выполнения цикла обработки данных.
Блокирующие объекты и конструкции синхронизации
Java имеет интерфейс Lock
с его замечательной реализацией ReentrantLock
. Эти механизмы предлагают большую гибкость по сравнению с традиционным подходом с использованием synchronized
. Они позволяют делать попытки захвата блокировок, тайм-ауты блокировок и многие другие операции, которые не поддерживаются synchronized
.
Пример использования ReentrantLock
:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
private final Lock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
Здесь lock()
вызывается для захвата блокировки перед началом критической секции, и важно всегда вызывать unlock()
в блоке finally
, чтобы гарантировать освобождение блокировки даже в случае возникновения исключений.
Condition
увеличивает гибкость управления блокировками, позволяя одним потокам приостанавливать себя (ждать), пока другой поток не сообщит о каком-то условии. Это можно сравнить с расширенной версией Object.wait()
и Object.notify()
, но с возможностью создания множественных условий ожидания на одной блокировке.
Пример с Condition
:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Object[] items = new Object[100];
private int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
} finally {
lock.unlock();
}
return x;
}
}
Реализуем паттерн производитель-потребитель с блокирующей очередью:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerExample {
private static final BlockingQueue queue = new ArrayBlockingQueue<>(10);
static class Producer extends Thread {
public void run() {
try {
for (int i = 0; i < 20; i++) {
queue.put(i);
System.out.println("Produced " + i);
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
static class Consumer extends Thread {
public void run() {
try {
while (true) {
Integer item = queue.take();
System.out.println("Consumed " + item);
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) {
new Producer().start();
new Consumer().start();
}
}
В заключение приглашаем всех желающих на открытые уроки, посвященные Java-разработке:
7 августа: «Reflection API» — познакомимся с механизмом рефлексии в языке Java и посмотрим, где он применяется. Записаться
21 августа: «Обобщения в Java» — изучим, для чего они нужны; где они применяются в стандартной Java библиотеке;, а также как их можно использовать в своем коде. Записаться