Что такое фреймворк Fork/Join?
Fork/Join — это фреймворк из пакета java.util.concurrent, появившийся в JDK 7 (JEP 166). Он предназначен для эффективного выполнения задач, которые можно рекурсивно разбить на независимые подзадачи и затем объединить результаты.
Два этапа:
- Fork (разделение) — большая задача разделяется на подзадачи, каждая из которых может быть разделена далее, пока задача не станет достаточно маленькой для последовательного решения.
- Join (объединение) — результаты подзадач собираются (сворачиваются) обратно в результат исходной задачи.
Ключевые классы:
| Класс | Назначение |
|---|---|
ForkJoinPool |
Пул потоков, исполняющий ForkJoinTask |
ForkJoinTask<V> |
Абстрактная задача для Fork/Join |
RecursiveTask<V> |
Задача, возвращающая результат |
RecursiveAction |
Задача без возвращаемого значения |
Пример — параллельная сумма массива:
Код: параллельная сумма через RecursiveTask
class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10_000;
private final long[] array;
private final int from, to;
SumTask(long[] array, int from, int to) {
this.array = array;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
int length = to - from;
if (length <= THRESHOLD) {
// Достаточно маленькая задача — считаем последовательно
long sum = 0;
for (int i = from; i < to; i++) {
sum += array[i];
}
return sum;
}
// Разделяем на две подзадачи
int mid = from + length / 2;
SumTask left = new SumTask(array, from, mid);
SumTask right = new SumTask(array, mid, to);
left.fork(); // Отправляем левую подзадачу в пул
long rightResult = right.compute(); // Правую считаем в текущем потоке
long leftResult = left.join(); // Ждём результат левой
return leftResult + rightResult;
}
}
// Использование
long[] data = new long[1_000_000];
Arrays.fill(data, 1L);
ForkJoinPool pool = new ForkJoinPool(); // или ForkJoinPool.commonPool()
long sum = pool.invoke(new SumTask(data, 0, data.length));
System.out.println("Сумма: " + sum); // 1000000
Алгоритм work-stealing. Каждый поток в ForkJoinPool имеет собственную двустороннюю очередь задач (deque). Когда поток завершает свои задачи, он «крадёт» задачи из конца очереди другого потока. Это обеспечивает автоматическую балансировку нагрузки: ни один поток не простаивает, пока есть незавершённые задачи.
ForkJoinPool.commonPool() — общий пул на всё приложение. Используется по умолчанию для параллельных стримов (Stream.parallel()), CompletableFuture.supplyAsync() и других операций.
Для решения некоторых задач этап Join не требуется. Например, для параллельного QuickSort — массив рекурсивно делится на всё меньшие и меньшие диапазоны, пока не вырождается в тривиальный случай из 1 элемента. Хотя в некотором смысле Join будет необходим и тут, т.к. всё равно остаётся необходимость дождаться, пока не закончится выполнение всех подзадач.
Аналогия: Fork/Join — это организация работы в большом ресторане. Шеф-повар (главная задача) распределяет работу по поварам (подзадачи): один готовит соус, другой — мясо, третий — гарнир. Если повар закончил раньше, он помогает другому (work-stealing). Когда все закончили, блюдо собирается (join).
На собеседовании могут спросить: «Почему важно вызывать
fork()для одной подзадачи, аcompute()для другой, а неfork()для обеих?» Ответ: если сделатьfork()для обеих, текущий поток освободится и будет ждать — это расточительно. Вызываяcompute()для одной из подзадач, мы загружаем текущий поток полезной работой.