10 способов реализовать потокозащищенный Stack на Java

В данной статье я предлагаю рассмотреть 10 способов реализовать потокозащищенный стек на Java.Почему стек? Потому что это одна из простейших в реализации структур данных, так что это не будет «затенять» многопоточную логику. Также из трех основных операций (push, pop, peek) — есть как операции исключительно чтения мутирующей совместно используемой памяти, так и операции записи.

Целью статьи не было проведение сравнительного анализа различных подходов. Задача статьи — показать разнообразие возможностей. Однако в целом стоит отметить, что основная проблема демонстрируемых реализаций стека — наличие одной «горячей точки».

Существуют реализации, которые ослабляют семантику FIFO (или, в других терминах, являются нелинериализуемыми) и «расщепляют» эту точку в «пятно», что улучшает показатели при высококонкурентном доступе. Возможно, это тема для еще одной статьи «Еще 10 способов …».

Это не просто статья, это — материал к весеннему вебинару «Multicore programming in Java». Видео к занятию #13 я выкладываю в свободный доступ для сообщества.

1. Не синхронизироваться, использовать чужой happens-before2. На основе synchronized3. На основе synchronized + идиома Private Monitor4. На основе ReentrantLock5. На основе Semaphore6. На основе ReentrantReadWriteLock7. На основе Spin Lock (неблокирующий)8. Treiber stack (неблокирующий)9. Используем идиому Copy-on-write10. В функциональном стиле: Persistent stack

Вот видео вебинара (Лекция #13), где мы разбираем данные 10 способов[embedded content]Везде ниже используется один и тот же вариант стека на односвязном списке. Методы pop () и peek () на пустом стеке приводят к NullPointerException.

public class Stack { private Node top = null; public void push (T newElem) { this.top = new Node<>(newElem, this.top); } public T peek () { return top.value; } public T pop () { Node oldTop = this.top; this.top = this.top.next; return oldTop.value; } private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } 1. Не синхронизироваться, использовать чужой happens-before Если вы полностью контролируете использование вашего стека, то может оказаться, что1) его передача между потоками всегда сопряжена с happens-before ребром2) логика приложения такова, что потоки используют (читают/пишут) стек «по очереди«Собственно берем наш, ничем не защищенный стек

public class Stack { private Node top = null; public void push (T newElem) { this.top = new Node<>(newElem, this.top); } public T peek () { return top.value; } public T pop () { Node oldTop = this.top; this.top = this.top.next; return oldTop.value; } private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } И передаем через happens-before ребро образованное вызовом метода start () и первой инструкцией метода run () (как говорит Святая Книга: «A call to start () on a thread happens-before any actions in the started thread.»)

