middle
Что такое Scheduler в Project Reactor? Какие типы существуют?
Scheduler — абстракция в Project Reactor, определяющая, в каком потоке (или пуле потоков) будет выполняться реактивный конвейер. По умолчанию цепочка выполняется в потоке, который вызвал subscribe().
Два ключевых оператора
| Оператор | Поведение |
|---|---|
subscribeOn(Scheduler) |
Определяет поток для всей цепочки с начала (upstream). Место вызова не важно |
publishOn(Scheduler) |
Переключает выполнение последующих операторов (downstream). Место вызова важно |
Пример
Flux.range(1, 10)
.map(i -> {
System.out.println("map1: " + Thread.currentThread().getName());
return i * 2;
})
.publishOn(Schedulers.parallel())
.map(i -> {
System.out.println("map2: " + Thread.currentThread().getName());
return i + 1;
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
// map1 выполнится в boundedElastic, map2 — в parallel
Типы Scheduler
| Scheduler | Пул потоков | Назначение |
|---|---|---|
Schedulers.immediate() |
Текущий поток | Без переключения |
Schedulers.single() |
1 поток | Последовательные задачи с малой нагрузкой |
Schedulers.parallel() |
N потоков (N = CPU cores) | CPU-bound вычисления, без блокировок |
Schedulers.boundedElastic() |
Эластичный пул (до 10xCPU, TTL 60с) | Блокирующие I/O (JDBC, файлы, legacy API) |
Schedulers.fromExecutor(executor) |
Пользовательский Executor | Кастомный пул потоков |
Пример: оборачивание блокирующего вызова
Mono<User> user = Mono.fromCallable(() -> {
// Блокирующий вызов к БД через JDBC
return jdbcTemplate.queryForObject("SELECT * FROM users WHERE id = ?",
userRowMapper, userId);
})
.subscribeOn(Schedulers.boundedElastic()); // выполнить в эластичном пуле
Частые ошибки
- Блокирующие вызовы в
Schedulers.parallel()— приводит к голоданию потоков (пул ограничен числом CPU-ядер) - Многократный
subscribeOn— работает только ближайший к источнику, остальные игнорируются - Отсутствие
subscribeOnпри блокирующем источнике — блокирующий вызов выполнится в event-loop потоке Netty - Создание нового Scheduler на каждый запрос — утечка потоков; используйте
Schedulers.*или кэшируйте
Как используется в 2026
- С Virtual Threads (Java 21) потребность в
boundedElastic()для I/O снижается - Reactor 3.6+ поддерживает
Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor()) - Для CPU-bound вычислений
Schedulers.parallel()остаётся актуальным - В полностью реактивных приложениях (R2DBC, WebClient) Scheduler нужен реже
На собеседовании: ключевое — объяснить разницу subscribeOn vs publishOn и назвать типы Scheduler с их назначением. Частая ошибка — не знать, что subscribeOn действует на весь upstream (место вызова не важно), а publishOn — только на downstream (место вызова критично).