10 способов реализовать потокозащищенный Stack на Java01.08.2014 18:48
В данной статье я предлагаю рассмотреть 10 способов реализовать потокозащищенный стек на Java.Почему стек? Потому что это одна из простейших в реализации структур данных, так что это не будет «затенять» многопоточную логику. Также из трех основных операций (push, pop, peek) — есть как операции исключительно чтения мутирующей совместно используемой памяти, так и операции записи.
Целью статьи не было проведение сравнительного анализа различных подходов. Задача статьи — показать разнообразие возможностей. Однако в целом стоит отметить, что основная проблема демонстрируемых реализаций стека — наличие одной «горячей точки».
Существуют реализации, которые ослабляют семантику FIFO (или, в других терминах, являются нелинериализуемыми) и «расщепляют» эту точку в «пятно», что улучшает показатели при высококонкурентном доступе. Возможно, это тема для еще одной статьи «Еще 10 способов …».
Это не просто статья, это — материал к весеннему вебинару «Multicore programming in Java». Видео к занятию #13 я выкладываю в свободный доступ для сообщества.
1. Не синхронизироваться, использовать чужой happens-before2. На основе synchronized3. На основе synchronized + идиома Private Monitor4. На основе ReentrantLock5. На основе Semaphore6. На основе ReentrantReadWriteLock7. На основе Spin Lock (неблокирующий)8. Treiber stack (неблокирующий)9. Используем идиому Copy-on-write10. В функциональном стиле: Persistent stack
Вот видео вебинара (Лекция #13), где мы разбираем данные 10 способов[embedded content]Везде ниже используется один и тот же вариант стека на односвязном списке. Методы pop () и peek () на пустом стеке приводят к NullPointerException.
public class Stack {
private Node top = null;
public void push (T newElem) {
this.top = new Node<>(newElem, this.top);
}
public T peek () {
return top.value;
}
public T pop () {
Node oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
1. Не синхронизироваться, использовать чужой happens-before
Если вы полностью контролируете использование вашего стека, то может оказаться, что1) его передача между потоками всегда сопряжена с happens-before ребром2) логика приложения такова, что потоки используют (читают/пишут) стек «по очереди«Собственно берем наш, ничем не защищенный стек
public class Stack {
private Node top = null;
public void push (T newElem) {
this.top = new Node<>(newElem, this.top);
}
public T peek () {
return top.value;
}
public T pop () {
Node oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
И передаем через happens-before ребро образованное вызовом метода start () и первой инструкцией метода run () (как говорит Святая Книга: «A call to start () on a thread happens-before any actions in the started thread.»)
public class Demo {
public static void main (String[] args) {
final Stack stack = new Stack<>();
// меняем стек в потоке main
stack.push (1);
stack.push (2);
stack.push (3);
stack.push (4);
stack.push (5);
new Thread (new Runnable () {
public void run () {
// меняем стек в другом потоке
stack.pop ();
// и читаем, это все не проблема
System.out.println (stack.pop ());
}
}).start ();
}
}
Стоит ли напоминать что за такое
public class Demo {
public static void main (String[] args) {
final Stack stack = new Stack<>();
stack.push (1);
stack.push (2);
stack.push (3);
stack.push (4);
stack.push (5);
new Thread (new Runnable () {
public void run () {
stack.push (100);
}
}).start ();
new Thread (new Runnable () {
public void run () {
System.out.println (stack.peek ());
}
}).start ();
}
}
Вы будете вечно гореть в Java-Аду (за data-racefull программу)! Но практически все способы передачи через потокозащищенные коллекции создают happens-before ребро
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Demo {
public static void main (String[] args) {
final Stack stack = new Stack<>();
final BlockingQueue> interThreadQueue
= new LinkedBlockingQueue<>();
stack.push (1);
stack.push (2);
stack.push (3);
stack.push (4);
stack.push (5);
new Thread (new Runnable () {
public void run () {
stack.push (100);
try {
interThreadQueue.put (stack);
} catch (InterruptedException ignore) {/*NOP*/}
}
}).start ();
new Thread (new Runnable () {
public void run () {
try {
Stack myStack = interThreadQueue.take ();
System.out.println (myStack.pop ());
} catch (InterruptedException ignore) {/*NOP*/}
}
}).start ();
}
}
Мораль: создавайте специальные очереди сообщений, для обмена непотокозащищенными данными между потоками.2. На основе synchronized
Что может быть проще чем
public class Stack {
private Node top = null;
public synchronized void push (T newElem) {
this.top = new Node<>(newElem, this.top);
}
public synchronized T peek () {
return top.value;
}
public synchronized T pop () {
Node oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
Из забавного — мы теперь сами стали той самой потокозащищенной коллекцией, передача данных через которую создает happens-before ребро!
3. На основе synchronized + идиома Private Monitor
В предыдущем примере все хорошо, кроме того, что мы синхронизируемся по встроенному монитору стека, который «виден всем», кто видит стек. Таким образом, мы открываем наружу детали реализации синхронизации (нарушение инкапсуляции) и предыдущий пример можно атаковать вот так
public class Demo {
public static void main (String[] args) {
final Stack stack = new Stack<>();
// Ну ооочень полезный поток, делает push ()/pop ()
new Thread (new Runnable () {
public void run () {
while (true) {
stack.push («A»);
stack.pop ();
System.out.println («push ()/pop ()»);
}
}
}).start ();
// поток-паразит
new Thread (new Runnable () {
public void run () {
synchronized (stack) {
while (true) ;
}
}
}).start ();
}
}
>> push ()/pop ()
>> push ()/pop ()
>> push ()/pop ()
>> push ()/pop ()
>> … висим
Поток-паразит использовал встроенный стек и «повис», повесив операции со стеком. Вряд ли он это сделал со злости, скорее ошибка программиста.Просто используйте идиому Private Mutex
public class Stack {
// Private Mutex!
private final Object lock = new Object ();
private Node top = null;
public void push (T newElem) {
synchronized (lock) {
this.top = new Node<>(newElem, this.top);
}
}
public T peek () {
synchronized (lock) {
return this.top.value;
}
}
public T pop () {
synchronized (lock) {
Node oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
}
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
4. На основе ReentrantLock
Все как в предыдущем примере, но с ReentrantLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Stack {
private final Lock lock = new ReentrantLock ();
private Node top = null;
public void push (T newElem) {
lock.lock ();
try {
this.top = new Node<>(newElem, this.top);
} finally {lock.unlock ();}
}
public T peek () {
lock.lock ();
try {
return top.value;
} finally {lock.unlock ();}
}
public T pop () {
lock.lock ();
try {
Node oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {lock.unlock ();}
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
Вопрос, а зачем же использовать ReentrantLock, а не встроенный монитор/synchronized?
Ну, во-первых, обратите свой взор на его богатое API (честность/fairness, lock, lockInterruptibly, tryLock, …), хотя в данном случае, вряд ли какой-то другой поток надолго «зависнет» в методах стека
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Stack {
private final Lock lock;
private Node top = null;
public Stack (boolean fair) {
this.lock = new ReentrantLock (fair);
}
// просто-push
public void push (T newElem) {
lock.lock ();
try { //Thread.stop ()
this.top = new Node<>(newElem, this.top);
} finally {lock.unlock ();}
}
// push, с возможностью прервать ожидание захвата блокировки
// посредством Thread.interrupt () → InterruptedException
public void pushInterruptibly (T newElem) throws InterruptedException {
lock.lockInterruptibly ();
try {
this.top = new Node<>(newElem, this.top);
} finally {lock.unlock ();}
}
// push, с возможностью захвата блокировки только
// в том случае, если она свободна
public boolean tryPush (T newElem) {
if (lock.tryLock ()) {
try {
this.top = new Node<>(newElem, this.top);
return true;
} finally {lock.unlock ();}
} else {
return false;
}
}
// push, с возможностью захвата блокировки
// с ограниченным временем ожидания
public boolean tryPush (T newElem, long time, TimeUnit unit) throws InterruptedException {
if (lock.tryLock (time, unit)) {
try {
this.top = new Node<>(newElem, this.top);
return true;
} finally {lock.unlock ();}
} else {
return false;
}
}
/*В этом примере расписан только метод push ().
peek () и pop () пропустили*/
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
А, во-вторых, обратите свой взор на главы »13.1. Lock and ReentrantLock»,»13.2. Performance Considerations» и »13.4. Choosing Between Synchronized and ReentrantLock» книги Brian Goetz и других «Java Concurrency in Practice», где проводится сравнение synchronized и ReentrantLock.
5. На основе Semaphore
Аналогично предыдущему примеру (на ReentrantLock) можно сделать на семафоре
import java.util.concurrent.Semaphore;
public class Stack {
// binary semaphore
private final Semaphore sem = new Semaphore (1);
private Node top = null;
public void push (T newElem) {
sem.acquireUninterruptibly ();
try {
this.top = new Node<>(newElem, this.top);
} finally {sem.release ();}
}
public T peek () {
sem.acquireUninterruptibly ();
try {
return top.value;
} finally {sem.release ();}
}
public T pop () {
sem.acquireUninterruptibly ();
try {
Node oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {sem.release ();}
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
Семафор, «заряженный единицей», работает как обычная блокировка и называется Binary Semaphore.Однако, как и другие жители java.util.concurrent и наследники Великого и Могучего AbstractQueuedSynchronizer обладает таким же богатым API
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class Stack {
// binary semaphore
private final Semaphore sem;
private Node top = null;
public Stack (boolean fair) {
this.sem = new Semaphore (1, fair);
}
public void push (T newElem) {
sem.acquireUninterruptibly ();
try {
this.top = new Node<>(newElem, this.top);
} finally {sem.release ();}
}
public void pushInterruptibly (T newElem) throws InterruptedException {
sem.acquire ();
try {
this.top = new Node<>(newElem, this.top);
} finally {sem.release ();}
}
public boolean tryPush (T newElem) {
if (sem.tryAcquire ()) {
try {
this.top = new Node<>(newElem, this.top);
return true;
} finally {sem.release ();}
} else {
return false;
}
}
public boolean tryPush (T newElem, long time, TimeUnit unit) throws InterruptedException {
if (sem.tryAcquire (time, unit)) {
try {
this.top = new Node<>(newElem, this.top);
return true;
} finally {sem.release ();}
} else {
return false;
}
}
/*В этом примере расписан только метод push ().
peek () и pop () пропустили*/
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
6. На основе ReentrantReadWriteLock
Давайте, наконец-то, обратим внимание на то, что у нас два рода операций — мутаторы (push, pop) и читатель (peek) и используем отдельные режимы блокировки — exclusive и shared
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Stack {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock ();
private final Lock rLock = rwLock.readLock ();
private final Lock wLock = rwLock.writeLock ();
private Node top = null;
public void push (T newElem) {
// wLock — EXCLUSIVE mode!
wLock.lock ();
try {
this.top = new Node<>(newElem, this.top);
} finally {wLock.unlock ();}
}
public T peek () {
// rLock — SHARED mode!
rLock.lock ();
try {
return this.top.value;
} finally {rLock.unlock ();}
}
public T pop () {
// wLock — EXCLUSIVE mode!
wLock.lock ();
try {
Node oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {wLock.unlock ();}
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
Да, ReentrantReadWriteLock тоже произошел от AbstractQueuedSynchronizer и тоже обладает всеми этими fairness, lock, lockInterruptible, tryLock, …7. На основе Spin Lock (неблокирующий)
Мы можем сделать свой Spin Lock на основе java.util.concurrent.atomicТут мы, в случае занятого стека, не передаем управление JVM/OS (в отличии от senchronized, ReentrantLock, Semaphore, ReadWriteReentrantLock, …)
import java.util.concurrent.atomic.AtomicBoolean;
public class Stack {
private final AtomicBoolean locked = new AtomicBoolean (false);
private Node top = null;
public void push (T newElem) {
// false→true
while (! locked.compareAndSet (false, true)) {/*NOP*/}
try {
this.top = new Node<>(newElem, this.top);
} finally {locked.set (false);}
}
public T peek () {
while (! locked.compareAndSet (false, true)) {/*NOP*/}
try {
return top.value;
} finally {locked.set (false);}
}
public T pop () {
while (! locked.compareAndSet (false, true)) {/*NOP*/}
try {
Node oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {
locked.set (false);
}
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
Протокол захвата — перевод AtomicBoolean: false → true (при конкуренции со стороны других потоков).Протокол освобождения — перевод AtomicBoolean: true → false (без конкуренции со стороны других потоков).8. Treiber stack (неблокирующий)
Ну раз уже взялись за на основе java.util.concurrent.atomic, то надо делать неблокирующий стек Трейбера
import java.util.concurrent.atomic.AtomicReference;
public class Stack {
private final AtomicReference> top = new AtomicReference<>(null);
public void push (T newElem) {
Node newTop = new Node<>(newElem, null);
while (true) {
Node oldTop = top.get ();
newTop.next = oldTop;
if (top.compareAndSet (oldTop, newTop)) {
break;
}
}
}
public T peek () {
return top.get ().value;
}
public T pop () {
while (true) {
Node oldTop = this.top.get ();
Node newTop = oldTop.next;
if (top.compareAndSet (oldTop, newTop)) {
return oldTop.value;
}
}
}
private static class Node {
private final E value;
private Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
9. Используем идиому Copy-on-write
Это не совсем copy-on-write, мы не создаем полноценную копию, но это дань уважения старому доброму я взял эту идею у старого доброго CopyOnWriteArrayList — мутации с захватом монопольной блокировки, чтение через volatile
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Stack {
private final Lock lock = new ReentrantLock ();
private volatile Node top = null;
public void push (T newElem) {
lock.lock ();
try {
this.top = new Node<>(newElem, this.top);
} finally {lock.unlock ();}
}
public T peek () {
return top.value;
}
public T pop () {
lock.lock ();
try {
Node oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {lock.unlock ();}
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
10. В функциональном стиле: Persistent stack
Функциональные языки от императивных отличает многое. Один из аспектов — нет изменяемых переменных. Точнее нет переменных вообще, есть значения. Вопрос —, а что же они делают с коллекциями? Как известно в отсутствии коллекций жизнь зародиться не может, а коллекции, кажется, по определению изменчивые сущности.Хитрые функциональщики на каждое действие-мутацию (add, remove, …) порождают новую коллекцию. А для уменьшения потребления памяти и процессора на такие действия — стараются, что бы новая версия использовала как можно больше «материала» от предыдущей.
public class Stack {
private final Node top;
private Stack (Node top) {
this.top = top;
}
public Stack () {
this.top = null;
}
public Stack push (T newElem) {
return new Stack<>(new Node<>(newElem, this.top));
}
public T peek () {
return this.top.value;
}
public Stack pop () {
return new Stack<>(top.next);
}
private static class Node {
private final E value;
private final Node next;
private Node (E value, Node next) {
this.value = value;
this.next = next;
}
}
}
У такого варианта немного необычное API на первый взгляд. Его надо привыкнуть использовать определенным образом
public class Demo {
public static void main (String[] args) {
Stack stack = new Stack<>();
stack = stack.push («A»);
stack = stack.push («B»).push («C»);
System.out.println («stack.peek ():» + stack.peek ());
}
}
>> stack.peek (): C
Так как каждая мутация порождает новую версию
public class Demo {
public static void main (String[] args) {
Stack stack = new Stack<>();
Stack stackA = stack.push («A»);
Stack stackAB = stackA.push («B»);
Stack stackABC = stackAB.push («C»);
System.out.println («stackA.peek ():» + stackA.peek ());
System.out.println («stackAB.peek ():» + stackAB.peek ());
System.out.println («stackABC.peek ():» + stackABC.peek ());
}
}
>> stackA.peek (): A
>> stackAB.peek (): B
>> stackABC.peek (): C
Здесь я непрерывно передаю «мутирующие» стеки от одного потока к другому. Конечно, надо делать safe publication (я делаю через volatile-read/volatile-write), но после чтения можно свободно «менять» свою версию.
public class Demo {
static volatile Stack stack = new Stack<>();
static {
stack.push (»#»);
stack.push (»#»);
}
public static void main (String[] args) {
new Thread (new Runnable () {
public void run () {
for (int k = 0; ; k++) {
// добавляем в стек элемент
stack = stack.push (» + k);
}
}
}).start ();
new Thread (new Runnable () {
public void run () {
while (true) {
// удаляем из стека элемент
Stack newStack = stack.pop ();
System.out.println (newStack.peek ());
}
}
}).start ();
}
}
Данный подход (метод-мутатор возвращает новую версию и не меняет предыдущую) достаточно хорошо представлен в JDK
public class Demo {
public static void main (String[] args) {
String origin = «Hello!»;
String mutated = origin.toUpperCase ();
System.out.println («origin:» + origin);
System.out.println («mutated:» + mutated);
}
}
>> origin: Hello!
>> mutated: HELLO!
import java.math.BigInteger;
public class Demo {
public static void main (String[] args) {
BigInteger origin = new BigInteger (»40»);
BigInteger mutated = origin.add (new BigInteger (»2»));
System.out.println («origin:» + origin);
System.out.println («mutated:» + mutated);
}
}
>> origin: 40
>> mutated: 42
Контакты
Кратко о курсе «Multicore programming in Java»: стартует 1 сентября, ведется в режиме вебинаров дважды в неделю (понедельник + четверг) в 19.00–22.00 (по московскому времени), состоит из 16 лекций по 2.5 часа (=40 лекционных часов), рассчитан на Java Middle.Стоимость курса— при оплате до 9 августа — 375$— при оплате до 16 августа — 400$— при оплате до 23 августа — 425$— при оплате до 30 августа — 450$
Я занимаюсь онлайн обучением Java (вот курсы программирования) и публикую часть учебных материалов в рамках переработки курса Java Core. Видеозаписи лекций в аудитории Вы можете увидеть на youtube-канале, возможно, видео канала лучше систематизировано в этой статье.
На все вопросы с удовольствием отвечу по следующим контактам (или в комментариях)skype: GolovachCoursesemail: GolovachCourses@gmail.com
© Habrahabr.ru