middle
Как выглядит пример конфигурации Kafka Producer?
Рассмотрим три подхода к созданию продюсера: нативный Kafka API, Spring Kafka и Spring Cloud Stream.
Нативный Kafka Producer
Пример кода
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Minimal example of publishing string messages with the native Kafka producer API.
 *
 * <p>Both the key and the value are configured with {@code StringSerializer}, so the producer
 * and its records are typed {@code <String, String>}. Each element of the message array is sent
 * as its own record under the same key.
 */
public class KafkaStringArrayProducer {

    /**
     * Builds the producer configuration, sends every message in the array to {@code my_topic}
     * and prints the outcome of each send.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String key = "user123";
        String[] values = {"message1", "message2", "message3"};

        // try-with-resources guarantees the producer is closed even if send() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String value : values) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("my_topic", key, value);
                // Header values are raw bytes, so the trace id is encoded explicitly.
                record.headers().add("traceId", "someTraceId".getBytes(StandardCharsets.UTF_8));

                // The callback receives either the record metadata or the failure.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.out.println("Ошибка при отправке: " + exception.getMessage());
                    } else {
                        System.out.println(
                                "Отправлено в " + metadata.topic()
                                        + " партиция " + metadata.partition());
                    }
                });
            }
        }
    }
}
Дополнительные настройки продюсера, которые стоит упомянуть (в примере выше они не заданы):
acks=all
retries=3
compression.type=gzip
С использованием Spring Kafka
Конфигурация и сервис
/**
 * Spring configuration declaring the Kafka producer beans: the raw producer settings,
 * the {@code ProducerFactory} and the {@code KafkaTemplate} built on top of it.
 */
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    /**
     * Assembles the producer settings from {@code kafkaProperties}.
     *
     * <p>The SSL/SASL entries are added only when the configured security protocol is
     * exactly {@code SASL_SSL}; otherwise only the base settings are returned.
     *
     * @return mutable map of producer configuration entries
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServer());
        settings.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getProducerId());
        settings.put(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "com.example.configuration.kafka.KafkaProducerLoggingInterceptor");

        final boolean saslSsl = "SASL_SSL".equals(kafkaProperties.getSecurityProtocol());
        if (!saslSsl) {
            return settings;
        }

        settings.put("ssl.truststore.location", kafkaProperties.getSslTrustStoreLocation());
        settings.put("ssl.truststore.password", kafkaProperties.getSslTrustStorePassword());
        settings.put("ssl.truststore.type", kafkaProperties.getSslTrustStoreType());
        settings.put("ssl.keystore.type", kafkaProperties.getSslKeyStoreType());
        settings.put("sasl.mechanism", kafkaProperties.getSaslMechanism());
        settings.put("security.protocol", kafkaProperties.getSecurityProtocol());
        settings.put("sasl.jaas.config", kafkaProperties.getJaasConfigCompiled());
        return settings;
    }

    /**
     * Creates the producer factory with explicitly configured string serializers
     * for both key and value.
     *
     * @return factory producing {@code String}/{@code String} Kafka producers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final StringSerializer keySerializer =
                utf8StringSerializer("key.serializer.encoding", true);
        final StringSerializer valueSerializer =
                utf8StringSerializer("value.serializer.encoding", false);
        return new DefaultKafkaProducerFactory<>(producerConfigs(), keySerializer, valueSerializer);
    }

    /**
     * Exposes a {@code KafkaTemplate} backed by {@link #producerFactory()}.
     *
     * @return template for sending {@code String}/{@code String} records
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Builds a {@code StringSerializer} and configures it twice with UTF-8: first through the
     * role-specific encoding property, then through the generic {@code serializer.encoding}.
     *
     * @param roleEncodingProperty role-specific encoding property name
     * @param isKey whether the serializer is used for record keys
     * @return the configured serializer
     */
    private static StringSerializer utf8StringSerializer(String roleEncodingProperty, boolean isKey) {
        final StringSerializer serializer = new StringSerializer();
        serializer.configure(Map.of(roleEncodingProperty, "UTF-8"), isKey);
        serializer.configure(Map.of("serializer.encoding", "UTF-8"), isKey);
        return serializer;
    }
}
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message, String key, String topic) {
try {
log.info("Sending message {}", data);
kafkaTemplate.send(topic, key, message);
log.info("Successfully send message {}", data);
} catch (Exception ex) {
log.error("Failed send message to {} topic by key {}", key, topic);
throw ex;
}
}
}
/**
 * REST controller under {@code /kafka} that forwards incoming requests to
 * {@code KafkaProducerService}.
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    /** Response body returned after the service call completes. */
    private static final String SENT_RESPONSE = "Message sent to Kafka!";

    @Autowired
    private KafkaProducerService kafkaProducerService;

    /**
     * Handles {@code POST /kafka/send}: passes the three request parameters to the producer
     * service and returns a fixed confirmation text.
     *
     * @param message record value
     * @param key record key
     * @param topic destination topic
     * @return confirmation text
     */
    @PostMapping("/send")
    public String sendMessage(
            @RequestParam String message,
            @RequestParam String key,
            @RequestParam String topic) {
        kafkaProducerService.sendMessage(message, key, topic);
        return SENT_RESPONSE;
    }
}
С использованием Spring Cloud Stream
Конфигурация и сервис
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my_topic  # topic the "output" binding publishes to
      kafka:
        binder:
          brokers: localhost:9092  # broker address used by the Kafka binder
/**
 * Service that publishes string payloads through the {@code Source} binding's output channel.
 */
@Service
@EnableBinding(Source.class)
public class KafkaStreamProducer {

    private final Source source;

    /**
     * @param source binding whose output channel is used for sending
     */
    public KafkaStreamProducer(Source source) {
        this.source = source;
    }

    /**
     * Wraps {@code message} into a {@code Message} and sends it to the output channel.
     * The value returned by the channel's {@code send} call is not inspected.
     *
     * @param message payload to publish
     */
    public void sendMessage(String message) {
        final Message<String> outbound = MessageBuilder.withPayload(message).build();
        final var channel = source.output();
        channel.send(outbound);
    }
}
/**
 * REST controller under {@code /kafka-stream} that forwards incoming requests to
 * {@code KafkaStreamProducer}.
 */
@RestController
@RequestMapping("/kafka-stream")
public class KafkaStreamController {

    /** Response body returned after the producer call completes. */
    private static final String SENT_RESPONSE = "Message sent to Kafka via Spring Cloud Stream!";

    @Autowired
    private KafkaStreamProducer kafkaStreamProducer;

    /**
     * Handles {@code POST /kafka-stream/send}: passes the request parameter to the stream
     * producer and returns a fixed confirmation text.
     *
     * @param message payload to publish
     * @return confirmation text
     */
    @PostMapping("/send")
    public String sendMessage(@RequestParam String message) {
        kafkaStreamProducer.sendMessage(message);
        return SENT_RESPONSE;
    }
}
На собеседовании: если вас спрашивают про конфигурацию продюсера, покажите, что знаете как минимум нативный подход и Spring Kafka. Spring Cloud Stream — бонус, который демонстрирует понимание абстракций.