Kafka with Schema Registry
Kafka doesn't know anything about the type or schema of the documents it transfers; to a Kafka cluster, all messages are just bytes. Still, it is useful, at least on the client side, to know the data type of a message, and for that there is another component: the Schema Registry. In short, the producer registers a schema with the Schema Registry and adds the schema ID to each message, and the consumer uses that ID to fetch the schema. We will use the Schema Registry Docker image provided by Confluent and extend the code from my previous post.
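Concretely, Confluent's Avro serializer prefixes each payload with a magic byte and the four-byte schema ID; the deserializer reads that ID and fetches the matching schema from the registry, so the full schema never travels inside the message.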
Add Schema Registry to the docker-compose.yml
Add this service, under services:, to the file where ZooKeeper and the broker are already defined.
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
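Start the stack with docker compose up; once it is healthy, the registry answers on port 8081. Its REST API is useful later for checking what has been registered, e.g. GET http://localhost:8081/subjects lists all registered subjects.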
Create Avro schema
We will use Apache Avro as the data format for our messages. Add this simple message definition to src/main/avro/msg-schema.avsc:
{
  "type": "record",
  "name": "SMessage",
  "namespace": "com.lda.test.kafka.schema",
  "fields": [
    {
      "name": "idMsg",
      "type": "int"
    },
    {
      "name": "msgBody",
      "type": "string"
    }
  ]
}
Run the build (with the Avro code-generation plugin in place; see the build.gradle notes below) and the SMessage class should be generated in the com.lda.test.kafka.schema package. From then on it can be used in code.
Changes in the Java app
Add the new dependencies to build.gradle (the serializer version here matches the Schema Registry image above):
implementation 'io.confluent:kafka-avro-serializer:7.3.0'
implementation 'org.apache.avro:avro:1.11.1'
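Two build details are easy to miss. First, the io.confluent artifacts are published in Confluent's own Maven repository, not Maven Central. Second, turning the .avsc file into the SMessage class requires an Avro code-generation plugin; the plugin choice and version below are my assumption, not something from the original setup, so treat this as a minimal sketch:

plugins {
    // Assumed plugin: community Gradle Avro plugin; compiles src/main/avro/*.avsc to Java
    id 'com.github.davidmc24.gradle.plugin.avro' version '1.9.1'
}

repositories {
    mavenCentral()
    // Required for io.confluent:kafka-avro-serializer
    maven { url 'https://packages.confluent.io/maven/' }
}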
Kafka Config
config/KafkaConfig.java
@EnableKafka
@Configuration
public class KafkaConfig {

    @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value(value = "${kafka.producer.schemaregistry}")
    private String schemaRegistry;

    @Bean
    public ConsumerFactory<String, SMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        // Return the generated SMessage class instead of a GenericRecord
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SMessage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ProducerFactory<String, SMessage> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, "Simple-avro");
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        configProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, SMessage> kafkaSMessageTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Kafka Consumer
Define the message type in KafkaConsumerService:
@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "quickstart", containerFactory = "kafkaListenerContainerFactory", groupId = "group_id")
    public void listenTopic(SMessage message) {
        System.out.println("Received Message from topic: " + message);
    }
}
Kafka Producer
Define the message type in KafkaProducerService:
@Component
public class KafkaProducerService {

    @Qualifier("kafkaSMessageTemplate")
    @Autowired
    private KafkaTemplate<String, SMessage> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        SMessage sMessage = SMessage.newBuilder()
                .setIdMsg(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE))
                .setMsgBody(message)
                .build();
        ListenableFuture<SendResult<String, SMessage>> future =
                kafkaTemplate.send(topic, sMessage);
        future.addCallback(new ListenableFutureCallback<SendResult<String, SMessage>>() {
            @Override
            public void onSuccess(SendResult<String, SMessage> result) {
                System.out.println("Sent message=[" + message +
                        "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send message=["
                        + message + "] due to : " + ex.getMessage());
            }
        });
    }
}
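Note: ListenableFuture was removed from KafkaTemplate in spring-kafka 3.0 (Spring Boot 3), where send() returns a CompletableFuture instead. On that stack the callback above becomes roughly this sketch:

// spring-kafka 3.x: send() returns CompletableFuture instead of ListenableFuture
CompletableFuture<SendResult<String, SMessage>> future =
        kafkaTemplate.send(topic, sMessage);
future.whenComplete((result, ex) -> {
    if (ex == null) {
        System.out.println("Sent message=[" + message +
                "] with offset=[" + result.getRecordMetadata().offset() + "]");
    } else {
        System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
    }
});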
Application properties
Add the Schema Registry host and port to application.properties:
kafka.producer.schemaregistry=http://localhost:8081
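For reference, the other placeholders that KafkaConfig reads come from the same file. With the Docker setup from the previous post they would look roughly like this (the broker port is an assumption based on the usual Confluent compose files):

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_id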
Testing
After implementing these changes, you should test that everything works. See my previous post for a description of how to do it.
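One extra check specific to this post: after the first message has been produced, the schema should show up in the registry under the subject quickstart-value (the default TopicNameStrategy names subjects <topic>-value), visible via GET http://localhost:8081/subjects.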
Remarks
The Avro serializer may not work together with Spring Boot DevTools (the DevTools restart classloader can load the generated classes twice, leading to ClassCastExceptions), so if you run into this, simply remove the devtools dependency from build.gradle.