Skip to content

Latest commit

 

History

History
192 lines (139 loc) · 30.2 KB

concurrency.md

File metadata and controls

192 lines (139 loc) · 30.2 KB

Оглавление

Java Concurrency

Расскажите про методы wait, notify, notifyAll и ключевое слово synchronized

В принципе, статьи на Baeldung должно хватить. Лучше, конечно, пописать код с использованием wait, notify, notifyAll и synchronized руками. Также можно почитать официальный туториал от Oracle по Concurrency в Java.

Но если хотите пойти глубже, то хаброписатели опять спешат на помощь - тут. А также Java Language Specification, раздел 17.1 и 17.2.

к содержанию

JMM. Зачем нужно volatile. Популярный вопрос

Не знаю как у вас, но у меня при упоминании JMM молниеносно всплывает в голове Алексей Шипилёв и его доклады - раз, два, три. Если вы больше чтец, чем смотрец, то Алексея можно и почитать - ать, два.

Кроме того, абсолютно не будет лишним посмотреть доклад Романа Елизарова по теоретическому минимуму JMM.

Если совсем нет времени, то можно пробежаться по небольшой статейке по JMM. Если есть время и интерес, тогда углубляемся в тему через статью на Хабре. А ещё на Хабре есть неплохой перевод статьи "Многопоточность. Java-модель памяти": часть 1 и часть 2.

Несомненным источником истины является Java Language Specification, раздел 17.4.

Также ответ на этот вопрос можно прочитать на itsobes.ru. А ещё можно ознакомиться с вопросом на JVM-уровне в статье How ‘volatile’ works on JVM level? на Medium.

к содержанию

Что такое Executor и ExecutorService, Thread pool и зачем нужны?

Создавать и убивать потоки - дорого. Давайте создадим N потоков (Thread pool) и будем их переиспользовать. А давайте. Вот тут описано развёрнуто.

Executor (void execute​(Runnable command) - вот и весь интерфейс) и ExecutorService (уже покруче, может запускать Callable и не только) - грубо говоря, интерфейсы выполняторов параллельных задач. А реализуют их различные выполняторы на пулах потоков. Экземпляры готовых конкретных выполняторов можно получить с помощью класса Executors. Если смелый-умелый и зачем-то надо, то можно и самому реализовать, конечно.

Также подробнее можно почитать:

к содержанию

Что внутри параллельных стримов? На каком пуле работают параллельные стримы и в чем его особенность?

По умолчанию parallel stream использует ForkJoinPool.commonPool размером Runtime.getRuntime().availableProcessors() — 1. Common pool создаётся статически при первом обращении к ForkJoinPool и живёт до System::exit (игнорирует shutdown() или shutdownNow()). Когда некий поток отправляет задачу в common pool, то pool может использовать его же в качестве воркера. Common pool один на всё приложение. Можно запустить stream на отдельном ForkJoinPool - завернуть параллельный stream в Callable и передать на вход методу submit созданного ForkJoinPool. Этот трюк работает благодаря методу fork() из ForkJoinPool (тут подробности).

Сам по себе ForkJoinPool представляет реализацию ExecutorService, выполняющую ForkJoinTask (RecursiveAction и RecursiveTask). Данный pool создан для упрощения распараллеливания рекурсивных задач и утилизации породивших подзадачу потоков. ForkJoinPool использует подход work stealing - у каждого потока есть его локальная очередь задач, из хвоста которой другие потоки могут тырить себе задачи, если у них закончились свои. Украденная задача делится и заполняет очередь задач потока.

Подробнее:

к содержанию

Как работает ConcurrentHashMap?

ConcurrentHashMap - это потокобезопасная мапа (карта, словарь, ассоциативный массив, но тут и далее просто "мапа"), у которой отсутствуют блокировки на всю мапу целиком.

