Kafka: consumindo registros com Spring

O consumo de dados do Kafka, dependendo do caso de uso, requer alguns cuidados no commit. Fazê-lo de forma correta é importante para garantir que nada seja perdido.

Com Clientes Kafka, em especial a versão para Java, é muito simples e intuitivo, mas no Spring Kafka é necessária atenção com ajustes especiais.


O foco neste artigo é mostrar como consumir dados com Spring Kafka, que é uma abstração sobre os Clientes Kafka para Java. Fazendo isso com seguindo setup:

  • enable.auto.commit=false. Que requer commit manual
  • commit síncrono

Devido a semântica de entrega at-least-once, um registro poderá ser consumido uma ou n vezes, e este setup busca reduzir isso.


Um consumidor típico no Spring Kafka é escrito assim:



@Component
public class SpringKafkaListener {
  @KafkaListener(topics = "topico")
  public void consume(String valor) {
    // Processar valor do registro
  }
}


Enter fullscreen mode Exit fullscreen mode

E criado com as seguintes configurações, feitas no application.properties:



spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092
spring.kafka.consumer.client-id=configure-me_client-id
spring.kafka.consumer.group-id=configure-me_group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer


Enter fullscreen mode Exit fullscreen mode

Felizmente o Spring Kafka não redefine valores, portanto a configuração padrão é mantida assim como definida na documentação oficial, porém, nela o commit é automático e assíncrono. Contudo, no Spring Kafka, todas as configurações necessárias para commit manual e síncrono não estão disponíveis através de propriedades no application.properties.

Resolvendo

Spring Kafka é uma abstração, logo o poll loop e commit são transparentes. E como pode-se ver no exemplo, um consumidor recebe apenas o registro e por padrão não tem acesso ao Consumer.

Primeiro é necessário revisar as configurações para desligar o commit automático.

Nova configuração:



# Nada de novo aqui spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092
spring.kafka.consumer.client-id=configure-me_client-id
spring.kafka.consumer.group-id=configure-me_group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Desliga o commit automático no Cliente Kafka spring.kafka.consumer.enable-auto-commit=false


Enter fullscreen mode Exit fullscreen mode

Spring tem sua própria notação para a maioria das configurações presentes no Kafka Consumer, que são traduzidas em tempo de execução para o nome correto.

Agora que o commit automático foi desligado, são necessários alguns ajustes programáticos feitos ao customizar as fábricas de objetos:

  • ackMode para MANUAL
  • syncCommits como true


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@EnableKafka
@Configuration
public class KafkaConfig {

  @Autowired
  KafkaProperties properties;

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(
              properties.buildConsumerProperties());
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
      kafkaListenerContainerFactory() {

      ConcurrentKafkaListenerContainerFactory<String, String> listener = 
            new ConcurrentKafkaListenerContainerFactory<>();

      listener.setConsumerFactory(consumerFactory());

      // Não falhar, caso ainda não existam os tópicos para consumo
      listener.getContainerProperties()
          .setMissingTopicsFatal(false);

      // ### AQUI
      // Commit manual do offset
      listener.getContainerProperties().setAckMode(AckMode.MANUAL);

      // ### AQUI
      // Commits síncronos
      listener.getContainerProperties().setSyncCommits(Boolean.TRUE);

      return listener;
    }
}


Enter fullscreen mode Exit fullscreen mode

Então o consumidor com Spring Kafka terá esta aparência:



import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class SpringKafkaListener {

  @KafkaListener(topics = "topico")
  public void consume(@Payload String valor, Acknowledgment ack) {

    //TODO Processar registro
    // . . . 

    // Commmit manual, que também será síncrono
    ack.acknowledge();

  }
}


Enter fullscreen mode Exit fullscreen mode

Note que mesmo assim não existe acesso ao Consumer, ao invez disso, o Spring injeta uma instância de Acknowledgment que faz o commit quando tem seu método acknowledge() executado.

Também neste exemplo o offset é confirmado a cada registro processado. Isso é algo que degrada a taxa de transferência, mas reduz ainda mais as chances de consumos duplicados. Bem, mas cada caso é um caso .

O exemplo completo está disponível no Github:

fabiojose / skc-ex

Spring Kafka Consumer

Spring Kafka Consumer Example

Exemplo de consumer com Spring Kafka

Requerimentos

Build & Run

Maven

Para montar o fatjar, execute o comando:

Linux

./mvnw clean package

Enter fullscreen mode Exit fullscreen mode

Windows

.\mvnw.cmd clean package

Enter fullscreen mode Exit fullscreen mode

Para executar:

Você pode utilizar o docker-compose.yaml para subir um Kafka em sua máquina

java \
  -Dspring.kafka.bootstrap-servers='localhost:9092' \
  -Dspring.kafka.consumer.client-id='spring-kafka-ex' \
  -Dspring.kafka.consumer.group-id='meu-grupo' \
  -jar target/app-spring-boot.jar

Enter fullscreen mode Exit fullscreen mode

Docker

A definição Dockerfile desta aplicação emprega multi-stage builds. Isso significa que nela acontece o build da aplicação e a criação da imagem.

Se for necessário somente a criar a imagem, pode-se utilizar a definição Dockerfile-image. Mas antes é necessário montar o fatjar através do maven.

Para build do fatjar e montar a imagem, execute o comando:

docker build . -t sk-consumer-ex:1.0

Enter fullscreen mode Exit fullscreen mode

Para montar apenas a imagem (antes…

View on GitHub

Photo by Paweł Czerwiński on Unsplash

原文链接:Kafka: consumindo registros com Spring

© 版权声明
THE END
喜欢就支持一下吧
点赞8 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容