Testing Micronaut Kafka

When testing Kafka in Micronaut, you can use either an embedded Kafka broker or Testcontainers. This post covers the embedded option.

Configuration

To use embedded Kafka, set these properties in src/test/resources/application-kafka.yaml:

kafka:
  bootstrap:
    servers: localhost:${random.port}
  embedded:
    enabled: true
    topics:
    - topic1
    - topic2

Annotating a test with @MicronautTest(environments = "kafka") activates both the “kafka” and “test” environments, so application-kafka.yaml is loaded and the properties needed to start the embedded Kafka take effect.

It’s best to start the embedded Kafka on a random port so it does not clash with another Kafka broker that may already be running locally (for example, in Docker).
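
The idea behind a random port can be illustrated in plain Java: binding to port 0 asks the operating system for any port that is currently free. This is only a sketch of the principle and not necessarily how Micronaut resolves ${random.port}; the class name FreePortSketch is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical helper, not part of Micronaut: shows why a random port avoids clashes.
final class FreePortSketch {

    // Binding to port 0 lets the operating system choose any free port.
    // The socket is closed again before the port number is returned.
    static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bootstrap servers: localhost:" + findFreePort());
    }
}
```

Because the port is chosen at startup, two test runs on the same machine, or a test run next to a local broker on 9092, would not normally compete for the same address.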

All the topics required by your application also need to be specified under the kafka.embedded.topics key so they’re created in the embedded Kafka.

Producing test messages

To send test messages to your application, create TestProducer.java under src/test/java/ and declare an interface with the required annotations:

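import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;

// Micronaut generates the implementation of this interface at compile time
@KafkaClient
public interface TestProducer {
    // Sends the given body as a message to topic1
    @Topic("topic1")
    void sendTestMessage(String body);
}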

Then use the test producer by injecting it into your test:

@MicronautTest(environments = "kafka")
public class TestAKafkaApplication {

    @Inject
    public TestProducer topicClient;

    @Test
    public void listensToMessages() {
        // When
        topicClient.sendTestMessage("Hello");
        // Then assertions...
        // assertThat()...
    }
}

Consuming messages from the application under test

Create a consumer called TestListener alongside your test classes with the following code:

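import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;

import javax.inject.Singleton;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

@Singleton
@KafkaListener(groupId = "test", clientId = "test-consumer")
public class TestListener {

    // Thread-safe queue: filled by the Kafka consumer thread, read by the test thread
    private final BlockingQueue<String> messages = new LinkedBlockingDeque<>();

    @Topic("topic1")
    public void eventOccurred(String body) {
        messages.add(body);
    }

    public BlockingQueue<String> getMessages() {
        return messages;
    }
}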

This lets you retrieve the messages that were sent to the topic and check their contents. It mirrors the approach taken by Spring Cloud Stream of storing messages in a BlockingQueue, so a test can wait for a message to arrive, and each message is removed from the queue when it is retrieved.
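
The queue behaviour the listener relies on can be seen without Kafka at all. The following is a minimal stand-alone sketch using only java.util.concurrent; the class BlockingQueueSketch and its method are hypothetical, and a background thread stands in for the Kafka consumer thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Stand-alone illustration (no Kafka involved) of waiting on a BlockingQueue.
final class BlockingQueueSketch {

    // A background thread adds "Hello" after delayMillis, while the calling
    // thread waits up to timeoutMillis for it, as a test would.
    static String awaitMessage(long delayMillis, long timeoutMillis) {
        BlockingQueue<String> messages = new LinkedBlockingDeque<>();
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                messages.add("Hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true); // do not keep the JVM alive for the stand-in thread
        listener.start();
        try {
            // poll blocks until an element arrives or the timeout elapses;
            // on timeout it returns null instead of throwing
            return messages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitMessage(100, 2000)); // message arrives within the timeout
        System.out.println(awaitMessage(2000, 100)); // timeout elapses first
    }
}
```

Since poll returns null on timeout rather than throwing, a test that asserts on the returned value fails with a clear message when nothing arrives in time.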

To use it in a test, inject it alongside the producer:

import io.micronaut.test.annotation.MicronautTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import technology.wick.blog.consumer.TestListener;
import technology.wick.blog.producer.TestProducer;

import javax.inject.Inject;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

@MicronautTest(environments = "kafka")
public class TestAKafkaApplication {

    @Inject
    public TestListener topicListener;
    @Inject
    public TestProducer topicClient;

    @BeforeEach
    void setUp() {
        // Given no messages exist
        topicListener.getMessages().clear();
    }

    @Test
    public void producesMessages() throws InterruptedException {
        // When
        topicClient.sendTestMessage("Hello");
        // Then
        String bodyOfMessage = topicListener.getMessages().poll(2, TimeUnit.SECONDS);
        assertThat(bodyOfMessage).isEqualTo("Hello");
    }
}
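
When a test expects more than one message, the same poll-with-timeout idea can be wrapped in a small helper. The sketch below is an assumption rather than part of the original example: MessageCollector and collect are hypothetical names, and the helper works on any BlockingQueue of strings, such as the one returned by topicListener.getMessages().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper; the name and signature are assumptions.
final class MessageCollector {

    // Takes up to `expected` messages from the queue, sharing one overall
    // timeout across all of them. Returns what arrived in time, in queue order.
    static List<String> collect(BlockingQueue<String> queue, int expected, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (received.size() < expected) {
                long remainingNanos = deadline - System.nanoTime();
                // A zero or negative timeout makes poll return immediately
                String next = queue.poll(remainingNanos, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // deadline reached before the next message arrived
                }
                received.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while collecting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>();
        queue.add("Hello");
        queue.add("World");
        System.out.println(collect(queue, 2, 2000));
    }
}
```

In a test, the returned list could then be checked with an AssertJ assertion such as containsExactlyInAnyOrder, which avoids relying on the order in which messages are delivered.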

Conclusion

Micronaut’s approach to testing is simple but functional. There aren’t many test-only features; instead, you use the same code you would in the real application, so it’s best to approach building a test as you would coding a feature in the full application.

A working example of using this code in tests can be found on GitHub.

Original article: Testing Micronaut Kafka
