Gymterview
middle

Как выглядит пример конфигурации Kafka Consumer?

Рассмотрим конфигурацию консюмера с разными гарантиями доставки и через разные фреймворки.

Нативный Kafka Consumer

Пример кода
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;

public class KafkaConsumerExample {

    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String groupId = "my-consumer-group";
        String topic = "my-topic";

        Map<String, Object> consumerConfigs = new HashMap<>();
        consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs);
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                var records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record -> System.out.println("Received: " + record.value()));
            }
        } finally {
            consumer.close();
        }
    }
}

Гарантии доставки

Гарантия Принцип Когда коммитить
At least once Сообщение обработано минимум один раз (возможны дубли) После обработки (commitAsync())
At most once Сообщение обработано максимум один раз (возможна потеря) До обработки или enable.auto.commit=true
Mostly once Гибрид — обычно один раз, при сбоях возможны дубли Ручной commit + дедупликация по messageId
At least once — нативный API
try {
    while (true) {
        var records = consumer.poll(Duration.ofSeconds(1));
        process(records);
        consumer.commitAsync(); // Commit после обработки
    }
} finally {
    consumer.close();
}
At most once — нативный API
try {
    while (true) {
        var records = consumer.poll(Duration.ofSeconds(1));
        consumer.commitAsync(); // Commit перед обработкой
        process(records);
    }
} finally {
    consumer.close();
}

С использованием Spring Kafka

Конфигурация и listener
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer());
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumerGroupId());
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return configs;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentMessageListenerContainerFactory<String, String> factory = new ConcurrentMessageListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "my_topic", groupId = "group_id")
    public void listen(@Payload String message,
                       @Header("traceId") String traceId,
                       @Header("correlationId") String correlationId) {
        System.out.println("Received message: " + message);
        System.out.println("Trace ID: " + traceId);
        System.out.println("Correlation ID: " + correlationId);
    }
}
At least once — Spring Kafka
spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      group-id: my-consumer-group
      max-poll-records: 500
    listener:
      ack-mode: manual
@EnableKafka
public class AtLeastOnceConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
    public void listen(String message, Acknowledgment acknowledgment) {
        System.out.println("Received message: " + message);
        acknowledgment.acknowledge(); // Ручное подтверждение после обработки
    }
}
At most once — Spring Kafka
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      group-id: my-consumer-group
      auto-offset-reset: earliest
      max-poll-records: 100
public class AtMostOnceConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
        // Смещение автоматически зафиксировано после получения
    }
}

С использованием Spring Cloud Stream

Конфигурация и consumer
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: my-consumer-group
          content-type: application/json
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: false
@Service
@EnableBinding(KafkaProcessor.class)
public class KafkaConsumerService {

    @StreamListener("input")
    public void handle(@Payload String message) {
        System.out.println("Received message: " + message);
    }
}
public interface KafkaProcessor {

    @Input("input")
    SubscribableChannel input();
}
Mostly once — Spring Cloud Stream (дедупликация)
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: my-consumer-group
          content-type: application/json
          consumer:
            ackMode: manual
            maxAttempts: 3
@Component
@EnableBinding(Sink.class)
public class MostlyOnceConsumer {

    private Set<String> processedMessageIds = new HashSet<>();

    @StreamListener(Sink.INPUT)
    public void handleMessage(Message<String> message, @Header("messageId") String messageId) {
        if (processedMessageIds.contains(messageId)) {
            System.out.println("Duplicate message: " + messageId);
            return;
        }
        System.out.println("Received message: " + message.getPayload());
        processedMessageIds.add(messageId);
    }
}

На собеседовании: ключевой вопрос — объяснить разницу между at-least-once и at-most-once. Покажите на уровне кода: at-least-once — commit после обработки (при сбое сообщение будет обработано повторно); at-most-once — commit до обработки (при сбое сообщение потеряется).