Gymterview
senior

Что такое CompletionService и когда его применять?

CompletionService<V> — это интерфейс из java.util.concurrent, который отделяет создание асинхронных задач от получения их результатов. Он позволяет получать результаты в порядке их готовности, а не в порядке запуска.

Основная реализация — ExecutorCompletionService.

Проблема без CompletionService:

Пример
List<Future<String>> futures = new ArrayList<>();
for (String url : urls) {
    futures.add(executor.submit(() -> fetch(url)));
}

// Обрабатываем в порядке ЗАПУСКА — если futures.get(0) самая медленная,
// все остальные результаты ждут!
for (Future<String> future : futures) {
    String result = future.get(); // блокируется на КОНКРЕТНОЙ задаче
    process(result);
}

Решение с CompletionService:

Пример
CompletionService<String> cs = new ExecutorCompletionService<>(executor);

for (String url : urls) {
    cs.submit(() -> fetch(url));
}

// Обрабатываем в порядке ЗАВЕРШЕНИЯ — кто первый закончил, того первым обработали
for (int i = 0; i < urls.size(); i++) {
    Future<String> future = cs.take(); // Берём первый завершившийся
    String result = future.get();       // Гарантированно не заблокируется
    process(result);
}

Основные методы:

Метод Описание
submit(Callable<V>) Отправить задачу на выполнение
take() Блокирующее ожидание первой завершённой задачи
poll() Неблокирующее получение (или null)
poll(timeout, unit) Ожидание с таймаутом
Код: получение первого успешного результата
CompletionService<String> cs = new ExecutorCompletionService<>(executor);

cs.submit(() -> fetchFromServerA());
cs.submit(() -> fetchFromServerB());
cs.submit(() -> fetchFromServerC());

for (int i = 0; i < 3; i++) {
    try {
        Future<String> future = cs.take();
        String result = future.get();
        if (result != null) {
            executor.shutdownNow(); // Отменяем оставшиеся
            return result;
        }
    } catch (ExecutionException e) {
        // Эта задача упала — пробуем следующую
    }
}

Ключевые особенности:

  • Решает проблему «head-of-line blocking» — не нужно ждать медленную задачу, чтобы обработать быструю.
  • Внутри использует LinkedBlockingQueue для хранения завершённых Future.
  • Не владеет Executor — закрытие ExecutorService выполняется отдельно.
  • Идеально для scatter-gather: запустить много запросов и обработать ответы по мере поступления.

Частые ошибки:

  • Забыть вызвать take()/poll() для всех задач — утечка памяти.
  • Не вызывать future.get() после take() — ошибки не обнаружатся.
  • Использование для одной задачи — избыточно.

Аналогия: без CompletionService — это получение багажа в аэропорту по номеру очереди: «подождите, пока не выдадут чемодан номер 1, потом номер 2…» С CompletionService — это багажная лента: кто первый приехал, того первым и забрали.

На собеседовании стоит сравнить с CompletableFuture.anyOf(): CompletionService удобнее, когда нужно обработать все результаты по мере готовности, а anyOf — когда нужен только первый. С Virtual Threads часть сценариев заменяется на StructuredTaskScope.