Gymterview
middle

Как организовать асинхронное взаимодействие через Kafka?

Apache Kafka — стандарт для event-driven взаимодействия между сервисами, обеспечивающий надёжную доставку сообщений с гарантией порядка в пределах partition.

Аналогия из жизни: Kafka — это как журнал бухгалтерских проводок. Каждая запись добавляется в конец, никогда не удаляется (до истечения срока хранения), и любой аудитор (consumer) может прочитать историю с любого места.

Kafka vs RabbitMQ

Критерий Apache Kafka RabbitMQ
Модель Лог (append-only, retention) Очередь (удаляется после обработки)
Порядок В пределах partition В пределах очереди
Производительность Миллионы msg/sec Десятки тысяч msg/sec
Replay Можно перечитать историю Нет
Подходит для Event Sourcing, Event Streaming Task queues, RPC, сложная маршрутизация

Producer и Consumer

Пример Producer
@Component
@RequiredArgsConstructor
public class KafkaOrderEventPublisher implements OrderEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    public void publishOrderCreated(Order order) {
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(), order.getCustomerId(),
            order.getTotalAmount(), Instant.now());

        kafkaTemplate.send("order.events", order.getId().toString(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish event for order {}", order.getId(), ex);
                } else {
                    log.info("Published event for order {} to partition {} offset {}",
                        order.getId(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                }
            });
    }
}
Пример Consumer с ручным подтверждением
@Component
@RequiredArgsConstructor
@Slf4j
public class PaymentEventConsumer {

    private final OrderService orderService;

    @KafkaListener(topics = "payment.events", groupId = "order-service")
    public void handlePaymentEvent(
            @Payload PaymentCompletedEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment acknowledgment) {

        log.info("Received PaymentCompletedEvent: orderId={}, partition={}, offset={}",
            event.orderId(), partition, offset);

        try {
            orderService.confirmOrder(event.orderId(), event.paymentId());
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process event for order {}", event.orderId(), e);
            // Не подтверждаем — сообщение будет повторно обработано
        }
    }
}

Transactional Outbox Pattern

Единственный надёжный способ гарантировать атомарность бизнес-операции и публикации события:

Пример
@Transactional
public Order createOrder(CreateOrderCommand command) {
    Order order = Order.create(command);
    orderRepository.save(order);

    // Событие сохраняется в outbox-таблицу в той же транзакции
    OutboxEvent event = OutboxEvent.builder()
        .aggregateType("Order")
        .aggregateId(order.getId().toString())
        .eventType("OrderCreated")
        .payload(objectMapper.writeValueAsString(new OrderCreatedEvent(order)))
        .build();
    outboxRepository.save(event);
    return order;
}

// Отдельный scheduler читает из outbox и отправляет в Kafka
@Scheduled(fixedDelay = 1000)
@Transactional
public void publishOutboxEvents() {
    List<OutboxEvent> events = outboxRepository.findUnpublished(100);
    for (OutboxEvent event : events) {
        kafkaTemplate.send(event.getTopic(), event.getAggregateId(), event.getPayload());
        event.markPublished();
    }
}

Spring Events для внутренней коммуникации

Для событий внутри одного приложения (модульный монолит):

Пример
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async
public void onOrderCreated(OrderCreatedEvent event) {
    notificationService.sendOrderConfirmation(event.orderId());
}

Частые ошибки

  • Обработка без idempotency: consumer может получить одно сообщение несколько раз (at-least-once)
  • Использование auto-commit: сообщение может быть потеряно при падении до обработки
  • Публикация события до коммита транзакции (без Outbox): при откате событие уже отправлено

На собеседовании: три ключевых пункта: 1) Kafka vs RabbitMQ — разные модели (лог vs очередь), 2) Transactional Outbox Pattern для гарантированной доставки, 3) идемпотентность consumer. Если спросят “Как гарантировать exactly-once?” — ответ: exactly-once семантика в Kafka достигается через idempotent producer + transactional consumer, но на практике проще проектировать idempotent обработку.