Gymterview
middle

Как организовать асинхронное взаимодействие через брокеры сообщений?

Асинхронное взаимодействие позволяет сервисам обмениваться данными без ожидания немедленного ответа. Два основных брокера: Apache Kafka (распределённый лог событий) и RabbitMQ (классическая очередь сообщений).

Apache Kafka: продюсер и консьюмер
// Продюсер (сервис платежей)
@Service
@RequiredArgsConstructor
public class PaymentEventProducer {
    private final KafkaTemplate<String, PaymentEvent> kafkaTemplate;

    public void publishPaymentCompleted(Payment payment) {
        PaymentEvent event = new PaymentEvent(
            payment.getId(),
            payment.getAmount(),
            payment.getCustomerId(),
            EventType.PAYMENT_COMPLETED,
            Instant.now()
        );
        kafkaTemplate.send("payment-events",
            payment.getId().toString(), event);
    }
}

// Конфигурация продюсера
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, PaymentEvent> producerFactory() {
        Map<String, Object> config = Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,
            ProducerConfig.ACKS_CONFIG, "all",
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true
        );
        return new DefaultKafkaProducerFactory<>(config);
    }
}

// Консьюмер (сервис уведомлений)
@Service
@Slf4j
public class NotificationEventConsumer {

    @KafkaListener(
        topics = "payment-events",
        groupId = "notification-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handlePaymentEvent(PaymentEvent event) {
        log.info("Получено событие платежа: {}", event.getPaymentId());
        notificationService.sendPaymentNotification(event);
    }
}
RabbitMQ: продюсер, консьюмер и конфигурация
// Продюсер
@Service
@RequiredArgsConstructor
public class NotificationProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendNotification(NotificationMessage message) {
        rabbitTemplate.convertAndSend(
            "notifications-exchange",
            "notification.email",
            message
        );
    }
}

// Консьюмер
@Service
public class EmailNotificationConsumer {

    @RabbitListener(queues = "email-notifications-queue")
    public void handleEmailNotification(NotificationMessage message) {
        emailService.send(message);
    }
}

// Конфигурация
@Configuration
public class RabbitConfig {
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("notifications-exchange");
    }

    @Bean
    public Queue emailQueue() {
        return QueueBuilder.durable("email-notifications-queue")
            .withArgument("x-dead-letter-exchange", "dlx-exchange")
            .build();
    }

    @Bean
    public Binding binding(Queue emailQueue, TopicExchange exchange) {
        return BindingBuilder.bind(emailQueue)
            .to(exchange).with("notification.email");
    }
}

Kafka vs RabbitMQ

Критерий Kafka RabbitMQ
Модель Распределённый лог Очередь сообщений
Хранение Сообщения хранятся после прочтения Удаляются после подтверждения
Пропускная способность Очень высокая (миллионы msg/sec) Высокая (десятки тысяч msg/sec)
Порядок Гарантирован в рамках партиции Гарантирован в рамках очереди
Маршрутизация По топикам и партициям Гибкая (direct, topic, fanout, headers)
Применение Event streaming, event sourcing Task queues, RPC, маршрутизация

На собеседовании: ключевое различие — Kafka хранит сообщения после прочтения (лог), а RabbitMQ удаляет после подтверждения (очередь). Kafka — для event-driven архитектуры и event sourcing, RabbitMQ — для task queues и гибкой маршрутизации.