middle
Как выглядит пример конфигурации Kafka Consumer?
Рассмотрим конфигурацию консюмера с разными гарантиями доставки и через разные фреймворки.
Нативный Kafka Consumer
Пример кода
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
public class KafkaConsumerExample {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String groupId = "my-consumer-group";
String topic = "my-topic";
Map<String, Object> consumerConfigs = new HashMap<>();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs);
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
var records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> System.out.println("Received: " + record.value()));
}
} finally {
consumer.close();
}
}
}
Гарантии доставки
| Гарантия | Принцип | Когда коммитить |
|---|---|---|
| At least once | Сообщение обработано минимум один раз (возможны дубли) | После обработки (commitAsync()) |
| At most once | Сообщение обработано максимум один раз (возможна потеря) | До обработки или enable.auto.commit=true |
| Mostly once | Гибрид — обычно один раз, при сбоях возможны дубли | Ручной commit + дедупликация по messageId |
At least once — нативный API
try {
while (true) {
var records = consumer.poll(Duration.ofSeconds(1));
process(records);
consumer.commitAsync(); // Commit после обработки
}
} finally {
consumer.close();
}
At most once — нативный API
try {
while (true) {
var records = consumer.poll(Duration.ofSeconds(1));
consumer.commitAsync(); // Commit перед обработкой
process(records);
}
} finally {
consumer.close();
}
С использованием Spring Kafka
Конфигурация и listener
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumerGroupId());
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configs;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentMessageListenerContainerFactory<String, String> factory = new ConcurrentMessageListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my_topic", groupId = "group_id")
public void listen(@Payload String message,
@Header("traceId") String traceId,
@Header("correlationId") String correlationId) {
System.out.println("Received message: " + message);
System.out.println("Trace ID: " + traceId);
System.out.println("Correlation ID: " + correlationId);
}
}
At least once — Spring Kafka
spring:
kafka:
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
group-id: my-consumer-group
max-poll-records: 500
listener:
ack-mode: manual
@EnableKafka
public class AtLeastOnceConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
public void listen(String message, Acknowledgment acknowledgment) {
System.out.println("Received message: " + message);
acknowledgment.acknowledge(); // Ручное подтверждение после обработки
}
}
At most once — Spring Kafka
spring:
kafka:
consumer:
enable-auto-commit: true
group-id: my-consumer-group
auto-offset-reset: earliest
max-poll-records: 100
public class AtMostOnceConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
public void listen(String message) {
System.out.println("Received message: " + message);
// Смещение автоматически зафиксировано после получения
}
}
С использованием Spring Cloud Stream
Конфигурация и consumer
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
kafka:
binder:
brokers: localhost:9092
auto-create-topics: false
@Service
@EnableBinding(KafkaProcessor.class)
public class KafkaConsumerService {
@StreamListener("input")
public void handle(@Payload String message) {
System.out.println("Received message: " + message);
}
}
public interface KafkaProcessor {
@Input("input")
SubscribableChannel input();
}
Mostly once — Spring Cloud Stream (дедупликация)
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: my-consumer-group
content-type: application/json
consumer:
ackMode: manual
maxAttempts: 3
@Component
@EnableBinding(Sink.class)
public class MostlyOnceConsumer {
private Set<String> processedMessageIds = new HashSet<>();
@StreamListener(Sink.INPUT)
public void handleMessage(Message<String> message, @Header("messageId") String messageId) {
if (processedMessageIds.contains(messageId)) {
System.out.println("Duplicate message: " + messageId);
return;
}
System.out.println("Received message: " + message.getPayload());
processedMessageIds.add(messageId);
}
}
На собеседовании: ключевой вопрос — объяснить разницу между at-least-once и at-most-once. Покажите на уровне кода: at-least-once — commit после обработки (при сбое сообщение будет обработано повторно); at-most-once — commit до обработки (при сбое сообщение потеряется).