junior
Опишите роль Observable и Observer в реактивном программировании
Observable — источник (производитель) данных, Observer — потребитель данных. Их взаимодействие через подписку формирует основу реактивного потока.
Ключевые концепции
| Компонент | Роль |
|---|---|
| Observable (Наблюдаемый) | Передаёт данные или сигналы любых типов |
| Observer (Наблюдатель) | Получает уведомления при отправке данных |
| Subscription (Подписка) | Связующее звено между Observable и Observer |
| Operators (Операторы) | Преобразуют, фильтруют, комбинируют поток данных |
Пример: Java 9+ (Flow API)
Subscriber
import java.util.concurrent.Flow;
public class SimpleSubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // Запрашиваем первый элемент
}
@Override
public void onNext(String item) {
System.out.println("Received: " + item);
subscription.request(1); // Запрашиваем следующий элемент
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("All items received");
}
}
Publisher
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class SimplePublisher {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
SimpleSubscriber subscriber = new SimpleSubscriber();
publisher.subscribe(subscriber);
System.out.println("Publishing data items...");
String[] items = {"item1", "item2", "item3"};
for (String item : items) {
publisher.submit(item);
TimeUnit.SECONDS.sleep(1);
}
publisher.close();
TimeUnit.SECONDS.sleep(3);
}
}
Пример: Project Reactor
Пример
import reactor.core.publisher.Flux;
public class ReactorExample {
public static void main(String[] args) {
Flux<String> flux = Flux.just("Hello", "World", "From", "Reactor");
flux.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("All items received")
);
}
}
На собеседовании: покажите, что понимаете протокол взаимодействия: Publisher.subscribe -> onSubscribe -> request(n) -> onNext (до n раз) -> onComplete/onError. Частая ошибка — забыть про request(n), без которого данные не будут отправлены.