Особенности реализации:

  • Поля элемента мапы (Node<K,V>) val (значение) и next(следующее значение по данному ключу в цепочке или дереве), а также таблица бакетов (Node<K,V>[] table) объявлены как volatile
  • Для операций вставки первого элемента в бакет используется CAS - алгоритм, а для других операций обновления в этой корзине (insert, delete, replace) блокировки
  • Каждый бакет может блокироваться независимо путём блокировки первого элемента в корзине
  • Таблице бакетов требуется volatile/atomic чтения, запись и CAS, поэтому используются intrinsics-операции (jdk.internal.misc.Unsafe)
  • Concurrent resizing таблицы бакетов
  • Ленивая инициализация таблицы бакетов
  • При подсчёте количества элементов используется специальная реализация LongAdder

В результате имеем:

  • Извлечение значения возвращает последний результат завершенного обновления мапы на момент начала извлечения. Или перефразируя, любой non-null результат, возвращаемый get(key) связан отношением happens-before со вставкой или обновлением по этому ключу
  • Итераторы по ConcurrentHashMap возвращают элементы отображающие состояние мапы на определённый момент времени - они не бросают ConcurrentModificationException, но предназначены для использования одним потоком одновременно
  • Нельзя полагаться на точность агрегирующих методов (size, isEmpty, containsValue), если мапа подвергается изменениям в разных потоках
  • Не позволяет использовать null, который однозначно воспринимается как отсутствие значения
  • Поддерживает потокобезопасные, затрагивающие все (или многие) элементы мапы, операции - forEach, search, reduce (bulk operations). Данные операции принимают на вход функции, которые не должны полагаться на какой-либо порядок элементов в мапе и в идеале должны быть чистыми (за исключением forEach). На вход данные операции также принимают parallelismThreshold - операции будут выполняться последовательно, если текущий размер мапы меньше parallelismThreshold. Значение Long.MAX_VALUE сделает операцию точно последовательной. Значение 1 максимизирует параллелизм и утилизацию ForkJoinPool.commonPool(), который будет использоваться для параллельных вычислений

На Хабре есть несколько устаревшая статья - будьте внимательны и осторожны с java 8 произошли изменения. Класс Segment<K,V> максимально урезан и сохранён только для обратной совместимости при сериализации, где и используется. concurrencyLevel также оставлен лишь для обратной совместимости и теперь служит в конструкторе только для увеличения initialCapacity до количества предполагаемых потоков-потребителей мапы:

    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads

Есть более современная статья с примером реализации ConcurrentMap. Также можно почитать гайд по ConcurrentMap на Baeldung.

к содержанию

Как работают Атомики?

Атомарная операция — это операция, которая выполняется полностью или не выполняется совсем, частичное выполнение невозможно.

Атомики - это классы, которые выполняют операции изменения своего значения атомарно, т.о. они поддерживают lock-free thread-safe использование переменных. Достигается это с помощью алгоритма compare-and-swap (CAS) и работает быстрее, чем аналогичные реализации с блокировками. На уровне инструкций большинства процессоров имеется поддержка CAS.

В общем случае работу Атомиков можно описать следующим образом. Атомик хранит некоторое volatile значение value, для изменения которого используется метод compareAndSet(current, new), поэтому предварительно читается текущее значение - current. Данный метод с помощью CAS изменяет значение value только в том случае, если оно равно ожидаемому значению (т.е. current), прочитанному перед запуском compareAndSet(current, new). Если значение value было изменено в другом потоке, то оно не будет равно ожидаемому. Следовательно, метод compareAndSet вернет значение false. Поэтому следует повторять попытки чтения текущего значения и запуска с ним метода compareAndSet(current, new) пока current не будет равен value.

Условно можно разделить методы Атомиков на:

  • compare-and-set - принимают current на вход и делают одну попытку записи через CAS
  • set-and-get - самостоятельно читают current и пытаются изменить значение с помощью CAS в цикле, как описано выше

