Gymterview
senior

Что такое реактивные потоки и java.util.concurrent.Flow API?

Реактивные потоки (Reactive Streams) — это стандарт асинхронной обработки потоков данных с неблокирующим обратным давлением (backpressure). Начиная с Java 9, стандарт включён в JDK через класс java.util.concurrent.Flow.

Основные интерфейсы Flow:

Пример
// Publisher — источник данных
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

// Subscriber — потребитель данных
public interface Subscriber<T> {
    void onSubscribe(Subscription subscription);  // При подписке
    void onNext(T item);                           // Следующий элемент
    void onError(Throwable throwable);             // Ошибка
    void onComplete();                             // Завершение потока
}

// Subscription — связь Publisher и Subscriber
public interface Subscription {
    void request(long n);  // Запрос n элементов (backpressure!)
    void cancel();         // Отмена подписки
}

// Processor — одновременно Publisher и Subscriber
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Механизм обратного давления (backpressure):

Backpressure решает проблему, когда производитель генерирует данные быстрее потребителя. Потребитель явно запрашивает количество элементов через subscription.request(n), и производитель отправляет не более n элементов.

Код: пример с SubmissionPublisher
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
    publisher.subscribe(new Flow.Subscriber<>() {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // Запрашиваем первый элемент
        }

        @Override
        public void onNext(String item) {
            System.out.println("Получено: " + item);
            subscription.request(1); // Запрашиваем следующий
        }

        @Override
        public void onError(Throwable throwable) {
            System.err.println("Ошибка: " + throwable.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("Поток завершён");
        }
    });

    List.of("Один", "Два", "Три").forEach(publisher::submit);
}
Код: Processor для трансформации данных
class TransformProcessor extends SubmissionPublisher<String>
        implements Flow.Processor<String, String> {

    private Flow.Subscription subscription;
    private final Function<String, String> transform;

    TransformProcessor(Function<String, String> transform) {
        this.transform = transform;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        submit(transform.apply(item)); // Трансформируем и передаём дальше
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) { closeExceptionally(throwable); }

    @Override
    public void onComplete() { close(); }
}

Связь с реактивными библиотеками:

Flow API — это только интерфейсы (стандарт совместимости). Для полноценной реактивной разработки используются библиотеки:

Библиотека Основные типы Применение
Project Reactor Mono, Flux Spring WebFlux
RxJava Observable, Flowable Android, backend
Mutiny Uni, Multi Quarkus

Все библиотеки совместимы с Flow API через адаптеры.

Ключевые моменты:

  • SubmissionPublisher — единственная встроенная реализация Publisher в JDK.
  • Backpressure — ключевая концепция: без него быстрый производитель вызовет OutOfMemoryError.
  • onNext() вызывается асинхронно — в потоке из пула SubmissionPublisher.

Частые ошибки:

  • subscription.request(Long.MAX_VALUE) — отключает backpressure, переполнение буферов.
  • Блокирующие операции в onNext() — нарушает контракт реактивных потоков.
  • Забытый subscription.request() — поток «зависнет», onNext() никогда не будет вызван.

Актуальность: реактивный подход остаётся стандартом для стриминга данных (Kafka, WebSocket, SSE). С Virtual Threads часть IO-bound сценариев решается проще через блокирующий код.

Аналогия: реактивные потоки — это конвейерная лента суши. Повар (Publisher) кладёт блюда, гость (Subscriber) берёт по одному. Если гость не успевает — конвейер замедляется (backpressure). Без backpressure блюда падают на пол (OOM).

На собеседовании ключевой вопрос: «Зачем нужен backpressure?» Ответ: без него при несоответствии скоростей Producer и Consumer буферы переполняются. Backpressure позволяет Subscriber контролировать скорость получения данных.