Hello everyone,
We are currently playing with AWS MSK (Kafka 2.2.1). It is a 3-node cluster, and we can successfully produce events to it.
We are also able to consume events from topics using some basic consumer code (subscribe or assign).
I wanted to start playing with Kafka Streams, but for the life of me, I’m not able to get the darn thing going… it doesn’t seem to want to consume events. I have tried googling this problem, but I haven’t found a single relevant result. All the example code I find is the same and super easy to set up.
I’m wondering if there is some broker config that needs to be specifically set up for Streams to work correctly? I have enabled debug logging for Streams and I see messages like “Disconnecting from node -2 due to request timeout.” The broker URL is set correctly (it’s not using localhost). Not really sure what I’m doing wrong. Here is the sample code I’m using:
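import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes.LongSerde;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;

// BROKER holds the MSK bootstrap broker string.
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-stream-5");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER);
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, LongSerde.class.getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName());
props.setProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
StreamsBuilder builder = new StreamsBuilder();
// The key type matches the default key serde (LongSerde) configured above.
KStream<Long, String> kstream = builder.stream("stream-test");
kstream.print(Printed.toSysOut());
// try-with-resources closes the streams instance when the block exits.
try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
    System.out.println("Starting");
    streams.start();
    // Keep the calling thread alive for about 100 seconds while the stream thread runs.
    for (int i = 0; i < 100; i++) {
        Thread.sleep(1000);
        System.out.println("Sleeping");
    }
}
System.out.println("Done");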
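One thing I can at least rule out from the same machine is plain TCP reachability. Kafka clients give the bootstrap servers negative node ids in list order, so "node -2" should be the second entry in the bootstrap string. Below is a minimal sketch that tries a raw socket connection to each entry. The class name BootstrapCheck, the 3-second timeout, and the localhost:9092 fallback are made up for illustration, and a successful connection says nothing about TLS or authentication settings.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: checks raw TCP reachability only.
class BootstrapCheck {

    // Splits a bootstrap string such as "host1:9092,host2:9092" into unresolved addresses.
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Missing port in: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a TCP connection can be opened within timeoutMs.
    static boolean canConnect(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolve the host name here so DNS failures are reported as "not reachable".
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed fallback value; pass the real bootstrap string as the first argument.
        String bootstrap = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress address : parse(bootstrap)) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + canConnect(address, 3000));
        }
    }
}
```

Running it with the same string that BROKER holds would show whether the second entry is reachable at all from where the Streams app runs.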