Непосредственно изменение значения value делегируется либо VarHandle, либо Unsafe, которые в свою очередь выполняют его на нативном уровне. VarHandle - это динамически сильно типизированная ссылка на переменную или на параметрически определяемое семейство переменных, включающее статические поля, нестатические поля, элементы массива или компоненты структуры данных нестандартного типа. Доступ к таким переменным поддерживается в различных режимах, включая простой доступ на чтение/запись, volotile доступ на чтение/запись и доступ на compare-and-swap.

В java.util.concurrent.atomic имеется следующий набор атомиков:

  • AtomicBoolean, AtomicInteger, AtomicLong, AtomicIntegerArray, AtomicLongArray - представляют атомарные целочисленные, булевы примитивные типы, а также два массива атомарных целых чисел.
  • AtomicReference - класс для атомарных операций со ссылкой на объект.
  • AtomicMarkableReference - класс для атомарных операций над парой [reference, boolean].
  • AtomicStampedReference - класс для атомарных операций над парой [reference, int].
  • AtomicReferenceArray - массив атомарных ссылок
  • AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater - классы для атомарного обновления полей по их именам через reflection.
  • DoubleAccumulator, LongAccumulator - классы, представляющие атомарные аккумуляторы, которые принимают на вход чистую функцию-аккумулятор (BinaryOperator) и начальное значение. Сохраняет весь набор операндов, а когда необходимо получить значение, то аккумулирует их с помощью функции-аккумулятора. Порядок операндов и применения функции-аккумулятора не гарантируется. Используется, когда записей намного больше, чем чтения.
  • DoubleAdder, LongAdder - классы, представляющие атомарные счётчики. Являются частным случаем атомарных аккумуляторов, у которых функция-аккумулятор выполняет простое суммирование, а начальным значением является 0.

С помощью атомиков можно реализовать блокировку, например так:

    public class NonReentrantSpinLock {

        private AtomicReference<Thread> owner = new AtomicReference<>();

        public void lock() {
          Thread currentThread = Thread.currentThread();

          while (!owner.compareAndSet(null, currentThread)) {}
        }

        public void unlock() {
          Thread currentThread = Thread.currentThread();
          owner.compareAndSet(currentThread, null);
        }
    }

Подробнее:

к содержанию

Что такое ThreadLocal переменные?

ThreadLocal - класс в виде обёртки для хранения отдельной независимой копии значения переменной для каждого использующего её потока, что позволяет сделать работу с такой переменной потокобезопасной.

Данные ThreadLocal-переменных хранятся не в них самих, а непосредственно в объектах Thread. У каждого экземпляра класса Thread есть поле ThreadLocal.ThreadLocalMap threadLocals, которое инициализируется и используется ThreadLocal. ThreadLocal.ThreadLocalMap представляет собой специализированную версию HashMap, записи которой наследуют от WeakReference<ThreadLocal<?>>, используя ключ мапы как ref field слабой ссылки. Ключами такой мапы являются ThreadLocal, а значением - Object. Если ключ записи равен null, то такая запись называется просроченной (stale) и будет удалена из мапы.

Следует обратить внимание, что ThreadLocal изолирует именно ссылки на объекты, а не копии их значений. Если изолированные внутри потоков ссылки ведут на один и тот же объект, то возможны коллизии.

Когда у ThreadLocal-переменной запрашивается её значение (например через метод get), то она получает текущий поток, извлекает из него мапу threadLocals, и получает значение из мапы, используя себя в качестве ключа. Аналогично выполняются методы изменения значения ThreadLocal.

Из этого следует, что значение ThreadLocal-переменной должно устанавливаться в том же потоке, в котором оно будет использоваться.

Подробнее:

к содержанию

Чем поток отличается от процесса?

Ответ на этот вопрос писали в интернете ещё тогда, когда я балду в школе пинал (нулевые), поэтому у читателя есть широкий выбор. Мне импонирует заметка на OpenNET - кратко и чётко, суше Сахары. Для любителей смотреть в окна есть статейка на MSDN. Вишенка на торте - олдовая статья с Хабра.

к содержанию