middle
Что такое backpressure в контексте реактивного программирования?
Backpressure — механизм управления потоком данных, когда производитель (Publisher) генерирует данные быстрее, чем потребитель (Subscriber) может их обработать.
Аналогия из жизни: backpressure — это как кран с водой и стакан. Если лить быстрее, чем стакан наполняется и опустошается, вода переливается. Backpressure позволяет потребителю регулировать напор крана.
Почему важно backpressure
Без управления backpressure возникают:
- Memory Overflow — необработанные элементы накапливаются в памяти
- Latency Increase — растёт очередь необработанных элементов
- Resource Exhaustion — исчерпание CPU и памяти, сбои системы
Стратегии управления backpressure в Project Reactor
| Стратегия | Оператор | Описание |
|---|---|---|
| Буферизация | onBackpressureBuffer(100) |
Элементы хранятся в буфере до обработки |
| Отбрасывание | onBackpressureDrop() |
Лишние элементы отбрасываются |
| Новейшие | onBackpressureLatest() |
Сохраняется только последний элемент |
| Ошибка | onBackpressureError() |
Сигнал об ошибке при переполнении |
| Контроль запроса | request(n) |
Явный контроль скорости запроса элементов |
Пример: буферизация
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class BackpressureExample {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 1000)
.onBackpressureBuffer(100);
flux.publishOn(Schedulers.boundedElastic())
.subscribe(
item -> {
try {
Thread.sleep(10); // Имитируем медленного потребителя
System.out.println("Received: " + item);
} catch (InterruptedException e) {
e.printStackTrace();
}
},
error -> System.err.println("Error: " + error),
() -> System.out.println("All items received")
);
}
}
Пример: контроль скорости запроса через BaseSubscriber
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
public class ControlledRequestExample {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(1, 1000);
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1); // Запрашиваем один элемент при подписке
}
@Override
protected void hookOnNext(Integer value) {
try {
Thread.sleep(10);
System.out.println("Received: " + value);
request(1); // Запрашиваем следующий элемент после обработки
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void hookOnComplete() {
System.out.println("All items received");
}
@Override
protected void hookOnError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
});
}
}
На собеседовании: интервьюер хочет услышать, что backpressure — это не просто буфер, а протокол: потребитель через request(n) сообщает производителю, сколько данных он готов принять. Частая ошибка — назвать только одну стратегию (обычно буферизацию), не упомянув drop, latest, error и явный request(n).