O consumo de dados do Kafka, dependendo do caso de uso, requer alguns cuidados no commit. Fazê-lo de forma correta é importante para garantir que nada seja perdido.
Com Clientes Kafka, em especial a versão para Java, é muito simples e intuitivo, mas no Spring Kafka é necessária atenção com ajustes especiais.
O foco neste artigo é mostrar como consumir dados com Spring Kafka, que é uma abstração sobre os Clientes Kafka para Java. Fazendo isso com seguindo setup:
-
enable.auto.commit=false
. Que requer commit manual - commit síncrono
Devido a semântica de entrega at-least-once, um registro poderá ser consumido uma ou
n
vezes, e este setup busca reduzir isso.
Um consumidor típico no Spring Kafka é escrito assim:
@Component
public class SpringKafkaListener {
@KafkaListener(topics = "topico")
public void consume(String valor) {
// Processar valor do registro
}
}
Enter fullscreen mode Exit fullscreen mode
E criado com as seguintes configurações, feitas no application.properties
:
spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092
spring.kafka.consumer.client-id=configure-me_client-id
spring.kafka.consumer.group-id=configure-me_group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Enter fullscreen mode Exit fullscreen mode
Felizmente o Spring Kafka não redefine valores, portanto a configuração padrão é mantida assim como definida na documentação oficial, porém, nela o commit é automático e assíncrono. Contudo, no Spring Kafka, todas as configurações necessárias para commit manual e síncrono não estão disponíveis através de propriedades no application.properties
.
Resolvendo
Spring Kafka é uma abstração, logo o poll loop e commit são transparentes. E como pode-se ver no exemplo, um consumidor recebe apenas o registro e por padrão não tem acesso ao Consumer.
Primeiro é necessário revisar as configurações para desligar o commit automático.
Nova configuração:
# Nada de novo aqui spring.kafka.bootstrap-servers=configure-me_kafka-broker:9092
spring.kafka.consumer.client-id=configure-me_client-id
spring.kafka.consumer.group-id=configure-me_group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Desliga o commit automático no Cliente Kafka spring.kafka.consumer.enable-auto-commit=false
Enter fullscreen mode Exit fullscreen mode
Spring tem sua própria notação para a maioria das configurações presentes no Kafka Consumer, que são traduzidas em tempo de execução para o nome correto.
Agora que o commit automático foi desligado, são necessários alguns ajustes programáticos feitos ao customizar as fábricas de objetos:
- ackMode para
MANUAL
- syncCommits como
true
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
@EnableKafka
@Configuration
public class KafkaConfig {
@Autowired
KafkaProperties properties;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
properties.buildConsumerProperties());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> listener =
new ConcurrentKafkaListenerContainerFactory<>();
listener.setConsumerFactory(consumerFactory());
// Não falhar, caso ainda não existam os tópicos para consumo
listener.getContainerProperties()
.setMissingTopicsFatal(false);
// ### AQUI
// Commit manual do offset
listener.getContainerProperties().setAckMode(AckMode.MANUAL);
// ### AQUI
// Commits síncronos
listener.getContainerProperties().setSyncCommits(Boolean.TRUE);
return listener;
}
}
Enter fullscreen mode Exit fullscreen mode
Então o consumidor com Spring Kafka terá esta aparência:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class SpringKafkaListener {
@KafkaListener(topics = "topico")
public void consume(@Payload String valor, Acknowledgment ack) {
//TODO Processar registro
// . . .
// Commmit manual, que também será síncrono
ack.acknowledge();
}
}
Enter fullscreen mode Exit fullscreen mode
Note que mesmo assim não existe acesso ao Consumer, ao invez disso, o Spring injeta uma instância de Acknowledgment
que faz o commit quando tem seu método acknowledge()
executado.
Também neste exemplo o offset é confirmado a cada registro processado. Isso é algo que degrada a taxa de transferência, mas reduz ainda mais as chances de consumos duplicados. Bem, mas cada caso é um caso .
O exemplo completo está disponível no Github:
fabiojose / skc-ex
Spring Kafka Consumer
Spring Kafka Consumer Example
Exemplo de consumer com Spring Kafka
Requerimentos
- JDK 1.8
- Acesso ao repositório https://repo.maven.apache.org/maven2/ ou uma alternativa com acesso às dependências presentes no
pom.xml
Build & Run
Maven
Para montar o fatjar, execute o comando:
Linux
./mvnw clean package
Enter fullscreen mode Exit fullscreen mode
Windows
.\mvnw.cmd clean package
Enter fullscreen mode Exit fullscreen mode
Para executar:
Você pode utilizar o
docker-compose.yaml
para subir um Kafka em sua máquina
java \ -Dspring.kafka.bootstrap-servers='localhost:9092' \ -Dspring.kafka.consumer.client-id='spring-kafka-ex' \ -Dspring.kafka.consumer.group-id='meu-grupo' \ -jar target/app-spring-boot.jar
Enter fullscreen mode Exit fullscreen mode
Docker
A definição Dockerfile desta aplicação emprega multi-stage builds. Isso significa que nela acontece o build da aplicação e a criação da imagem.
Se for necessário somente a criar a imagem, pode-se utilizar a definição Dockerfile-image. Mas antes é necessário montar o fatjar através do maven.
Para build do fatjar e montar a imagem, execute o comando:
docker build . -t sk-consumer-ex:1.0
Enter fullscreen mode Exit fullscreen mode
Para montar apenas a imagem (antes…
Photo by Paweł Czerwiński on Unsplash
暂无评论内容