Что такое реактивные потоки и java.util.concurrent.Flow API?
Реактивные потоки (Reactive Streams) — это стандарт асинхронной обработки потоков данных с неблокирующим обратным давлением (backpressure). Начиная с Java 9, стандарт включён в JDK через класс java.util.concurrent.Flow.
Основные интерфейсы Flow:
Пример
// Publisher — источник данных
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
// Subscriber — потребитель данных
public interface Subscriber<T> {
void onSubscribe(Subscription subscription); // При подписке
void onNext(T item); // Следующий элемент
void onError(Throwable throwable); // Ошибка
void onComplete(); // Завершение потока
}
// Subscription — связь Publisher и Subscriber
public interface Subscription {
void request(long n); // Запрос n элементов (backpressure!)
void cancel(); // Отмена подписки
}
// Processor — одновременно Publisher и Subscriber
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Механизм обратного давления (backpressure):
Backpressure решает проблему, когда производитель генерирует данные быстрее потребителя. Потребитель явно запрашивает количество элементов через subscription.request(n), и производитель отправляет не более n элементов.
Код: пример с SubmissionPublisher
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
publisher.subscribe(new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // Запрашиваем первый элемент
}
@Override
public void onNext(String item) {
System.out.println("Получено: " + item);
subscription.request(1); // Запрашиваем следующий
}
@Override
public void onError(Throwable throwable) {
System.err.println("Ошибка: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Поток завершён");
}
});
List.of("Один", "Два", "Три").forEach(publisher::submit);
}
Код: Processor для трансформации данных
class TransformProcessor extends SubmissionPublisher<String>
implements Flow.Processor<String, String> {
private Flow.Subscription subscription;
private final Function<String, String> transform;
TransformProcessor(Function<String, String> transform) {
this.transform = transform;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
submit(transform.apply(item)); // Трансформируем и передаём дальше
subscription.request(1);
}
@Override
public void onError(Throwable throwable) { closeExceptionally(throwable); }
@Override
public void onComplete() { close(); }
}
Связь с реактивными библиотеками:
Flow API — это только интерфейсы (стандарт совместимости). Для полноценной реактивной разработки используются библиотеки:
| Библиотека | Основные типы | Применение |
|---|---|---|
| Project Reactor | Mono, Flux |
Spring WebFlux |
| RxJava | Observable, Flowable |
Android, backend |
| Mutiny | Uni, Multi |
Quarkus |
Все библиотеки совместимы с Flow API через адаптеры.
Ключевые моменты:
SubmissionPublisher— единственная встроенная реализацияPublisherв JDK.- Backpressure — ключевая концепция: без него быстрый производитель вызовет
OutOfMemoryError. onNext()вызывается асинхронно — в потоке из пулаSubmissionPublisher.
Частые ошибки:
subscription.request(Long.MAX_VALUE)— отключает backpressure, переполнение буферов.- Блокирующие операции в
onNext()— нарушает контракт реактивных потоков. - Забытый
subscription.request()— поток «зависнет»,onNext()никогда не будет вызван.
Актуальность: реактивный подход остаётся стандартом для стриминга данных (Kafka, WebSocket, SSE). С Virtual Threads часть IO-bound сценариев решается проще через блокирующий код.
Аналогия: реактивные потоки — это конвейерная лента суши. Повар (Publisher) кладёт блюда, гость (Subscriber) берёт по одному. Если гость не успевает — конвейер замедляется (backpressure). Без backpressure блюда падают на пол (OOM).
На собеседовании ключевой вопрос: «Зачем нужен backpressure?» Ответ: без него при несоответствии скоростей Producer и Consumer буферы переполняются. Backpressure позволяет Subscriber контролировать скорость получения данных.