In several projects, I have run into difficulties implementing integration tests for Spring Boot applications that use Kafka, and developers are often put off by the effort such tests require. This post describes a simple integration test that uses an embedded Kafka broker and the test utilities provided by the spring-kafka-test dependency, based on a small example application.
The sample application ingests messages from the not-enriched-user-data Kafka topic and enriches them with data from a database. Finally, the enriched messages are published to the enriched-user-data Kafka topic.
You can find the code of the application here. The integration test implemented for the application consists of the following steps.
- publish test data to the Kafka topic not-enriched-user-data
- the message is consumed by the Kafka listener
- the application enriches the message with data
- the application sends the enriched message to the topic enriched-user-data
- verify that the topic enriched-user-data contains a message with the expected content
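The enrichment step at the centre of these steps can be sketched in plain Java. This is only an illustration under assumptions: the record shapes are guessed from the constructor and getter names used in the test, the class name EnrichmentSketch is hypothetical, and a Map stands in for the database lookup. The real application does this inside a Kafka listener.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for the application's classes; only the field names
// are inferred from the calls that appear in the test code of this post.
class EnrichmentSketch {

    record UserData(String userName, String customerNumber) {}

    record EnrichedUserData(String userName, String customerNumber, String enrichedInfo) {}

    // The Map stands in for the database lookup by customer number.
    // An empty Optional means no additional information was found.
    static Optional<EnrichedUserData> enrich(UserData input, Map<String, String> infoByCustomerNumber) {
        return Optional.ofNullable(infoByCustomerNumber.get(input.customerNumber()))
                .map(info -> new EnrichedUserData(input.userName(), input.customerNumber(), info));
    }

    public static void main(String[] args) {
        Map<String, String> database = Map.of("customerNumber", "interesting additional information");
        System.out.println(enrich(new UserData("userName", "customerNumber"), database));
    }
}
```

The integration test checks exactly this mapping, but end to end: the input arrives through one topic and the result is read from the other.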
Implementing the integration test
Before the test case itself can be written, the test class needs some setup code.
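import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
// Imports of the application's own classes (UserData, EnrichedUserData,
// AdditionalUserInformationRepository) are omitted here.

@SpringBootTest
@EmbeddedKafka(ports = 9092)
class EmbeddedKafkaIntegrationTest {

    @Autowired
    KafkaTemplate<String, UserData> kafkaTemplate;

    @Autowired
    ConsumerFactory<String, EnrichedUserData> consumerFactory;

    @Autowired
    AdditionalUserInformationRepository additionalUserInformationRepository;

    @Test
    void executeIntegrationTest() {
        .....
    }
}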
With the annotation @SpringBootTest, the Spring Boot application context is made available during test execution. The KafkaTemplate, ConsumerFactory and AdditionalUserInformationRepository are injected from the application context using the @Autowired annotation. The @EmbeddedKafka annotation starts an in-memory Kafka instance reachable at port 9092.
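If port 9092 is already taken on the build machine, the embedded broker cannot bind to it. One possible variant, sketched here under assumptions, lets the broker choose a random port and publishes its address under the property that Spring Boot reads. The class name, the explicit topic list and the auto-offset-reset setting are assumptions. The post does not show the application's Kafka configuration, so the test properties may need to be adapted.

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

// Sketch only: class name and property values are assumptions, not taken
// from the sample application.
@SpringBootTest(properties = "spring.kafka.consumer.auto-offset-reset=earliest")
@EmbeddedKafka(
        partitions = 1,
        topics = {"not-enriched-user-data", "enriched-user-data"},
        // Expose the broker's randomly chosen address under the property
        // that Spring Boot's Kafka auto-configuration uses.
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class RandomPortEmbeddedKafkaTest {
    // Injected fields and test methods would stay the same as in the
    // fixed-port version.
}
```

Reading from the earliest offset is meant to keep a test consumer from missing a record that was published before its partitions were assigned.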
The following code shows the actual implementation of the test case.
@Test
void executeIntegrationTest() {
    // arrange: store the data the application will use for enrichment
    final String customerNumber = "customerNumber";
    final String userName = "userName";
    final String interestingAdditionalInformation = "interesting additional information";
    AdditionalUserInformation additionalUserInformation = new AdditionalUserInformation();
    additionalUserInformation.setAdditionalInformation(interestingAdditionalInformation);
    additionalUserInformation.setCustomerNumber(customerNumber);
    additionalUserInformationRepository.save(additionalUserInformation);

    // arrange: create a test consumer for the output topic
    Consumer<String, EnrichedUserData> testConsumer = consumerFactory.createConsumer("test", "test");
    testConsumer.subscribe(List.of("enriched-user-data"));

    // act: publish the input message
    kafkaTemplate.send("not-enriched-user-data", new UserData(userName, customerNumber));

    // assert: wait for the enriched message and check its content
    ConsumerRecord<String, EnrichedUserData> receivedRecord = KafkaTestUtils.getSingleRecord(testConsumer, "enriched-user-data");
    Assertions.assertAll("enriched record",
            () -> assertEquals(userName, receivedRecord.value().getUserName()),
            () -> assertEquals(customerNumber, receivedRecord.value().getCustomerNumber()),
            () -> assertEquals(interestingAdditionalInformation, receivedRecord.value().getEnrichedInfo())
    );
}
First, an additionalUserInformation object is built and saved in the database via the injected additionalUserInformationRepository. Then the injected consumerFactory object is used to create the Kafka consumer testConsumer, which subscribes to the enriched-user-data topic. With the autowired Kafka template object, a message is sent to the not-enriched-user-data topic.
The sent message is automatically processed by the Kafka listener of the application. The getSingleRecord method of the KafkaTestUtils class makes the passed consumer testConsumer poll the topic enriched-user-data until it receives one record. The retrieved record is used to validate that the message was processed correctly.
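The idea of polling until a record arrives can be illustrated without Kafka. The following is a conceptual sketch, not the spring-kafka implementation: a BlockingQueue stands in for the topic, and awaitRecord and the class name are hypothetical. It polls in short intervals and gives up once a deadline has passed, which is what keeps a test from hanging when the application never publishes a result.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Conceptual sketch only: the queue stands in for the Kafka topic and
// awaitRecord is a hypothetical helper, not part of spring-kafka-test.
class PollUntilRecordSketch {

    static <T> T awaitRecord(BlockingQueue<T> topic, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        try {
            while (true) {
                // Wait briefly for a record, then re-check the deadline.
                T record = topic.poll(50, TimeUnit.MILLISECONDS);
                if (record != null) {
                    return record;
                }
                if (System.nanoTime() - deadline >= 0) {
                    throw new IllegalStateException("No record received within " + timeout);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a record", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        // Simulate the application publishing its result asynchronously.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
                topic.add("enriched message");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(awaitRecord(topic, Duration.ofSeconds(5)));
        producer.join();
    }
}
```

Because the listener processes the message asynchronously, the test cannot read the output topic immediately after sending. Waiting with a bounded timeout is the design choice that makes the assertion reliable.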
Conclusion
The combination of the functionality provided by KafkaTestUtils and the embedded Kafka instance makes it possible to implement integration tests without much Kafka-specific effort. A key advantage of using an embedded Kafka instance is that it does not require pulling container images. As a result, execution is faster than with test implementations based on the Testcontainers framework, and the tests do not require changes to the existing CI/CD infrastructure to enable image pulling during test execution.
Original article: Integration testing with Spring Boot and embedded kafka