Что такое Sinks в Project Reactor?
Sinks — программно управляемые издатели данных в Project Reactor, пришедшие на замену deprecated FluxProcessor и MonoProcessor (Reactor 3.4+). Sinks позволяют императивно отправлять данные в реактивный поток.
Аналогия из жизни: Sinks — это как микрофон на сцене. Вы говорите в микрофон (императивно отправляете данные), а все слушатели (подписчики) одновременно слышат вас через динамики (реактивный поток).
Зачем нужны Sinks
Когда источник данных не является «естественно реактивным» (callback-API, WebSocket, очередь событий), Sinks мостят императивный и реактивный миры.
Типы Sinks
| Тип | Аналог | Описание |
|---|---|---|
Sinks.One<T> |
Mono | Эмитирует 0 или 1 элемент |
Sinks.Many<T> unicast |
Flux (1 подписчик) | Только один подписчик |
Sinks.Many<T> multicast |
Flux (N подписчиков) | Каждый получает элементы с момента подписки |
Sinks.Many<T> replay |
Flux (N подписчиков + история) | Новые подписчики получают N последних элементов |
Пример
// Sinks.One — аналог Mono
Sinks.One<String> sink = Sinks.one();
Mono<String> mono = sink.asMono();
sink.tryEmitValue("result");
// Sinks.Many — multicast
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> flux = sink.asFlux();
sink.tryEmitNext("event1");
sink.tryEmitNext("event2");
sink.tryEmitComplete();
// Sinks.Many — replay (с историей)
Sinks.Many<String> replay = Sinks.many().replay().limit(10);
Практический пример — чат через WebSocket
@Component
public class ChatService {
private final Sinks.Many<ChatMessage> chatSink =
Sinks.many().multicast().onBackpressureBuffer();
public void sendMessage(ChatMessage message) {
chatSink.tryEmitNext(message);
}
public Flux<ChatMessage> getMessages() {
return chatSink.asFlux();
}
}
Обработка результата эмиссии
Пример
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Sinks.EmitResult result = sink.tryEmitNext("data");
if (result.isFailure()) {
// FAIL_ZERO_SUBSCRIBER, FAIL_OVERFLOW, FAIL_CANCELLED, FAIL_TERMINATED
log.warn("Не удалось отправить: {}", result);
}
// Или с автоматической обработкой:
sink.emitNext("data", Sinks.EmitFailureHandler.FAIL_FAST);
Частые ошибки
- Использовать deprecated
FluxProcessor— заменён на Sinks с Reactor 3.4 - Игнорировать
EmitResult— без подписчиков или при переполнении данные теряются молча - Использовать
unicastс несколькими подписчиками — второй получитIllegalStateException - Не вызывать
tryEmitComplete()— подписчики будут ждать бесконечно
Как используется в 2026
- Sinks — стандартный способ мостить императивный и реактивный код
- Широко используется в WebSocket/SSE-обработчиках
- Для event-driven архитектуры внутри приложения
- С Virtual Threads потребность снижается, но для broadcast-сценариев Sinks оптимальны
На собеседовании: ключевое — объяснить, зачем нужны Sinks (мост между императивным и реактивным кодом) и назвать три режима Many: unicast, multicast, replay. Частая ошибка — не знать про EmitResult и не проверять результат эмиссии.