middle
Как организовать асинхронное взаимодействие через брокеры сообщений?
Асинхронное взаимодействие позволяет сервисам обмениваться данными без ожидания немедленного ответа. Два основных брокера: Apache Kafka (распределённый лог событий) и RabbitMQ (классическая очередь сообщений).
Apache Kafka: продюсер и консьюмер
// Продюсер (сервис платежей)
@Service
@RequiredArgsConstructor
public class PaymentEventProducer {
private final KafkaTemplate<String, PaymentEvent> kafkaTemplate;
public void publishPaymentCompleted(Payment payment) {
PaymentEvent event = new PaymentEvent(
payment.getId(),
payment.getAmount(),
payment.getCustomerId(),
EventType.PAYMENT_COMPLETED,
Instant.now()
);
kafkaTemplate.send("payment-events",
payment.getId().toString(), event);
}
}
// Конфигурация продюсера
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, PaymentEvent> producerFactory() {
Map<String, Object> config = Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,
ProducerConfig.ACKS_CONFIG, "all",
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true
);
return new DefaultKafkaProducerFactory<>(config);
}
}
// Консьюмер (сервис уведомлений)
@Service
@Slf4j
public class NotificationEventConsumer {
@KafkaListener(
topics = "payment-events",
groupId = "notification-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void handlePaymentEvent(PaymentEvent event) {
log.info("Получено событие платежа: {}", event.getPaymentId());
notificationService.sendPaymentNotification(event);
}
}
RabbitMQ: продюсер, консьюмер и конфигурация
// Продюсер
@Service
@RequiredArgsConstructor
public class NotificationProducer {
private final RabbitTemplate rabbitTemplate;
public void sendNotification(NotificationMessage message) {
rabbitTemplate.convertAndSend(
"notifications-exchange",
"notification.email",
message
);
}
}
// Консьюмер
@Service
public class EmailNotificationConsumer {
@RabbitListener(queues = "email-notifications-queue")
public void handleEmailNotification(NotificationMessage message) {
emailService.send(message);
}
}
// Конфигурация
@Configuration
public class RabbitConfig {
@Bean
public TopicExchange exchange() {
return new TopicExchange("notifications-exchange");
}
@Bean
public Queue emailQueue() {
return QueueBuilder.durable("email-notifications-queue")
.withArgument("x-dead-letter-exchange", "dlx-exchange")
.build();
}
@Bean
public Binding binding(Queue emailQueue, TopicExchange exchange) {
return BindingBuilder.bind(emailQueue)
.to(exchange).with("notification.email");
}
}
Kafka vs RabbitMQ
| Критерий | Kafka | RabbitMQ |
|---|---|---|
| Модель | Распределённый лог | Очередь сообщений |
| Хранение | Сообщения хранятся после прочтения | Удаляются после подтверждения |
| Пропускная способность | Очень высокая (миллионы msg/sec) | Высокая (десятки тысяч msg/sec) |
| Порядок | Гарантирован в рамках партиции | Гарантирован в рамках очереди |
| Маршрутизация | По топикам и партициям | Гибкая (direct, topic, fanout, headers) |
| Применение | Event streaming, event sourcing | Task queues, RPC, маршрутизация |
На собеседовании: ключевое различие — Kafka хранит сообщения после прочтения (лог), а RabbitMQ удаляет после подтверждения (очередь). Kafka — для event-driven архитектуры и event sourcing, RabbitMQ — для task queues и гибкой маршрутизации.