public class Demo { public static void main (String[] args) { final Stack stack = new Stack<>(); // меняем стек в потоке main stack.push (1); stack.push (2); stack.push (3); stack.push (4); stack.push (5);

new Thread (new Runnable () { public void run () { // меняем стек в другом потоке stack.pop (); // и читаем, это все не проблема System.out.println (stack.pop ()); } }).start (); } } Стоит ли напоминать что за такое

public class Demo { public static void main (String[] args) { final Stack stack = new Stack<>(); stack.push (1); stack.push (2); stack.push (3); stack.push (4); stack.push (5);

new Thread (new Runnable () { public void run () { stack.push (100); } }).start (); new Thread (new Runnable () { public void run () { System.out.println (stack.peek ()); } }).start (); } } Вы будете вечно гореть в Java-Аду (за data-racefull программу)! Но практически все способы передачи через потокозащищенные коллекции создают happens-before ребро

import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;

public class Demo { public static void main (String[] args) { final Stack stack = new Stack<>(); final BlockingQueue> interThreadQueue = new LinkedBlockingQueue<>(); stack.push (1); stack.push (2); stack.push (3); stack.push (4); stack.push (5);

new Thread (new Runnable () { public void run () { stack.push (100); try { interThreadQueue.put (stack); } catch (InterruptedException ignore) {/*NOP*/} } }).start (); new Thread (new Runnable () { public void run () { try { Stack myStack = interThreadQueue.take (); System.out.println (myStack.pop ()); } catch (InterruptedException ignore) {/*NOP*/} } }).start (); } } Мораль: создавайте специальные очереди сообщений, для обмена непотокозащищенными данными между потоками.2. На основе synchronized Что может быть проще чем public class Stack { private Node top = null; public synchronized void push (T newElem) { this.top = new Node<>(newElem, this.top); } public synchronized T peek () { return top.value; } public synchronized T pop () { Node oldTop = this.top; this.top = this.top.next; return oldTop.value; } private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } Из забавного — мы теперь сами стали той самой потокозащищенной коллекцией, передача данных через которую создает happens-before ребро!

3. На основе synchronized + идиома Private Monitor В предыдущем примере все хорошо, кроме того, что мы синхронизируемся по встроенному монитору стека, который «виден всем», кто видит стек. Таким образом, мы открываем наружу детали реализации синхронизации (нарушение инкапсуляции) и предыдущий пример можно атаковать вот так public class Demo { public static void main (String[] args) { final Stack stack = new Stack<>();

// Ну ооочень полезный поток, делает push ()/pop () new Thread (new Runnable () { public void run () { while (true) { stack.push («A»); stack.pop (); System.out.println («push ()/pop ()»); } } }).start ();

// поток-паразит new Thread (new Runnable () { public void run () { synchronized (stack) { while (true) ; } } }).start (); } }

>> push ()/pop () >> push ()/pop () >> push ()/pop () >> push ()/pop () >> … висим Поток-паразит использовал встроенный стек и «повис», повесив операции со стеком. Вряд ли он это сделал со злости, скорее ошибка программиста.Просто используйте идиому Private Mutex

public class Stack { // Private Mutex! private final Object lock = new Object (); private Node top = null; public void push (T newElem) { synchronized (lock) { this.top = new Node<>(newElem, this.top); } } public T peek () { synchronized (lock) { return this.top.value; } } public T pop () { synchronized (lock) { Node oldTop = this.top; this.top = this.top.next; return oldTop.value; } }

private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } 4. На основе ReentrantLock Все как в предыдущем примере, но с ReentrantLock import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

public class Stack { private final Lock lock = new ReentrantLock (); private Node top = null; public void push (T newElem) { lock.lock (); try { this.top = new Node<>(newElem, this.top); } finally {lock.unlock ();} } public T peek () { lock.lock (); try { return top.value; } finally {lock.unlock ();} } public T pop () { lock.lock (); try { Node oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally {lock.unlock ();} }

private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } Вопрос, а зачем же использовать ReentrantLock, а не встроенный монитор/synchronized?

Ну, во-первых, обратите свой взор на его богатое API (честность/fairness, lock, lockInterruptibly, tryLock, …), хотя в данном случае, вряд ли какой-то другой поток надолго «зависнет» в методах стека

import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

public class Stack { private final Lock lock; private Node top = null; public Stack (boolean fair) { this.lock = new ReentrantLock (fair); } // просто-push public void push (T newElem) { lock.lock (); try { //Thread.stop () this.top = new Node<>(newElem, this.top); } finally {lock.unlock ();} } // push, с возможностью прервать ожидание захвата блокировки // посредством Thread.interrupt () → InterruptedException public void pushInterruptibly (T newElem) throws InterruptedException { lock.lockInterruptibly (); try { this.top = new Node<>(newElem, this.top); } finally {lock.unlock ();} } // push, с возможностью захвата блокировки только // в том случае, если она свободна public boolean tryPush (T newElem) { if (lock.tryLock ()) { try { this.top = new Node<>(newElem, this.top); return true; } finally {lock.unlock ();} } else { return false; } } // push, с возможностью захвата блокировки // с ограниченным временем ожидания public boolean tryPush (T newElem, long time, TimeUnit unit) throws InterruptedException { if (lock.tryLock (time, unit)) { try { this.top = new Node<>(newElem, this.top); return true; } finally {lock.unlock ();} } else { return false; } }

/*В этом примере расписан только метод push (). peek () и pop () пропустили*/

private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } А, во-вторых, обратите свой взор на главы »13.1. Lock and ReentrantLock»,»13.2. Performance Considerations» и »13.4. Choosing Between Synchronized and ReentrantLock» книги Brian Goetz и других «Java Concurrency in Practice», где проводится сравнение synchronized и ReentrantLock.

5. На основе Semaphore Аналогично предыдущему примеру (на ReentrantLock) можно сделать на семафоре import java.util.concurrent.Semaphore;

public class Stack { // binary semaphore private final Semaphore sem = new Semaphore (1); private Node top = null; public void push (T newElem) { sem.acquireUninterruptibly (); try { this.top = new Node<>(newElem, this.top); } finally {sem.release ();} } public T peek () { sem.acquireUninterruptibly (); try { return top.value; } finally {sem.release ();} } public T pop () { sem.acquireUninterruptibly (); try { Node oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally {sem.release ();} }

private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } Семафор, «заряженный единицей», работает как обычная блокировка и называется Binary Semaphore.Однако, как и другие жители java.util.concurrent и наследники Великого и Могучего AbstractQueuedSynchronizer обладает таким же богатым API import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;

public class Stack { // binary semaphore private final Semaphore sem; private Node top = null; public Stack (boolean fair) { this.sem = new Semaphore (1, fair); } public void push (T newElem) { sem.acquireUninterruptibly (); try { this.top = new Node<>(newElem, this.top); } finally {sem.release ();} } public void pushInterruptibly (T newElem) throws InterruptedException { sem.acquire (); try { this.top = new Node<>(newElem, this.top); } finally {sem.release ();} } public boolean tryPush (T newElem) { if (sem.tryAcquire ()) { try { this.top = new Node<>(newElem, this.top); return true; } finally {sem.release ();} } else { return false; } } public boolean tryPush (T newElem, long time, TimeUnit unit) throws InterruptedException { if (sem.tryAcquire (time, unit)) { try { this.top = new Node<>(newElem, this.top); return true; } finally {sem.release ();} } else { return false; } }

/*В этом примере расписан только метод push (). peek () и pop () пропустили*/

private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } 6. На основе ReentrantReadWriteLock Давайте, наконец-то, обратим внимание на то, что у нас два рода операций — мутаторы (push, pop) и читатель (peek) и используем отдельные режимы блокировки — exclusive и shared import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Stack { private final ReadWriteLock rwLock = new ReentrantReadWriteLock (); private final Lock rLock = rwLock.readLock (); private final Lock wLock = rwLock.writeLock (); private Node top = null;

public void push (T newElem) { // wLock — EXCLUSIVE mode! wLock.lock (); try { this.top = new Node<>(newElem, this.top); } finally {wLock.unlock ();} } public T peek () { // rLock — SHARED mode! rLock.lock (); try { return this.top.value; } finally {rLock.unlock ();} } public T pop () { // wLock — EXCLUSIVE mode! wLock.lock (); try { Node oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally {wLock.unlock ();} }

private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } Да, ReentrantReadWriteLock тоже произошел от AbstractQueuedSynchronizer и тоже обладает всеми этими fairness, lock, lockInterruptible, tryLock, …7. На основе Spin Lock (неблокирующий) Мы можем сделать свой Spin Lock на основе java.util.concurrent.atomicТут мы, в случае занятого стека, не передаем управление JVM/OS (в отличии от senchronized, ReentrantLock, Semaphore, ReadWriteReentrantLock, …)

import java.util.concurrent.atomic.AtomicBoolean;

public class Stack { private final AtomicBoolean locked = new AtomicBoolean (false); private Node top = null; public void push (T newElem) { // false→true while (! locked.compareAndSet (false, true)) {/*NOP*/} try { this.top = new Node<>(newElem, this.top); } finally {locked.set (false);} } public T peek () { while (! locked.compareAndSet (false, true)) {/*NOP*/} try { return top.value; } finally {locked.set (false);} } public T pop () { while (! locked.compareAndSet (false, true)) {/*NOP*/} try { Node oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally { locked.set (false); } } private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } Протокол захвата — перевод AtomicBoolean: false → true (при конкуренции со стороны других потоков).Протокол освобождения — перевод AtomicBoolean: true → false (без конкуренции со стороны других потоков).8. Treiber stack (неблокирующий) Ну раз уже взялись за на основе java.util.concurrent.atomic, то надо делать неблокирующий стек Трейбера import java.util.concurrent.atomic.AtomicReference;

public class Stack { private final AtomicReference> top = new AtomicReference<>(null);

public void push (T newElem) { Node newTop = new Node<>(newElem, null); while (true) { Node oldTop = top.get (); newTop.next = oldTop; if (top.compareAndSet (oldTop, newTop)) { break; } } } public T peek () { return top.get ().value; } public T pop () { while (true) { Node oldTop = this.top.get (); Node newTop = oldTop.next; if (top.compareAndSet (oldTop, newTop)) { return oldTop.value; } } }

private static class Node { private final E value; private Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } 9. Используем идиому Copy-on-write Это не совсем copy-on-write, мы не создаем полноценную копию, но это дань уважения старому доброму я взял эту идею у старого доброго CopyOnWriteArrayList — мутации с захватом монопольной блокировки, чтение через volatile import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;

public class Stack { private final Lock lock = new ReentrantLock (); private volatile Node top = null; public void push (T newElem) { lock.lock (); try { this.top = new Node<>(newElem, this.top); } finally {lock.unlock ();} } public T peek () { return top.value; } public T pop () { lock.lock (); try { Node oldTop = this.top; this.top = this.top.next; return oldTop.value; } finally {lock.unlock ();} }

private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } 10. В функциональном стиле: Persistent stack Функциональные языки от императивных отличает многое. Один из аспектов — нет изменяемых переменных. Точнее нет переменных вообще, есть значения. Вопрос —, а что же они делают с коллекциями? Как известно в отсутствии коллекций жизнь зародиться не может, а коллекции, кажется, по определению изменчивые сущности.Хитрые функциональщики на каждое действие-мутацию (add, remove, …) порождают новую коллекцию. А для уменьшения потребления памяти и процессора на такие действия — стараются, что бы новая версия использовала как можно больше «материала» от предыдущей. public class Stack { private final Node top; private Stack (Node top) { this.top = top; } public Stack () { this.top = null; } public Stack push (T newElem) { return new Stack<>(new Node<>(newElem, this.top)); } public T peek () { return this.top.value; } public Stack pop () { return new Stack<>(top.next); }

private static class Node { private final E value; private final Node next; private Node (E value, Node next) { this.value = value; this.next = next; } } } У такого варианта немного необычное API на первый взгляд. Его надо привыкнуть использовать определенным образом

public class Demo { public static void main (String[] args) { Stack stack = new Stack<>();

stack = stack.push («A»); stack = stack.push («B»).push («C»);

System.out.println («stack.peek ():» + stack.peek ()); } }

>> stack.peek (): C Так как каждая мутация порождает новую версию

public class Demo { public static void main (String[] args) { Stack stack = new Stack<>(); Stack stackA = stack.push («A»); Stack stackAB = stackA.push («B»); Stack stackABC = stackAB.push («C»);

System.out.println («stackA.peek ():» + stackA.peek ()); System.out.println («stackAB.peek ():» + stackAB.peek ()); System.out.println («stackABC.peek ():» + stackABC.peek ()); } }

>> stackA.peek (): A >> stackAB.peek (): B >> stackABC.peek (): C Здесь я непрерывно передаю «мутирующие» стеки от одного потока к другому. Конечно, надо делать safe publication (я делаю через volatile-read/volatile-write), но после чтения можно свободно «менять» свою версию.

public class Demo { static volatile Stack stack = new Stack<>(); static { stack.push (»#»); stack.push (»#»); }

public static void main (String[] args) { new Thread (new Runnable () { public void run () { for (int k = 0; ; k++) { // добавляем в стек элемент stack = stack.push (» + k); } } }).start ();

new Thread (new Runnable () { public void run () { while (true) { // удаляем из стека элемент Stack newStack = stack.pop (); System.out.println (newStack.peek ()); } } }).start (); } } Данный подход (метод-мутатор возвращает новую версию и не меняет предыдущую) достаточно хорошо представлен в JDK

public class Demo { public static void main (String[] args) { String origin = «Hello!»; String mutated = origin.toUpperCase ();

System.out.println («origin:» + origin); System.out.println («mutated:» + mutated); } }

>> origin: Hello! >> mutated: HELLO! import java.math.BigInteger;

public class Demo { public static void main (String[] args) { BigInteger origin = new BigInteger (»40»); BigInteger mutated = origin.add (new BigInteger (»2»));

System.out.println («origin:» + origin); System.out.println («mutated:» + mutated); } }

>> origin: 40 >> mutated: 42 Контакты Кратко о курсе «Multicore programming in Java»: стартует 1 сентября, ведется в режиме вебинаров дважды в неделю (понедельник + четверг) в 19.00–22.00 (по московскому времени), состоит из 16 лекций по 2.5 часа (=40 лекционных часов), рассчитан на Java Middle.Стоимость курса— при оплате до 9 августа — 375$— при оплате до 16 августа — 400$— при оплате до 23 августа — 425$— при оплате до 30 августа — 450$

Я занимаюсь онлайн обучением Java (вот курсы программирования) и публикую часть учебных материалов в рамках переработки курса Java Core. Видеозаписи лекций в аудитории Вы можете увидеть на youtube-канале, возможно, видео канала лучше систематизировано в этой статье.

На все вопросы с удовольствием отвечу по следующим контактам (или в комментариях)skype: GolovachCoursesemail: GolovachCourses@gmail.com

© Habrahabr.ru