Kafka with Schema Registry


3 min read

Kafka doesn't know about type or schema transferred documents. For Kafka cluster all messages are just bytes. But it is good, at least from client side, to know data type of message. And for that we have another component - Schema Registry. In short, Producer send a schema to the Schema Registry and Consumer is fetching this schema. Producer in message add information about schemaId. We will use Schema Registry Docker image provided by Confluent. Code from my previous post will be extended.

Add Schema Registry to the docker-compose.yml

Add this part of code to file where zookeeper and broker is defined.

    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
      - broker
      - "8081:8081"
      SCHEMA_REGISTRY_HOST_NAME: schema-registry

Create avro schema

We will use Apache Avro as a data format for our messages. Add this simple message definition to src/main/avro/msg-schema.avsc

  "type": "record",
  "name": "SMessage",
  "namespace": "com.lda.test.kafka.schema",
  "fields": [
      "name": "idMsg",
      "type": "int"
      "name": "msgBody",
      "type": "string"

Execute build and class SMessage should be generated. Can be used in code from now.

Changes in Java app

Add new dependency to build.gradle

    implementation 'io.confluent:kafka-avro-serializer:5.3.0'
    implementation 'org.apache.avro:avro:1.11.1'

Kafka Config


public class KafkaConfig {

    @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value(value = "${kafka.producer.schemaregistry}")
    private String schemaRegistry;

    public ConsumerFactory<String, SMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, SMessage.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);
        return new DefaultKafkaConsumerFactory<>(props);

    public ConcurrentKafkaListenerContainerFactory<String, SMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SMessage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public ProducerFactory<String, SMessage> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, "Simple-avro");
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        configProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);

        return new DefaultKafkaProducerFactory<>(configProps);

    public KafkaTemplate<String, SMessage> kafkaSMessageTemplate() {
        return new KafkaTemplate<>(producerFactory());

Kafka Consumer

Define message type in KafkaConsumerService

public class KafkaConsumerService {

    @KafkaListener(topics = "quickstart", containerFactory = "kafkaListenerContainerFactory", groupId = "group_id")
    public void listenTopic(SMessage message){
        System.out.println("Received Message from topic: " + message);

Kafka Producer

Define message type in KafkaProducerService

public class KafkaProducerService {

    private KafkaTemplate<String, SMessage> kafkaTemplate;

    public void sendMessage(String topic, String message) {

        SMessage sMessage = SMessage.newBuilder()
                .setIdMsg(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE))

        ListenableFuture<SendResult<String, SMessage>> future =
                kafkaTemplate.send(topic, sMessage);

        future.addCallback(new ListenableFutureCallback<SendResult<String, SMessage>>() {

            public void onSuccess(SendResult<String, SMessage> result) {
                System.out.println("Sent message=[" + message +
                        "] with offset=[" + result.getRecordMetadata().offset() + "]");
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send message=["
                        + message + "] due to : " + ex.getMessage());

Application propertis

Add schema registry host and port to the application.properties



After implementing changes you should test if everything is working fine. Go to my previous post for description how to do it.


It can be situation that avro serializer is not working with spring boot devtools, so just remove this tool from build.grade.