[ home | doges | learn | cates | random | about ]
  • Java Concurrency 4 — Аноним Apr 20, 2020 №4 Развернуть пост | Фокус на посте

    Оглавление

    • Deadlocks
      • Опять пример с банковскими счетами
      • Deadlock
      • Deadlock handling
      • Critical Sections

    Deadlocks

    Опять пример с банковскими счетами

    Можно рассмотреть тот пример с банковскими счетами, которые рандомно пересылают деньги друг другу и ждут, пока им кто-нибудь не докинет денег, если на счету не хватает денег, чтобы провести транзакцию. Там на каждый банковский счёт есть ровно один поток. Если у всех по умолчанию, скажем, 1000 денег, и максимальная величина транзакции — тоже 1000, то ничего не сможет дедлокнуться, потому что в любой момент времени хотя бы у кого-нибудь будет достаточно денег, чтобы перечислить их кому-нибудь другому.

    Но, если увеличить максимальную величину транзакции, то уже могут возникуть ситуации, при которых никто не сможет никому перечислить деньги. Например:

    • У первого потока 300 денег, и он пытается перечислить 400 другому аккаунту
    • У второго потока 200 денег и он пытается перечислить 300 другому

    И никто ничего не может сделать.


    Ещё дедлок тут может случиться из-за того, что на condition’е вызывается signal(), а не signalAll():

    1. У первого потока 1500 денег, у остальных — 900. Все вот эти остальные потоки пытаются кому-то переслать 1000 денег, так что они находятся в состоянии ожидания.
    2. Первый поток пересылает второму 1000 денег, но уведомляет только третьего об этом, потому что signal() выбирает рандомно только один спящий поток. Второй поток получает деньги, но всё ещё спит, третий просыпается, но сразу же засыпает, потому что у него нет денег.
    3. После этого первый поток пытается перевести кому-то ещё 1000, но у него на счету всего 500, так что он засыпает. Всё, теперь все спят.

    Кстати, вроде как можно посмотреть дамп тредов, если нажать Ctrl + \ при выполнении прог раммы, но у меня почему-то там не отобразился ни один из созданных мною потоков. Но получилось посмотреть на это через jconsole.



    Deadlock

    Вообще в общем случае дедлок возникает, когда каждый поток ждёт выполнения какого-то условия, и в итоге никто не может продолжить своё выполнение.

    • Mutual exclusion. Есть какой-то ресурс, которым может одновременно пользоваться ровно один поток.
    • Hold and wait. Каждый поток удерживает по крайней мере один ресурс и хочет получить доступ к другому ресурсу, который удерживается другим потоком.
    • No preemption. Поток отпускает ресурс только по своей воле.
    • Circular wait. Поток P1 удерживает ресурс R1, хочет получить доступ к ресурсу R2, который удерживает поток P2. P2 хочет ресурс R3, который удерживается потоком P3 и так далее, пока оно не замыкается на первом потоке опять.


    Deadlock handling

    Ignoring deadlock. Можно игнорировать возможное состояние дедлока, исходя из предположения о том, что дедлок либо невозможен, либо вероятность его возникновения крайне мала и в целом расходы на то, чтобы избежать это потенциальное плохое состояние нерелевантны, учитывая вероятность его возникновения. В общем случае это называется Ostrich algorithm — a strategy of ignoring potencial problems on the basis that they may be exceedingly rare (ostrich effect — to stick one’s head in the sand and pretend there is no problem).

    Deadlock detection and correction. Другой способ — разрешить системе дедлокнуться, выявить состояние дедлока и разрешить его (намеренно нарушив одно из условий дедлока, обычно то, которое про circluar wait, то есть разбить цикл).

    • Можно прервать все процессы, которые вместе находятся в состоянии дедлока. Это гарантированно его уберёт, но при этом можно потерять результаты вычислений. Другой способ — последовательно прерывать процессы (в каком-то особом порядке, может, там эвристика какая-то, или просто выбирается в соответствии с приоритетами и временем выполнения), пока дедлок не пропадёт. Это, очевидно, более долгий способ, потому что нужно итеративно убирать процесс — проверять условия дедлока, но зато итоге можно будет обойтись меньшими потерями. Как я понимаю, это не самый предпочтительный метод, потому что впустую тратится ЦПУ-время.

    • Или можно отобрать ресурсы от одних потоков и дать их другим (resource preemption), делать так, пока дедлок не разрешится. В идеале хорошо было бы вернуть при этом соответствующий поток в последнее безопасное состояние и запустить его заново, когда дедлок разрешится, однако часто это невозможно, так что он просто запускается заново. Тут тоже есть проблема с тем, как выбирать victim‘ов для resource preemption.

      Ещё одна проблема с этим всем — это голодование процесса (starvation). Это просто, когда процессу постоянно не дают ресурсов (не важно, ЦПУ-время, или что-то ещё). Тут это может произойти из-за того, что в качестве жертвы постоянно будет выбираться один и тот же поток.

    …

    потом полистать, там они, наверное, пересекаются, но можно будет достать бол ьше ссылок

    • deadlock
    • locks disadvantages
    • non-blocking algorithm
    • spooling
    • deadlock prevention algorithms
    • lock
    • concurrency control
    • mutual exclusion
    • critical section
    • Peterson’s algorithm
    • semaphores


    Critical Sections

    Critical section — это кусок программы, который защищён от того, чтобы он выполнялся одновременно более, чем одним потоком, для того, чтобы избежать race condition. Race condition — это состояние системы, в котором на её поведение очень сильно влияют тайминги или последовательность выполнения процессов/потоков.

    Частный случай — data race (когда два потока одновременно что-то делают с одним местом в памяти таким образом, что это может привести к неопределённым результатам и к повреждению памяти из-за не-атомарности операций). Обычно, помимо обычных, небезопасных операций работы с данными, имплементированы синхронизированные или atomic операции, которые не могут быть прерваны во время своего выполнения. При этом в терминах программы, которая использует атомик операции всё ещё может иметь место race condition, но с использованием атомиков по крайней мере не будет повреждений памяти, но всё будет неопределённым.


    Критическую секцию можно реализовать, например, с помощью алгоритма Петерсона. В оригинале он был сделан только для двух потоков, но он расширяется до произвольного количества потоков. Потоки коммуницируют друг с другом через один общий интеджер (int turn) и массив из boolean значений по одному на каждый поток (bool[] flag). Это может работать корректно только при условии того, что значения этих переменных и ячеек массива видны одинаково из всех потоков. В джаве достаточно объявить их как volatile (вернее нет, для массива как раз недостаточно, нужно использовать, например AtomicIntegerArray).


    Volatile массивы в джавве. А ещё можно после каждой записи в массив делать arr = arr, тогда вроде как это будет массив из volatile значений, хоть он и будет неэффективным (на каждую запись (по смыслу) тратится две записи по сути):

    volatile int[] arr = new int[...];
    ...
    arr[4] = 100;
    arr = arr;
    ...
    

    Но, начиная с Java 9, можно использовать VarHandle, который умеет ссылаться на что угодно и для которого можно явно вызвать get/setVolatile при необходимости. Этот же способ используется при реализации AtomicIntegerArray, кстати:

    public class AtomicIntegerArray {
        private static final VarHandle AA = MethodHandles.arrayElementVarHandle(int[].class);
        private final int[] array;
        
        public final int get(int i) {
            return (int) AA.getVolatile(array, i);
        }
        
        public final void set(int i, int newValue) {
            AA.setVolatile(array, i, newValue);
        }
    }
    

  • Java Concurrency 3 — Аноним Apr 17, 2020 №3 Развернуть пост | Фокус на посте

    Оглавление

    • Cache Coherence
    • Memory Barrier
    • Java Memory Model (JMM)
      • Основные штуки
      • Операции, связанные отношением happens-before
      • Memory Barriers in Java
    • Thread Signaling
      • Spurious Wakeups

    Cache Coherence

    Cache coherence — это uniformity (целостность, не мог вспомнить слово на русском) данных, хранящихся в локальных кэшах для разделяемого ресурса. Проблемы с когерентностью возникают, когда один ресурс кэшируется в нескольких местах, в частности это происходит в multiprocessing системах: например, оба потока читают число 0 из одной и той же ячейки памяти и сохраняют её себе в кэш. После чего они по очереди добавляют к своему значению единицу. При отсутствии когерентности памяти каждый в итоге запишет единицу в main memory.

    Когерентность кеша требует чтобы:

    • если какой-то поток пишет X в память, а потом сразу же оттуда читает, то результатом чтения должно быть X при условии, что никакой другой поток не писал ничего в эту ячейку памяти
    • если один поток пишет X в память, а потом, после достаточно большого промежутка времени, другой поток читает из этой же ячейки памяти, то он должен прочитать этот X при условии, что в промежутке между записью и чтением никто другой ничего не писал в эту ячейку. Эти два первые условия вместе составляют условие для write propagation.
    • writes serialization — writes to the same location are serialized: two writes to the same location by any two threads are seen in the same order by all processors. То есть, если, например, если сначала поток P1 пишет в ячейку памяти 0, а потом P2 пишет туда 1, то никакой из остальных потоков не должен увидить 0 после 1, только сначала 0, а потом 1.

    Короче, это всё гарантирует то, что всё будет работать так, как мы интуитивно хотим, то есть, чтобы операции чтения/записи потоков были максимально гранулированными и после записи в кэш сразу же производилась запись в main memory. Но на практике это всё реализуется как-то по другому, более по-умному.

    img



    Memory Barrier

    Memory barrier — это специальная инструкция, которая заставляет CPU или компилятор делать так, чтобы все обращения к памяти до барьера были завершены к началу выполнения инструкций (но там вроде есть разные виды барьеров, которые разделяют записи и чтения). Это нужно при реализации многопоточных штук, потому что процессор может переставлять инструкции так, чтобы в рамках одного потока проблем не возникало, но при этом в случае с двумя может случиться вот такое:

    # первый поток
    while (f == 0);
    # здесь нужен memory barrier, потому что иначе print может быть переставлен с циклом
    # ожидания и тогда выведется ещё потенциально не заданный икс
    print x;
    
    # второй поток
    x = 42;
    # здесь снова нужен барьер, потому что иначе флаг f может быть выставлен до того, как будет
    # присвоено значение иксу
    f = 1;
    


    Java Memory Model (JMM)

    (ссылка на статью на рбрыварабаре)

    Модель памяти Java (Java Memory Model, JMM) определяет то, каким образом потоки взаимодейтсвуют друг с другом и как используют разделяемые данные. Тут же определяются всякие оптимизации, которые может выполнить компилятор.

    Основные штуки

    • Atomicity. Ну, это уже упоминалось: операции записи могут быть неатомарными на некоторых платформах, так что могут быть прерваны в процессе.

    • Visibility. Как я понял, раньше у каждого потока был свой явный кэш, который в частности использовался как буфер при операциях чтения/записи, и при каких-то условиях этот кэш мог синхронизироваться с main memory (cache coherence), но в новой JMM, видимо, от этого всего остались только сами условия, при которых один поток видит изменения, выполненные другим потоком, а то, как оно реализовано внутри — не имеет значения.

    • Reordering. Перестановка инструкций для оптимизации, с одним потоком незаметно, но из других потоков инструкции видны в другом порядке, не в том, в котором ты их написал. (Ну, в качестве примера — тот кусок кода в memory barrier’ах.)

    • Happens-before отношение. Это то, с помощью чего задаются правила, по которому компилятору запрещено выполнять reordering. Если есть поток X и поток Y (может быть, что X = Y), в потоке X выполняется операция A, в потоке Y — операция B. A happens-before B означает, что во время выполнения B и после её выполнения из потока Y будут видны изменения, которые выполнились потоком X до операции A и то, что повлекла операция A.


      img


      Здесь на схеме зелёное в потоке X — то, что видно из зелёной части в потоке Y. Такое отношение будет также транзитивным (ну, это очевидно просто интуитивно).



    Операции, связанные отношением happens-before

    • Releasing монитора happens-before acquiring этого же самого монитора. По сути это интуитивная логика, по которой работают lock’и: второй поток может получить lock только после того, как другой поток его отпустит. При не надо беспокоиться по поводу конструкторов монитора: There is no practical need for a constructor to be synchronized, because it would lock the object under construction, which is normally not made available to other threads until all constructors for the object have completed their work.

      Тем не менее, вроде как всё равно можно получить ссылку на объект с ещё не завершённым конструктором из другого потока, но это что-то экзотическое (внутри самого конструктора создаётся поток, который использует this), и вроде как решается с помощью простого synchronized(this) (сылсылкла).

    • Запись в volatile переменную happens-before чтение из той же переменной. То есть чтение из volatile переменной будет всегда возвращать актуальное на текущий момент значение. Благодаря этому можно реализовать double-checked locking pattern (который, впрочем, считается анти-паттерном, потому что всё очень легко заруинить тут):

      public class Keeper {
          private volatile Data data = null;
            
          public Data getData() {
              // просто минус одно обращение к volatile переменной
              Data localData = data;
              // если volatile data != null, то это означает, что там актуальное значение,
              // которое можно вернуть
              if (localData == null)
                  synchronized (this) {
                  	// синхронизация нужна только в том случае, если нам необходимо создать
                  	// новый instance объекта Data, чтобы никто не мог в это время что-то
                  	// сделать c data, в частности, никто не мог вызвать getData из двух
                  	// потоков одновременно
                      if (data == null) data = new Data();
                  }
              return localData;
          }
      }
      

      Смысл в том, что volatile переменные работают быстрее чем lock объекты, так что, если в них нет необходимости, можно использовать этот шорткат. Если бы не volatile, то не было бы гарантий, что в data содержится актуальное значение при проверке data == null, так что можно случайно два раза инициализировать объект Data. Раньше (судя по всему) в старых версиях JMM была ещё проблема в том, что через переменную data в этом случае из другого потока можно было получить доступ к объекту, у которого ещё не завершён конструктор, но сейчас вроде как такого нет.

    • Запись в final поле при конструирование объекта его содержащего происходит до того, как кто-либо получит доступ к этому полю вне конструктора. Кроме того, если какие-то другие объекты достижимы из этого final поля, то они обладают тем же свойством.



    Memory Barriers in Java

    ссылка на штуки

    На самом деле, существует несколько вариантов барьеров. %Первое слово в названии% — действие, эффект которого будет виден прежде, чем будет виден эффект типа %второе слово в названии%:

    • LoadLoad — прежде, чем кто-либо после барьера попытается прочитать данные, все запросы о чтении данных до барьера будут завершены. Остальные виды барьера определяются аналогично.
    • StoreStore, StoreLoad, LoadStore

    Да и тут немного важно то, что JMM не гарантирует того, в какой момент будет запись в main memory, например. Memory barriers are only indirectly related to higher-level notions described in memory models such as “acquire” and “release”. And memory barriers are not themselves “synchronization barriers”. And memory barriers are unrelated to the kinds of “write barriers” used in some garbage collectors. Memory barrier instructions directly control only the interaction of a CPU with its cache, with its write-buffer that holds stores waiting to be flushed to memory, and/or its buffer of waiting loads or speculatively executed instructions. These effects may lead to further interaction among caches, main memory and other processors. But there is nothing in the JMM that mandates any particular form of communication across processors so long as stores eventually become globally performed; i.e., visible across all processors, and that loads retrieve them when they are visible.

      2nd op 2nd op 2nd op 2nd op
    1st op Normal Load Normal Store Volatile Load MonitorEnter Volatile Store MonitorExit
    Normal Load       LoadStore
    Normal Store       StoreStore
    Volatile Load MonitorEnter LoadLoad LoadStore LoadLoad LoadStore
    Volatile Store MonitorExit     StoreLoad StoreStore

    Из этого всего в частности следует, что:

    • Нельзя перенести никакие операции записи/чтения вышеvolatile чтения. Такой барьер ещё называется барьером с acquire semantics. Этой штукой можно поставить барьер, чтобы убедиться в том, что выше определённого момента никакие инструкции не перенесутся. То есть это ограничение сверху.
    • Нельзя перенести никакие операции записи/чтения ниже volatile записи. Такой барьер ещё называется барьером с release semantics. Этой штукой можно поставить барьер, чтобы убедиться в том, что после определённого момента все чтения/записи вступили в силу. То есть это ограничение снизу.


    Thread Signaling

    Один из простых вариантов взаимодействия между потоками — это использование общего объекта, в котором инкапсулирован флаг, доступ к которому осуществляется через synchronized методы:

    class Signal {
        private boolean flag = false;
        
        public synchronized boolean getFlag() {
            return flag;
        }
        
        public synchronized void setFlag(boolean flag) {
            this.flag = flag;
        }
    }
    

    И дальше один из потоков просто будет находиться в ожидании (busy wait), пока другой поток не выставить флаг:

    while (!sharedSignal.getFlag());
    

    С одной стороны, да, это максмально примитивный подход, но иногда он может быть лучше, чем использовать wait-signal, потому что в случае с последним происходит conext switch, а это довольно дорогая операция (и засыпание, и просыпывание (возобновление) потока): нужно сохранить/восстановить состояние процессора: какие-то там регистры, program counter и прочие штуки. Так что, если заранее известно, что ожидание долго не продлится, то имеет смысл сначала немного покрутиться в busy loop, и только потом окончательно проваливаться в ожидание.



    Spurious Wakeups

    Иногда, когда используешь wait-signal, то поток может внезапно проснуться без какой-либо причины (spurious wakeups). Чтобы защититься от таких внезапных wake-up’ов, надо крутить проверку условия в цикле, и, если поток проснулся, но условие ещё не выполнено, то обратно уходить в сон. Такая штука называется spin lock. То есть нужно писать что-то вроде такого, чтобы всё работало корректно:

    public class Clazz {
        // кстати, если использовать константную строку в качестве ad hoc лока, то
        // из-за интернирования строк могут быть проблемы (например, если вызываешь
        // notify(), ожидая, что у тебя только один поток в ожидании, а у тебя там
        // на этой константной строке завязаны несколько потоков)
        private final Object monitor = new Object();
        private boolean signalled = false;
    
        public void doWait() throws InterruptedException {
            synchronized (monitor) {
                while (!signalled) {
                    monitor.wait();
                }
                signalled = false;
            }
        }
    
        public void doNotify() {
            synchronized (monitor) {
                signalled = true;
                // monitor.notifyAll();
                monitor.notify();
            }
        }
    }
    
    

    Такая штука не только защищает от spurious wakeups, но и в какой-то степени предотвращает потерю уведомлений, если по какой-то причине doNotify() был вызван до doWait(). Я как-то не знаю, в какой ситуации такое может произойти на практике, видимо, вот как раз при перестановке инструкций компилятором.

    notify() vs notifyAll(). Ну и также spin lock полезен, если ждут сразу несколько потоков, но у них разные условия для выхода из состояния ожидания. Потому что в таком случае лучше вызывать notifyAll(), а не просто notify(), потому что во втором случае, скорее всего, придёт уведомление для неправильного потока. Но notifyAll() — это дорогая операция (по крайней мере дороже, чем notify()), и, если в любом случае только один поток должен быть возвращён из состояния ожидания, то тем более лучше вызывать notify().

  • Java Concurrency 2 — Аноним Apr 07, 2020 №2 Развернуть пост | Фокус на посте

    Оглавление

    • Synchronization (continued)
      • The synchronized Keyword
      • Synchronized Blocks
      • The Monitor Concept
      • Volatile Fields
      • Atomics

    Synchronization (continued)


    продолжение 14.5 в первой книжке core java

    synchronized keyword, intrinsic condition and lock objects, (timed) tryLock, downsides of intrinsic locks, почему notify и wait не могут быть вызваны в несинхронизированных кусках кода, synchronized blocks, client-side locking, the monitor concept

    volatile fields, JMM, мотивация, порядок выполнения чтений/записей относительно volatile полей (happens-before guarantee), в каких случаях достаточно/недостаточно использовать volatile, final variables

    atomics, как использовать compareAndSet на примере нахождения максимального значения чего-то там, LongAdder/LongAccumulator



    The synchronized Keyword

    Intrinsic locks. Lock objects и condition objects — это относительно мощные инструменты и в большинстве случаев нам не нужен такой уровень контроля. Вместо этого можно использовать встроенный в язык инструмент. Каждый объект имеет в себе intrinsic lock. (Instrinsic function (или же built-in function) — это функция, реализация которой автоматически генерируется компилятором, это не совсем то же самое, что и inline function, потому что тут компилятор всё знает о функции, реализацию которой подставляет в method call.) Если пометить метод ключевым словом synchronized, то этот intrinsic lock защитит сразу весь метод. То есть вот это:

    public synchronized void method() {
        // method body
    }
    

    Эквивалентно вот этому:

    public void method() {
        this.intrinsicLock.lock();
        try {
            // method body
        } finally {
            this.intrinsicLock.unlock();
        }
    }
    

    Intrinsic condition objects. Каждому intrinsic lock объекту соответствует единственный condition объект, ассоциированный с ним. Для взаимодействия с ним. Метод wait добавляет текущий поток в набор методов, ждущих выполнения условия (ему соответствует вызов intrinsicCondition.await()). Методы notify() и notifyAll() соответствуют вызовам instrinsicCondition.signal() и intrinsicCondition.signalAll().

    Intrinsic locks for static methods. Ключевым словом synchronized можно пометить и статические методы. В таком случае lock и condition объекты будут содержаться внутри соответсвующего Class объекта, так что он будет ассоциирован с любыми вызовами статических объектов, так что, пока какой-то поток находится внутри одного из статических synchronized методов, то никакой другой synchronized метод из другого потока вызван быть не может, пока тот первый поток не выйдет из метода.

    Минусы intrinsic lock’ов:

    • Нельзя прервать поток, который пытается получить lock в данный момент, потому что нет доступа к конкретному lock объекту. Но я не совсем понимаю, каким образом можно легально достать список ожидающих потоков.

    • В книжке написано, что you cannot specify a timeout when trying to acquire a lock. Я так понимаю, что это про методы tryLock() и конкретно его timed версию tryLock(long timeout, TimeUnit unit). Простой tryLock() пытается получить lock, если он занят другим потоком, то он просто возвращает false и сдаётся и не ждёт, пока другие потоки выйдут из критической секции. При этом даже, если в этот момент другие потоки тоже ждут, то вне зависимости от наличия fair ordering policy он войдёт в критическую секцию. Это такая странная, видимо, намеренная особенность этого метода.

      Timed версия этого метода, во-первых, ждёт некоторое время, чтобы попытаться войти в критическую секцию, а, во-вторых, этот метод учитывает fair ordering policy, так что его можно использовать с нулевым таймингом как альтернативу обычному tryLock().

      Так вот, в случае с intrinsic lock’ом таких методов взаимодействия с ним нет.

    • Having a single condition per lock can be inefficient. Всё же с одним lock’ом может быть ассоциировано несколько условий. Если вызывается wait сразу для всех потенциальных условий, то выполнение всех потоков, которые находятся в критических секциях, временно приостановится, пока не вызовется signal. И, я так думаю, что из-за этого есть шанс того, что всё дедлокнется.

    Кстати, ещё такое, что я пропустил: все эти методы wait, notify, notifyAll могут быть вызваны только внутри synchronized методов или блоков (ссыл ка). Потому что race condition. Например, если у нас есть такая штука:

    // первый поток
    while (!condition) {    // 1
        wait();             // 4
    }
    // второй поток
    satisfyCondition();     // 2
    notifyAll();            // 3
    

    То может быть так, что первый поток проверяет условие, понимает, что оно не выполняется, и после этого момента управление передаётся во второй поток, который выставляет выполнение условия и уведомляет всех об этом, но первый поток ещё не находится в состоянии ожидания condition’а, так что это никак на него не влияет. И потом управление передаётся в первый поток, и он переходит в состояние ожидания.

    Это объясняет, почему первый кусок кода должен быть синхронизирован. А необходимость в синхронизации второго куска кода объясняется тем, что надо синхронизировать работу с объектами, связанными с условием.



    Synchronized Blocks

    Можно ещё вот так acquire’ить intrinsic lock объекты:

    synchronized (obj) {
        // critical section
    }
    

    Ad hoc locks (exploiting intrinsic locks). И тогда никакой другой synchronized метод у obj не сможет быть вызван, пока какой-то поток находится в критической секции. Кстати, это можно использовать, чтобы неявно создать lock объект и не надо было писать вот ту дефолтную штуку с try-finally:

    public class Clazz {
        private Object lock = new Object();
        
        public void doSomething() {
            // ...
            synchronized (lock) {
                // critical section
            }
            // ...
        }
    }
    

    Client-side locking. Если у нас есть какой-то класс с синхронизированными операциями и мы хотим реализовать какие-то ещё синхронизированные операции для него как пользователь класса, то можно попробовать сделать так:

    public void doSomethingElse(Clazz obj) {
        synchronized (obj) {
            // critical section
        }
    }
    

    Однако нет никакой гарантии, то объект будет использовать свой intrinsic lock, для своих синхронизированных методов, так что это всё жесть ненадёжно, целиком и полностью зависит от внутренней реализации класса.



    The Monitor Concept

    Monitor class — это по сути примерно вот эта штука с intrinsic lock’ами, которая реализует mutual exclusion для потоков и у неё есть механизм для уведомления ждущих потоков (condition variable). Конкретно там вот такие требования:

    • A monitor is a class with only private fields
    • Each object of the class has an associated lock
    • All methods are locked by that lock
    • The lock can have any number of associated conditions

    И instrinsic lock’и не соответствуют этим требованиям как минимум в том, что поля могут не быть приватными, не все методы обязаны быть синхронизированными, и condition тут только один. И ещё instrinsic lock доступен клиентам (можно получить его в synchronized блоке). И, видимо, это небезопасно, потому что можно изменить извне состояние объекта таким образом, что какие-то потоки навсегда перейдут в состояние ожидания?



    Volatile Fields

    ссылка

    Memory model определяет то, каким образом потоки взаимодействуют с памятью компьютера (RAM), то есть, как связаны переменные в программе и то, как они сохраняются и достаются из памяти на низком уровне. В частности JMM — это Java Memory Model, она вот в частности определяет то, как разные потоки видят значения переменных, разделяемых между несколькими потоками. И проблема в том, что в JMM компилятор имеет большую свободу в том, как ему конкретно выполнять манипуляции с данными (из-за всяких оптимизаций, например, которые переставляют порядок инструкций), из-за чего переменная, разделяемая несколькими потоками, может иметь различные значения с точек зрения различных потоков. И это было в особенности критично в какой-то там старой JMM, сейчас этого вроде нет, например:

    String s1 = "/usr/tmp";
    String s2 = s1.substring(4); // contains "/tmp"
    

    s2 использует тот же массив из символов, что и s1, но просто меняет сдвиг и длину, которые выставляются в конструкторе String. И со старой JMM было так, что можно было из другого треда увидеть, что offset у s2 равен не 4, а 0, из-за чего казалось, что внутри s2 не "/tmp", а "/usr".

    Ну и ещё проблема в порядке выполнения чтений/записи при отсутствии синхронизации: компилятору позволено переставлять инструкции так, чтобы текущий поток разницы для себя не заметил. Из-за этого вот так писать:

    // do the initializing
    // ...
    initialized = true;
    

    Нельзя, потому что непонятно в рамках текущего потока, когда конкретно выставится флаг initialized.

    Volatile keyword. (ссылка) В связи со всем этим есть ключевое слово volatile для полей, которое заставляет компилятор делать так, чтобы изменения для поля, помеченного этим словом, были одинаковыми одновременно для всех потоков. При этом это не гарантирует неделимость операций чтения/записи, это всё всё ещё может быть прерванным.

    Это всё для того, чтобы значение отображалось корректно. Оно внутри по-другому как-то реализовано, но просто в качестве примера: CPU использует кэш в качестве буфера при записи/чтении, так что в общем случае без применения каких-то приколов (штуки для когерентности кэша — частный пример) может быть так, что из разных потоков одна и та же переменная видна с разными значениями, или ещё может быть так, что операции записи в неё из разных потоков видятся в разном порядке.

    The CPU cache used by Thread 1 and main memory contains different values for the counter variable.

    Кроме того, volatile поле выступает в роли memory barrier:

    • компилятор не может переносить инструкции выше чтения из volatile поля
    • компилятор не может переносить инструкции ниже записи в volatile поле

    Так что с помощью volatile keyword можно пометить вот этот флаг initialized

    // do the initializing
    // ...
    initialized = true;
    

    И тогда вроде он будет корректно отображать то, когда что-то там на самом деле будет инициализировано.

    Когда недостаточно объявить поле volatile. Как уже упоминалось, volatile не делает запись/чтение из поля неделимым: всё ещё может быть так, что выполнение присваивание будет прервано до того, как новое значение будет скопировано из кеша. И это всё особенно будет плохо, если новое значение поля вычисляется на основе старого. Тут придётся синхронизировать блок кода, который отвечает за увеличение переменной.

    Но volatile приемлемо и нужно использовать, например, если один поток пишет, а другие — читают.

    Final variables. Если поле объявлено как final, то его содержимое тоже будет видимым одинаково для всех остальных потоков: другие потоки получают к нему доступ только после того, когда ему будет присвоено какое-то конечное значение. То есть вот в таком случае:

    final Map<String, Double> map = new HashMap<>();
    

    гарантируется, что HashMap будет инициализирован прежде, чем к map кто-то из другого потока получит доступ.


    Atomics

    Atomics в джавававе — это всякие структуры данных из пакета java.util.concurrent.atomic, которые эффективно (эффективнее, чем с использованием lock’ов) реализуют всякие инструкции так, чтобы они были неделимыми. Например, AtomicInteger — это интеджер, у которого все операции неделимы. Например, инкремент AtomicInteger.addAndGet(int) не может быть прерван другими потоками.

    Поиск максимального значения (пример). Тут всё немного сложнее, потому что нельзя просто сделать вот так:

    largest.set(Math.max(largest.get(), observed));
    

    Потому что после того, как вызовется largest.get(), у него есть шанс измениться в большую сторону, и тогда может всё заруиниться. Вместо этого такое можно делать в цикле с использованием метода AtomicInteger.compareAndSet(oldValue, newValue), который проверяет, актуально ли вот это старое значение и, если да, то оно меняет его на новое:

    do {
        oldValue = largest.get();
        newValue = Math.max(oldValue, observed);
    } while (!largest.compareAndSet(oldValue, newValue));
    

    И это всё может показаться слишком медленным, потому что ты можешь потратить несколько попыток прежде, чем ты пройдёшь цикл без прерываний другими потоками, которые бы изменяли largest, но это всё ещё быстрее, чем использование lock’ов. Такая штука с тем, чтобы сохранить значение, а потом попробовать его использовать, называется оптимистичными апдейтами. Тот же механизм используется вроде как в реализации всех остальных “атомизированных” методах для работы с интеджером.

    Но, начиная с Java 8 можно не использовать етот бойлерплейт код (boilerplate code — куски кода, которые много где повторяются with little or no alteration, вроде слышал, но забыл такое). Тут можно вот такое написать с использованием лямбд, особенно выглядит нормально:

    largest.updateAndGet(x -> Math.max(x, observed));
    // или вот так
    largest.accumulateAndGet(observed, Math::max);
    // последний метод эквивалентен вызову выше (ну, то есть там порядок аргументов такой же:
    // первый аргумент - это предыдущее значение, а второе - то, которое передаётся
    // в качестве аргумента)
    

    LongAdder и LongAccumulator. Но по сути это просто удобная обёртка для того кода выше с оптимистичными апдейтами. Из-за этого, если очень много потоков одновременно пытаются обращаться к atomic интеджеру, то всё может начать очень сильно тормозить, потому что выполняется куча ретраев. В таких случаях можно использовать либо LongAdder, либо более общий вариант — LongAccumulator. Внутри себя они лениво сохраняют аргументы, с которыми вызываются операции, и только потом, когда потребуется финальное значение, оно вычисляется (ну, или, видимо, при переполнении таблицы внутри?). А ещё нужно иметь в виду, что порядок, в котором будет вычисляться итоговое значение, не определён (из описания LongAccumulator):

    The order of accumulation within or across threads is not guaranteed and cannot be depended upon, so this class is only applicable to functions for which the order of accumulation does not matter.

    Так что функция, передаваемая в LongAccumulator должна быть коммутативной и ассоциативной.

    Аккумуляторы ненужны. При этом, наверное, имеет смысл просто самому сделать массив с переменной на каждый поток, или в каждый поток засунуть переменную, а потом просто всё просуммировать. И это не то чтобы сильно хуже или сложнее, но, возможно, даже эффективнее, чем использовать эту ерунду из стандартной библиотеки.

  • Java Concurrency 1 — Аноним Apr 04, 2020 №1 Развернуть пост | Фокус на посте

    Оглавление

    • What are Threads
    • Thread States
    • Thread Properties
      • Thread Priorities
      • Daemon Threads
      • Handlers for Uncaught Exceptions
    • Synchronization
      • An Example of a Race Condition
      • Lock Objects
      • Condition Objects

    What are Threads


    14.1–14.2 в первой книжке core java

    общие штуки, чем отличаются потоки от процессов, многопоточность на практике, context switch, process control blocks, как запустить новый поток, как прервать поток, почему не стоит использовать Thread.stop(), как справляться с InterruptedException



    Общие штуки. Операционная система выделяет куски ЦПУ-времени для каждого запущенного процесса, тем самым создавая впечатление того, что всё работает параллельно, так что количество “одновременно” работающих процессов не ограничено количеством процессоров (ядер). По аналогии внутри одной программы может быть создано несколько потоков (thread), которые будут выполняться одновременно. Programs that can rum more than one thread at once are said to be multithreaded.

    Процессы и потоки. Разница между несколькими процессами и потоками в том, что каждый процесс имеет свой отдельный набор переменных (данных), в то время как потоки разделяют между собой одни и те же данные. Плюс общих данных в том, что это делает взаимодействие между потоками более эффективным, минус в том, что всё становится немного сложнее и непредсказуемее, придётся синхронизировать ресурсы, используемые совместно.

    Необходимость в многопоточности возникает в самых простых случаях: например, когда надо прерывать выполнение какого-нибудь затратного по времени процесса, а не ждать его завершения. В общем случае многопоточность нужна для того, чтобы:

    • эффективнее использовать процессорное время одного CPU (одного ядра). Это как раз относится к случаю, когда выполняется какой-то затратный процесс и нужно, чтобы на фоне него можно было выполнять ещё какую-то работу (поток с отрисовкой GUI приложения на фоне потока с выполнением какой-то логики).
    • эффективнее использовать несколько CPU, чтобы распределить работу между ними.
    • улучшение user experience относительно responsiveness и fairness — это всё частные случаи предыдущих пунктов. Отзывчивость = например, давать процессорное время на отрисовку GUI и взаимодействие с программой в то время, как она выполняет что-то полезное. Fairness = например, обрабатывать несколько запросов к серверу одновременно, жертвуя производительностью при выполнении каждого отдельного запроса.

    Multithreading Costs, Context Switch. Для того, чтобы переключиться от одного потока к другому, CPU нужно сохранить куда-то все локальные данные вроде program counter’а, каких-то регистров и других данных, которые там нужны потоку. Все эти данные для каждого потока хранятся в process control блоках (PCB), которые там уже объединены в какую-то структуру данных, а сам процесс называется context switch‘ем. Выгрузка и загрузка этих данных занимает некоторое время, тут также стоит учитывать то, как часто conext switch происходит за малые промежутки времени. Ну и помимо time expenses есть ещё очевидные memory expenses: если создать кучу потоков, то это потребует некоторое количество дополнительной памяти.

    Как запустить новый поток. Чтобы запустить код в отдельном потоке достаточно просто создать объект класса Thread, передав ему код, который надо запустить через функциональный интерфейс Runnable (лямбда выражением просто можно). И после этого надо будет вызвать thread.start(). Раньше было принято также наследовать свой класс от Thread, в котором переопределять метод run(), так что бы достаточно просто было написать new MyThread().run() для запуска нового потока. Однако такой подход не рекомендуется, потому что каждый раз выделять новый поток на каждую задачу — слишком жирно, надо разделять задачи, которые требуется выполнить, и механизм для их выполнения.

    Прерывание потоков. Поток может завершиться естественным образом, если происходит возвращение из запущенного метода run(), или, когда там выбрасывается неперехваченное исключение. Можно также использовать метод Thread.stop() из другого потока, однако сейчас этот метод устарел:

    This method is inherently unsafe. Stopping a thread with Thread.stop causes it to unlock all of the monitors that it has locked (as a natural consequence of the unchecked ThreadDeath exception propagating up the stack). If any of the objects previously protected by these monitors were in an inconsistent state, the damaged objects become visible to other threads, potentially resulting in arbitrary behavior. Many uses of stop should be replaced by code that simply modifies some variable to indicate that the target thread should stop running. The target thread should check this variable regularly, and return from its run method in an orderly fashion if the variable indicates that it is to stop running. If the target thread waits for long periods (on a condition variable, for example), the interrupt method should be used to interrupt the wait.

    Другие способы прервать поток. Однако, кроме Thread.stop() нет другого способа насильно завершить поток, разве что вызвать Thread.interrup(), чтобы запросить завершение треда. Это выставляет флаг внутри потока, переводя его в interrupted status. Thread.currentThread() — чтобы получить доступ к текущему потоку, curentThread.isInterrupted() — чтобы проверить, не был ли прерыван тред. (Есть ещё статический метод interrupted, который возвращает флаг для текущего потока, и сразу же его сбрасывает в false.) Однако, если поток заблокирован (например, если у него вызвали sleep или wait), то у него просто нет шанса на то, чтобы завершиться. Поэтому в таком случае выбрасывается InterruptedException (это checked exception, так что пользователь обязан корректно его перехватить).

    Нет чёткого никакого требования на уровне ЯП касательно того, когда “прерванный” поток должен завершиться, прерывание — это просто указание на то, что потоку надо бы завершиться.

    Handling InterruptedException. Не стоит писать штук вроде:

    try {
        sleep(100);
    } catch (InterruptedException e) { }
    

    Даже, если ничего не получается придумать, то всегда есть выбор из вот такимх универсальных вариантов:

    • Можно проглотить исключение и всё-таки выставить флаг через Thread.currentThread().interrupt().
    • А можно просто перебросить исключение выше по иерархии, указав throws InterruptedException в объявлении метода.



    Thread States


    14.3 в первой книжке core java

    в каких состояниях может находиться поток, как проходит цикл жизни потока, какие-то методы, связанные с этим и общие слова



    Состояния потока. Потоки могут быть в одном из следующих состояний:

    • New
    • Runnable
    • Blocked
    • Waiting
    • Timed waiting
    • Terminated

    Для определения текущего состояния достаточно вызвать Thread.getState(), который вернёт Thread.State.

    New threads. Это начальное состояние потоков, с которым они создаются. В этом состоянии они ещё не запущены.


    Runnable Threads. Как только был вызван метод start, поток становится runnable. При этом он необязательно выполняется прямо конкретно сейчас, он может быть на паузе, пока выполняются другие потоки, но операционная система может дать ему шанс немного поработать. Операционная система опирается на определённый список приоритетов (которые будут обсуждены несколько позже), когда выбирает, какой поток возобновить следующим. Такой вариант распределения ресурсов между потоками называется preemptive scheduling. Non-preemptive (cooperative) scheduling — это, когда процесс отпускает ресурсы только, когда он завершается или переходит в ждущее состояние, то есть он не прерывается операционной системой.


    Blocked and Waiting Threads. Если поток находится в одном из этих состояний, то он временно неактивен, потребляет минимальное количество ресурсов, thread scheduler сам решает, когда заново активировать такие потоки:

    • Когда поток пытается достать intrinsic object lock (что бы это ни значило, это будет объяснено позже), который захвачен каким-то другим потоком, то этот поток переходит в заблокированное состояние. Выходит из заблокированного состояния только, когда все остальные потоки отпустят этот lock.
    • Когда поток ждёт, пока другой поток уведомит thread scheduler о condition (что бы это ни значило, это будет объяснено позже). Конкретно это происходит при вызове Object.wait или Thread.join, или, когда поток поток ждёт Lock или Condition из пакета java.util.concurrent. На практике разница между blocked и waiting состояниями не так существенна.
    • Некоторые методы, имеют параметры тайм-аута, и они переводят поток в timed waiting state. И поток находится в этом состоянии, пока не закончится таймер, или, пока не поступит подходящее уведомление.

    image-20200406211207977


    Как выбирается следующий поток. Когда поток переходит в заблокированное или ждущее состояние, thread scheduler выделяет ресурсы следующему потоку в очереди. Когда поток снова активируется (например, когда завершается таймер или когда поток получил lock), scheduler сравнивает приоритет этого потока с приоритетами текущих runnable потоков, и в зависимости от этого, возможно, запускает этот поток.

    Некоторые методы, связанные с этими состояниями:

    • Thread.join() — ждать, пока не завершится этот поток.
    • Thread.join(long millis) — ждать указанное время или, пока не завершится этот поток.
    • Thread.suspend() — прервать выполнение потока. Этот метод устарел.
    • Thread.resume() — продолжить выполнение потока, можно вызвать только после вызова метода suspend(). Этот метод устарел.


    Terminated Threads. Поток завершается по одной из следующих причин:

    • Поток завершается естественным образом, когда происходит возвращение из метода run.
    • Поток завершается выбросом исключения в методе run.
    • А ещё можно остановить методом stop, но, как уже упоминалось, этот метод устарел. В сущности его вызов выбрасывает исключение ThreadDeath, так что это в каком-то смысле частный вариант предыдущего случая.



    Thread Properties


    14.4 в первой книжке core java

    приоритеты потоков, платформозависимость потоков, какие-то методы, пример, как можно всё заруинить, если самому выставлять приоритеты потоков, daemon threads, handlers for uncaught exceptions, thread groups



    Thread Priorities

    Thread Priorities. Каждый поток имеет свой приоритет. По умолчанию приоритет нового потока наследуется от потока, в котором этот поток был создан, но можно задать и конкретный приоритет, который обозначается целыми числами от 1 до 10 (MIN_PRIORITY = 1, NORM_PRIORITY = 5, MAX_PRIORITY = 10). Всякий раз, когда thread scheduler’у надо выбрать новый поток, он предпочитает потоки с более высокими приоритетами потокам с низкими.

    Приоритеты потоков очень сильно зависят от конкретной платформы. Приоритеты, определённые внутри джамбы отображаются в приоритеты внутри host платформы, в которой уже может быть больше или меньше уровней приоритета, так что возможно, some of the java priorities will map to the same operating system level. Может быть и такое, что в хост платформе выставленные приоритеты и вовсе будут игнорироваться. В частности по этой причине обычно нет смысла модифицировать приоритеты потоков, и уж тем более полагаться на них при определении логики работы программ.

    А ещё можно загнать себя вот в такую ловушку: если выставить максимальный приоритет для потоков, которые никогда не становятся неактивными, то они не дадут никакого шанса другим потокам, потому что всякий раз thread scheduler будет выбирать эти high priority потоки.

    Некоторые методы, связанные со всем этим:

    • Thread.setPriority(int) — просто выставляет приоритет.
    • static yield() — текущий поток уступает другим потокам с большими или такими же приоритетами.


    Daemon Threads

    Чтобы сделать из потока daemon thread, достаточно вызвать метод thread.setDaemon(true). Такие потоки только лишь служат другим тредам, например, апдейтят или очищают что-то за ними. То есть сами по себе, в отрыве от логически связанных с ними потоками, они никакого смысла не имеют. Так что, когда остаются только daemon потоки, виртуальную машину можно останавливать. Ни в коем случае нельзя использовать daemon потоки для доступа к каким-то ресурсам, потому что то, в какой момент они остановятся, — не определено.



    Handlers for Uncaught Exceptions

    Если в методе run выбрасывается неперехваченное исключение, то выполнение потока прерывается. И его можно только попробовать перехватить внутри метода run, а перебросить выше — нельзя, потому что в объявлении run по по нятным причинам нельзя указать то, какими проверяемыми исключениями он кидается. Для этого прямо перед тем, как поток умирает, исключение передаётся handler’у для непойманных исключений:

    Thread.UncaughtExceptionHandler {
        void uncaughtException(Thread t, Throwable e)
    }
    

    (Default) uncaught exception handler. Для любого потока можно задать uncaught exception handler с помощью метода setUncaughtExceptionHandler для конкретного потока. А можно задать дефолтный handler и для всех потоков сразу с помощью статического метода с мега-длинным названием static Thread.setDefaultUncaughtExceptionHandler. Такой дефолтный handler может быть использован, например, для логирования ошибок в специальный фаел. По умолчанию дефолтный handler выставлен в null.

    Thread Group. По умолчанию все потоки принадлежат одной группе, но можно создавать и кастомные группы потоков (которые, впрочем, всё ещё будут находиться внутри дефолтной группы). То, какой uncaught exception handler будет использоваться по умолчанию в рамках группы определяется, собственно, внутри объекта-группы. ThreadGroup — как раз объект для групп потоков и он имплементирует интерфейс Thread.UncaughtExceptionHandler. При этом группы потоков могут содержать другие группы потоков, так что получается такая древовидная штука. Конструктор ThreadGroup создаёт группу и в качестве родительской задаёт группу текущего потока. Ну, или можно самому указать группу, которую надо использовать в качестве родительской (но, как я понимаю, снаружи из потока нельзя достать его группу).

    Как работают handler’ы для групп потоков. Таким образом, если для потока не задан конкретный uncaught exception handler, вызывается метод uncaughtException для группы потоков, в которой находится поток, в котором возникла ошибка. И дальше этот предпринимает следующие действия:

    1. Если у группы есть родительская группа, то метод uncaughtException вызывается для родительской группы с теми же аргументами.
    2. Иначе, если выставлен дефолтный handler, то вызывается он.
    3. Иначе, если неперехваченное исключение — это ThreadDeath, то ничего не происходит, потому что это исключение нарочно выбрасывается при вызове устаревшего Thread.stop для остановки выполнения потока.
    4. Иначе в System.err просто печатается stack trace и содержание исключения.
    public void uncaughtException(Thread t, Throwable e) {
        if (parent != null) {
            parent.uncaughtException(t, e);
        } else {
            Thread.UncaughtExceptionHandler ueh =
                Thread.getDefaultUncaughtExceptionHandler();
            if (ueh != null) {
                ueh.uncaughtException(t, e);
            } else if (!(e instanceof ThreadDeath)) {
                System.err.print("Exception in thread \"" + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }
        }
    }
    

    Вот это стандартное поведение особо смысла не имеет, потому что в конечном итоге всё равно вызывается стандартный handler для исключений. Предполагается, что этот метод будет переопределён его пользователями, чтобы он имел более осмысленное поведение.

    Однако есть гораздо более лучшие приколы для оперирования с наборами потоков, чем вот эти группы потоков, поэтому эти все штуки в книжке особо подробно не рассматриваются (собственно, как и многопоточность в целом).



    Synchronization


    14.5 в первой книжке core java

    race condition (race hazard), пример этого состояния


    Общие слова. На практике в большинстве случаев в многопоточных приложениях два и более потоков могут одновременно взаимодействовать с одними и теми же данными, из-за чего данные могут повредиться, если доступ будет производиться в неопределённом порядке. Когда такое происходит, то это называют ещё race condition или race hazard (например, когда один поток пытается получить доступ к куску памяти, в который в этот момент пишет другой поток).



    An Example of a Race Condition

    Чтобы избежать повреждения данных, разделяемых несколькими потоками, нужно синхронизировать доступ к данным. Для демонстрации race condition можно рассмотреть небольшую программу, которая симулирует банк с несколькими счетами, которые пересылают транзакции друг другу. Мы попробуем дёргать метод пересылки денег из нескольких потоков одновременно для симуляции race condition.

    Вот это главный класс Bank, который занимается переводами денег между банковскими счетами.

    import java.util.Arrays;
    import java.util.stream.DoubleStream;
    
    class Bank {
        private static final int ACCOUNTS_COUNT = 20;
        // max amount of money per single transaction
        private static final double MAX_AMOUNT = 1000.0;
        // max delay between two transactions in a thread
        private static final int MAX_DELAY = 10;
    
        private double[] accounts;
    
        /**
         * Generates a collection of bank accounts with randomized accounts
         * balances.
         */
        public Bank() {
            accounts = new double[ACCOUNTS_COUNT];
            Arrays.fill(accounts, 5 * MAX_AMOUNT);
        }
    
        /**
         * For simplicity we are not checking, if the {@code from} account has
         * enough money to transfer money to the {@code to} bank account.
         */
        public void transfer(int from, int to, double amount) {
            System.out.println(Thread.currentThread());
            accounts[from] -= amount;
            System.out.printf("Money (%.2f) has been sent from %d to %d\n",
                    amount, from, to);
            accounts[to] += amount;
            System.out.printf("(%d -> %d) Money received. Total Balance: %10.2f\n",
                    from, to, getTotalBalance());
        }
    
        /**
         * Returns the total amount of money stored in the bank.
         */
        public double getTotalBalance() {
            return DoubleStream.of(accounts).sum();
        }
    
        public static void main(String[] args) {
            Bank bank = new Bank();
            // each bank account sends a random amount of money to the random
            // bank account from a separate thread
            for (int i = 0; i < ACCOUNTS_COUNT; i++) {
                int from = i;
                Thread t = new Thread(() -> {
                    try {
                        while (true) {
                            int to = (int) (ACCOUNTS_COUNT * Math.random());
                            if (to == from) {
                                to = (to + 1) % ACCOUNTS_COUNT;
                            }
                            double amount = MAX_AMOUNT * Math.random();
                            bank.transfer(from, to, amount);
                            Thread.sleep((int) (MAX_DELAY * Math.random()));
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                t.start();
            }
        }
    }
    

    Проблема в том, что иногда потоки могут быть прерваны прямо в момент пересылки денег, из-за чего, например, если проверить сумму балансов на всех счетах в момент, когда некоторые из транзакций ещё не завершились, то она может оказаться меньше, чем есть на самом деле в теории.

    Thread[Thread-19,5,main]
    Money (917.92) has been sent from 19 to 11
    (19 -> 11) Money received. Total Balance:  100000.00
    Thread[Thread-11,5,main]
    Thread[Thread-1,5,main]
    Money (666.94) has been sent from 1 to 3
    Thread[Thread-8,5,main]
    (1 -> 3) Money received. Total Balance:   99877.79
    Thread[Thread-5,5,main]
    Money (582.36) has been sent from 5 to 10
    (5 -> 10) Money received. Total Balance:   99250.18
    Money (627.62) has been sent from 8 to 9
    (8 -> 9) Money received. Total Balance:   99877.79
    Money (122.21) has been sent from 11 to 0
    (11 -> 0) Money received. Total Balance:  100000.00
    

    Кроме того проблема может возникнуть, когда два потока одновременно выполняют строку кода accounts[to] += amount. Это не атомарная операция, она включает в себя несколько действий:

    1. Загрузить accounts[to] в регистр.
    2. Добавить к значению в регистре amount.
    3. Выгрузить из регистра результат обратно в acounts[to].

    Проблема может возникнуть, например, если какой-то второй поток прерывает выполнение текущего (первого) потока после выполнения второго шага. Допустим, второй поток модифицирует значение accounts[to], после чего управление опять передаётся в первый, который выгружает из временного регистра устаревшее значение, тем самым полностью отменяя результат работы второго потока. Таким образом, данные становятся повреждёнными.

    Однако вероятность того, что такая ошибка произойдёт даже с учётом того, что в каждом потоке есть всего лишь несколько строк кода, очень мала, так что, возможно, эта ошибка в некоторых случаях даже не стоит того, чтобы пытаться исправить её. Но её надо принимать во внимание, в особенности, если, например, предполагается, что программа будет запущена на протяжении долгого времени.



    Lock Objects

    Synchronized keyword и lock объекты. Есть два основных механизма для защиты блока кода от concurrent доступа из двух разных потоков. Для этого в джемпере есть ключевое слово synchronized, и ещё есть объекты класса ReentrantLock. Ключевое слово synchronized автоматически защищает блок кода от одновременного доступа из разных потоков, в сущности то же самое можно сделать с помощью java.util.concurrent фреймворка. И сначала будет рассмотрен вот етот немного cumbersome способ с помощью lock объектов, а не простое и удобное synchronized keyword.

    myLock.lock(); // a ReentrantLock object
    try {
        // critical section
    } finally {
        // Make sure the lock is unlocked even if an exception is thrown.
        // Otherwise, in case an unhandled exception is thrown, all other
        // threads will be locked forever.
        myLock.unlock();
        // Ещё надо убедиться в том, что shared объект не повреждён прежде, чем
        // к нему получат доступ другие объекты
    }
    

    Вот такая конструкция гарантирует, что в выделенной критической секции в каждый момент времени находится только один поток. Как только один из потоков доходит до строки myLock.lock(), он запирает lock-объект, и все остальные потоки стопорятся на этой строке до тех пор, пока тот первый поток не откроет lock-объект. Эта штука похожа на развёрнутое try-with-resources-statement: там тоже в конце в finally освобождаются ресурсы, но здесь оно бы не сработало, во-первых, потому что ReentrantLock не реализует AuntoClosable, а, во-вторых, потому что try-with-resources каждый раз требует объявления новой переменной, а нам надо, чтобы lock-объект был один для всех потоков.

    Lock называется reentrant, потому что поток может несколько раз повторно завладеть lock’ом, которым он уже владеет. Чтобы отпустить lock поток должен вызвать unlock() ровно столько раз, сколько был вызван lock() (внутри lock-объекта просто есть hold count, который следит за этим). Это необходимо для реализации вложенных вызовов методов, которые используют одни и те же lock объекты: например, если в примере выше у нас бы были защищены lock’ами методы transfer и getTotalBalance, то lock() будет вызван два раза: один раз после входа в transer, и ещё один раз после входа getTotalBalance.

    Rule of thumb. В целом, короче, lock объекты надо использовать, когда один и тот же объект может быть в теории одновременно заппдейтчен из разных потоков, так что надо убедиться в том, что каждый поток полностью завершает свою работу с shared объектом прежде, чем к нему получит доступ другой объект.

    Каким образом выбирается поток, который следующим войдёт в критическую секцию. У ReentrantLock есть конструктор с флагом fair, если выставить его, то будет выбран тот поток, который ждал дольше всего, а по умолчанию, видимо, выбирается тот, у которого больше приоритет. Однако этот при кол нужно использовать только, если точно знаешь, что ты делаешь, потому что нет гарантии, что thread scheduler тоже лучше относится к потокам, которые долго ждут.



    Condition Objects

    Описание проблемы. Даже, если поток получил доступ к критическому куску кода, может быть такое, что для него не выполнено какое-то условие, из-за которого он пока что не может выполнять никакую полезную работу в этом месте. Чтобы разобраться с такими случаями, есть condition objects или condition variables (исто рическое название просто). В случае с примером с банковскими счетами выше условием для возможности проведения транзакции будет наличие достаточного количества денег.

    Нельзя просто в начале сделать проверку:

    if (bank.getBalance(from) >= amount) {
        bank.transfer(from, to, amount);
    }
    

    Потому что сразу после проверки управление может перейти другому потоку, который может снять все деньги с банковского счёта from, и тогда потом деньги снимутся вне зависимости от выполнения условия. Можно было бы запихнуть эту проверку внутрь критической секции, но тогда нам бы пришлось ждать внутри критической секции, пока кто-нибудь бы не добавил денег, чтобы можно было совершить транзакцию, но проблема в том, что тогда другие потоки не смогут войти в критическую секцию, так что даже в теории никто не сможет перечислить деньги.

    public void transfer(int from, int to, double amount) {
        lock.lock();
        try {
            while (accounts[from] < amount) {
                // wait
            }
            // transfer
        } finally {
            lock.unlock();
        }
    }
    

    Condition objects. Для этого как раз есть вот эти condition объекты. С lock объектом может быть ассоциирован один или несколько condition объектов. Condition объект можно достать из lock объекта с помощью метода lock.newCondition(). Условие нужно сохранить куда-нибудь, переменную (поле) с условием принято называть тем условием, которое должно выполниться, чтобы поток продолжил своё выполнение.

    class Bank {
        // то есть condition object у нас - это поле внутри объекта банка
        private Condition sufficientFunds;
        ...
        public Bank() {
            ...
            sufficientFunds = lock.newCondition();
        }
    }
    

    Condition wait set. Если в процессе выполнения транзакции мы обнаруживаем, что на счету недостаточно денег, то мы можем вызвать sufficientFunds.await(), чтобы отдать управление другим потокам. И управление к нему не перейдёт до тех пор, пока кто-то другой не вызовет sufficientFunds.signalAll(), а до тех пор поток будет оставаться в condition wait set’е для этого условия.

    В нашем случае signalAll() должен быть вызван после того, как какой-то поток перечисляет кому-нибудь деньги, тогда есть шанс того, что условие выполнилось, и можно продолжать выполнение потоков, попавших в condition wait set. Вызов этого метода позволяет потоку снова запуститься, и тогда надо будет ещё раз проверить выполнение условие, потому что, опять же, нет гарантии, что оно выполнено.

    Метод signal. Помимо signalAll есть ещё более эффективный метод signal, который разблокирует только один поток. Поэтому есть шанс того, что разблокируется поток, который ещё не готов к тому, чтобы разблокироваться, так что есть шанс того, что всё дедлокнется.

    public void transfer(int from, int to, double amount) {
    	lock.lock();
        try {
            while (accounts[from] < amount) {
                sufficientFunds.await();
            }
            // transfer funds
            ...
            sufficientFunds.signalAll();
        } finally {
            lock.unlock();
        }
    }
    

    В общем случае вот так:

    while (!condition) {
        conditionObject.await();
    }
    

    Deadlock. Очень важно, чтобы хотя бы кто-то вызвал signalAll(), чтобы не получилось так, что все потоки заблокировались, так что ни один из них не может быть возобновлён. Такая штука называется дедлоком.

    Когда надо вызывать signallAll. В общем случае надо сигнализировать остальным потокам, что они могут попробовать возобновить свою работу, всякий раз, когда состояние объекта меняется так, что нужное условие в теории может быть удовлетворено.

    Таким образом:

    • Lock’и защищают куски кода, позволяя только одному потоку выполнять их.
    • Lock’и менеджат потоки, которые пытаются войти в критический кусок кода.
    • Lock может иметь один или несколько ассоциированных condition объектов.
    • Condition’ы менеджат потоки, которые вошли в критический кусок кода, но не могут пройти дальше.