Gymterview
middle

Что такое Scheduler в Project Reactor? Какие типы существуют?

Scheduler — абстракция в Project Reactor, определяющая, в каком потоке (или пуле потоков) будет выполняться реактивный конвейер. По умолчанию цепочка выполняется в потоке, который вызвал subscribe().

Два ключевых оператора

Оператор Поведение
subscribeOn(Scheduler) Определяет поток для всей цепочки с начала (upstream). Место вызова не важно
publishOn(Scheduler) Переключает выполнение последующих операторов (downstream). Место вызова важно
Пример
Flux.range(1, 10)
    .map(i -> {
        System.out.println("map1: " + Thread.currentThread().getName());
        return i * 2;
    })
    .publishOn(Schedulers.parallel())
    .map(i -> {
        System.out.println("map2: " + Thread.currentThread().getName());
        return i + 1;
    })
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();
// map1 выполнится в boundedElastic, map2 — в parallel

Типы Scheduler

Scheduler Пул потоков Назначение
Schedulers.immediate() Текущий поток Без переключения
Schedulers.single() 1 поток Последовательные задачи с малой нагрузкой
Schedulers.parallel() N потоков (N = CPU cores) CPU-bound вычисления, без блокировок
Schedulers.boundedElastic() Эластичный пул (до 10xCPU, TTL 60с) Блокирующие I/O (JDBC, файлы, legacy API)
Schedulers.fromExecutor(executor) Пользовательский Executor Кастомный пул потоков
Пример: оборачивание блокирующего вызова
Mono<User> user = Mono.fromCallable(() -> {
        // Блокирующий вызов к БД через JDBC
        return jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = ?",
                                            userRowMapper, userId);
    })
    .subscribeOn(Schedulers.boundedElastic()); // выполнить в эластичном пуле

Частые ошибки

  • Блокирующие вызовы в Schedulers.parallel() — приводит к голоданию потоков (пул ограничен числом CPU-ядер)
  • Многократный subscribeOn — работает только ближайший к источнику, остальные игнорируются
  • Отсутствие subscribeOn при блокирующем источнике — блокирующий вызов выполнится в event-loop потоке Netty
  • Создание нового Scheduler на каждый запрос — утечка потоков; используйте Schedulers.* или кэшируйте

Как используется в 2026

  • С Virtual Threads (Java 21) потребность в boundedElastic() для I/O снижается
  • Reactor 3.6+ поддерживает Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor())
  • Для CPU-bound вычислений Schedulers.parallel() остаётся актуальным
  • В полностью реактивных приложениях (R2DBC, WebClient) Scheduler нужен реже

На собеседовании: ключевое — объяснить разницу subscribeOn vs publishOn и назвать типы Scheduler с их назначением. Частая ошибка — не знать, что subscribeOn действует на весь upstream (место вызова не важно), а publishOn — только на downstream (место вызова критично).