Kafka with Schema Registry

Kafka knows nothing about the type or schema of the documents it transfers; to the cluster, every message is just bytes. On the client side, however, it is useful to know the data type of a message, and that is the job of another component: Schema Registry. In short, the producer registers a schema with Schema Registry and adds the schema ID to each message, and the consumer uses that ID to fetch the schema. We will use the Schema Registry Docker image provided by Confluent and extend the code from my previous post.
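To make the "schema ID in the message" part concrete, here is a minimal sketch of the framing used by the Confluent serializers: one magic byte (0), a 4-byte big-endian schema ID, then the serialized payload. The class and method names are hypothetical and only illustrate the layout; the real work is done inside KafkaAvroSerializer and KafkaAvroDeserializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of any library: it only illustrates the header layout.
final class WireFormatSketch {

    /** Reads the schema ID from a framed message: 1 magic byte (0), then a 4-byte big-endian ID. */
    static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        return buffer.getInt();
    }

    /** Prepends the 5-byte header to an already serialized payload. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        // The payload here is a placeholder, not real Avro data.
        byte[] framed = frame(42, "payload".getBytes(StandardCharsets.UTF_8));
        System.out.println("schemaId=" + schemaId(framed));
    }
}
```

Because only the ID travels with each message, the consumer can look the full schema up once and cache it.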

Add Schema Registry to the docker-compose.yml

Add this snippet to the file where zookeeper and broker are defined.

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Create Avro schema

We will use Apache Avro as the data format for our messages. Add this simple message definition to src/main/avro/msg-schema.avsc:

{
  "type": "record",
  "name": "SMessage",
  "namespace": "com.lda.test.kafka.schema",
  "fields": [
    {
      "name": "idMsg",
      "type": "int"
    },
    {
      "name": "msgBody",
      "type": "string"
    }
  ]
}

Run the build and the SMessage class should be generated (this assumes an Avro code-generation plugin is configured in the build). From then on it can be used in the code.
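To get a feeling for what an SMessage looks like on the wire, the sketch below hand-encodes the two fields following the Avro binary encoding rules: an int is written as a zig-zag variable-length integer, a string as its UTF-8 length followed by the bytes, and record fields are written in schema order with no separators. This is only an illustration with hypothetical class and method names; in the application the generated class and the Avro library do this for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of Avro binary encoding for the SMessage schema.
final class AvroEncodingSketch {

    /** Writes an int using zig-zag variable-length encoding, 7 bits per byte. */
    static void writeInt(ByteArrayOutputStream out, int value) {
        int zigzag = (value << 1) ^ (value >> 31);
        while ((zigzag & ~0x7F) != 0) {
            out.write((zigzag & 0x7F) | 0x80); // more bytes follow
            zigzag >>>= 7;
        }
        out.write(zigzag);
    }

    /** Writes a string as its UTF-8 byte length followed by the bytes. */
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        // Avro stores the length as a long; for non-negative int lengths the bytes are identical.
        writeInt(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Encodes idMsg and msgBody in the order they appear in the schema. */
    static byte[] encodeSMessage(int idMsg, String msgBody) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeInt(out, idMsg);
        writeString(out, msgBody);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeSMessage(1, "hi")) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints: 02 04 68 69
    }
}
```

Note that no field names appear in the output, which is why the consumer needs the schema to read the message.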

Changes in Java app

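Add the new dependencies to build.gradle. The io.confluent artifact is published in Confluent's Maven repository (https://packages.confluent.io/maven/) rather than Maven Central, so that repository has to be declared as well.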

    implementation 'io.confluent:kafka-avro-serializer:5.3.0'
    implementation 'org.apache.avro:avro:1.11.1'

Kafka Config

config/KafkaConfig.java

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value(value = "${kafka.producer.schemaregistry}")
    private String schemaRegistry;

    @Bean
    public ConsumerFactory<String, SMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SMessage> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ProducerFactory<String, SMessage> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, "Simple-avro");
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        configProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, SMessage> kafkaSMessageTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
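The constants used in KafkaConfig resolve to plain string keys. As a rough sketch, the same consumer settings can be written out without Spring or the client libraries on the classpath, which is handy when comparing against logs or a properties file. The class name is hypothetical, and localhost:9092 is an assumed placeholder for the broker address from your own setup.

```java
import java.util.Properties;

// Hypothetical helper: spells out the string keys behind the config constants.
final class ConsumerPropsSketch {

    static Properties avroConsumerProps(String bootstrapServers, String groupId, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Return the generated SMessage class instead of a GenericRecord.
        props.setProperty("specific.avro.reader", "true");
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is an assumption; use the broker address from your environment.
        Properties props = avroConsumerProps("localhost:9092", "group_id", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```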

Kafka Consumer

Use SMessage as the message type in KafkaConsumerService.

@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "quickstart", containerFactory = "kafkaListenerContainerFactory", groupId = "group_id")
    public void listenTopic(SMessage message){
        System.out.println("Received Message from topic: " + message);
    }
}

Kafka Producer

Use SMessage as the message type in KafkaProducerService.

@Component
public class KafkaProducerService {

    @Qualifier("kafkaSMessageTemplate")
    @Autowired
    private KafkaTemplate<String, SMessage> kafkaTemplate;

    public void sendMessage(String topic, String message) {

        SMessage sMessage = SMessage.newBuilder()
                .setIdMsg(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE))
                .setMsgBody(message)
                .build();

        ListenableFuture<SendResult<String, SMessage>> future =
                kafkaTemplate.send(topic, sMessage);

        future.addCallback(new ListenableFutureCallback<SendResult<String, SMessage>>() {

            @Override
            public void onSuccess(SendResult<String, SMessage> result) {
                System.out.println("Sent message=[" + message +
                        "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send message=["
                        + message + "] due to : " + ex.getMessage());
            }
        });
    }
}
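The callback above relies on ListenableFuture, which matches Spring for Apache Kafka 2.x. In 3.x, KafkaTemplate.send returns a CompletableFuture instead, so the same logging would go through whenComplete. The sketch below shows only that callback shape with a plain CompletableFuture<Long> standing in for the send result (the Long plays the role of the offset); the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in: a CompletableFuture<Long> replaces the real send result.
final class SendCallbackSketch {

    /** Builds the same log lines as the producer service, for success or failure. */
    static String describe(String message, Long offset, Throwable error) {
        if (error != null) {
            return "Unable to send message=[" + message + "] due to : " + error.getMessage();
        }
        return "Sent message=[" + message + "] with offset=[" + offset + "]";
    }

    public static void main(String[] args) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        // whenComplete receives either a result or an exception, never both.
        future.whenComplete((offset, error) -> System.out.println(describe("hello", offset, error)));
        future.complete(7L); // simulate a successful send at offset 7
    }
}
```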

Application properties

Add the Schema Registry host and port to application.properties:

kafka.producer.schemaregistry=http://localhost:8081

Testing

After implementing the changes, test that everything works. See my previous post for a description of how to do it.
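One extra check is to ask Schema Registry whether the schema was registered after the first message was sent. With the default subject naming strategy, the value schema of topic quickstart is stored under the subject quickstart-value, and the REST API exposes it at /subjects/{subject}/versions/latest. The sketch below builds that URL and fetches it; the class and method names are hypothetical, and it assumes Java 11+ and a registry running at the address from application.properties.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for a quick manual check against a running Schema Registry.
final class SchemaLookupSketch {

    /** Builds the URL of the latest value schema for a topic (default subject naming assumed). */
    static URI latestValueSchemaUri(String registryUrl, String topic) {
        String base = registryUrl.endsWith("/")
                ? registryUrl.substring(0, registryUrl.length() - 1)
                : registryUrl;
        return URI.create(base + "/subjects/" + topic + "-value/versions/latest");
    }

    public static void main(String[] args) throws Exception {
        URI uri = latestValueSchemaUri("http://localhost:8081", "quickstart");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // A 200 response should contain the subject, version, schema id and the schema itself.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```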

Remarks

The Avro serializer may not work together with Spring Boot DevTools. If you run into that, remove DevTools from build.gradle.