Gymterview
middle

Что такое Sinks в Project Reactor?

Sinks — программно управляемые издатели данных в Project Reactor, пришедшие на замену deprecated FluxProcessor и MonoProcessor (Reactor 3.4+). Sinks позволяют императивно отправлять данные в реактивный поток.

Аналогия из жизни: Sinks — это как микрофон на сцене. Вы говорите в микрофон (императивно отправляете данные), а все слушатели (подписчики) одновременно слышат вас через динамики (реактивный поток).

Зачем нужны Sinks

Когда источник данных не является «естественно реактивным» (callback-API, WebSocket, очередь событий), Sinks мостят императивный и реактивный миры.

Типы Sinks

Тип Аналог Описание
Sinks.One<T> Mono Эмитирует 0 или 1 элемент
Sinks.Many<T> unicast Flux (1 подписчик) Только один подписчик
Sinks.Many<T> multicast Flux (N подписчиков) Каждый получает элементы с момента подписки
Sinks.Many<T> replay Flux (N подписчиков + история) Новые подписчики получают N последних элементов
Пример
// Sinks.One — аналог Mono
Sinks.One<String> sink = Sinks.one();
Mono<String> mono = sink.asMono();
sink.tryEmitValue("result");

// Sinks.Many — multicast
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> flux = sink.asFlux();
sink.tryEmitNext("event1");
sink.tryEmitNext("event2");
sink.tryEmitComplete();

// Sinks.Many — replay (с историей)
Sinks.Many<String> replay = Sinks.many().replay().limit(10);
Практический пример — чат через WebSocket
@Component
public class ChatService {
    private final Sinks.Many<ChatMessage> chatSink =
        Sinks.many().multicast().onBackpressureBuffer();

    public void sendMessage(ChatMessage message) {
        chatSink.tryEmitNext(message);
    }

    public Flux<ChatMessage> getMessages() {
        return chatSink.asFlux();
    }
}

Обработка результата эмиссии

Пример
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

Sinks.EmitResult result = sink.tryEmitNext("data");
if (result.isFailure()) {
    // FAIL_ZERO_SUBSCRIBER, FAIL_OVERFLOW, FAIL_CANCELLED, FAIL_TERMINATED
    log.warn("Не удалось отправить: {}", result);
}

// Или с автоматической обработкой:
sink.emitNext("data", Sinks.EmitFailureHandler.FAIL_FAST);

Частые ошибки

  • Использовать deprecated FluxProcessor — заменён на Sinks с Reactor 3.4
  • Игнорировать EmitResult — без подписчиков или при переполнении данные теряются молча
  • Использовать unicast с несколькими подписчиками — второй получит IllegalStateException
  • Не вызывать tryEmitComplete() — подписчики будут ждать бесконечно

Как используется в 2026

  • Sinks — стандартный способ мостить императивный и реактивный код
  • Широко используется в WebSocket/SSE-обработчиках
  • Для event-driven архитектуры внутри приложения
  • С Virtual Threads потребность снижается, но для broadcast-сценариев Sinks оптимальны

На собеседовании: ключевое — объяснить, зачем нужны Sinks (мост между императивным и реактивным кодом) и назвать три режима Many: unicast, multicast, replay. Частая ошибка — не знать про EmitResult и не проверять результат эмиссии.