As we have discussed in the past, asynchronous programming brings many benefits when developing reactive and responsive applications. However, it also comes with its own challenges, and one of the main ones is backpressure.
What is backpressure?
In physics, it is "a resistance or force opposing the desired flow of fluid through pipes" (Wikipedia).
We can translate the problem to a familiar scenario: persisting messages polled from a bus. There is a huge amount of messages on the bus, our application polls them very fast, but the database underneath is much slower.
How can we prevent the funnel from overflowing?
In a synchronous scenario there is no backpressure issue: the blocking nature of the computation stops the polling from the bus until the current message has been processed.
But in the async world, polling is executed with no clue about what is happening on the database side. If the database can't keep up with the messages coming from the bus, the messages pile up "in between", which means in the memory of our service.
This can lead to failures or, at worst, to a complete service failure.
Let's try to develop an application that persists messages in a database, and make it evolve to handle backpressure.
Automatic polling
At first, our verticle will perform the following operations:
- initialize the JDBC client
- initialize the Kafka client
- subscribe to the topic
- persist the records
The code is quite simple, and it works well with small amounts of messages.
When the load grows, a problem appears: using the handler of the Vert.x Kafka consumer means there is no control over the message rate, so the consumer keeps polling without considering the persistence rate, eventually causing memory overload.
public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());

    KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise)
      .handler(record -> {
        persist(jdbc, record)
          .onSuccess(result -> System.out.println("Message persisted"))
          .onFailure(cause -> System.err.println("Message not persisted " + cause));
      });
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }

  private Future<UpdateResult> persist(JDBCClient jdbc, KafkaConsumerRecord<String, String> record) {
    Promise<UpdateResult> promise = Promise.promise();
    JsonArray params = toParams(record);
    jdbc.updateWithParams("insert or update query to persist record", params, promise);
    return promise.future();
  }

  private JsonObject datasourceConfiguration() {
    // TODO datasource configuration
    return null;
  }

  private JsonArray toParams(KafkaConsumerRecord<String, String> record) {
    // TODO: convert the record into params for the sql command
    return null;
  }
}
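The datasourceConfiguration() and toParams() helpers are left as TODOs above. A rough sketch of what they might look like, assuming a hypothetical PostgreSQL datasource and an insert statement with two ? placeholders for the record key and value (none of these details are in the original code):

private JsonObject datasourceConfiguration() {
  // example values only: host, database, credentials and pool size are assumptions
  return new JsonObject()
    .put("url", "jdbc:postgresql://dbhost:5432/messages")
    .put("driver_class", "org.postgresql.Driver")
    .put("user", "user")
    .put("password", "password")
    .put("max_pool_size", 30);
}

private JsonArray toParams(KafkaConsumerRecord<String, String> record) {
  // bind the record key and value to the two ? placeholders of the insert statement
  return new JsonArray().add(record.key()).add(record.value());
}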
Explicit polling
To handle the backpressure, explicit polling must be used: instead of setting the Kafka consumer's handler, we call poll manually (in the following example with a 100 ms poll timeout).
With this approach, the next poll is performed only once the batch of previously polled messages has been persisted.
This behaviour is achieved by collecting every message's persist future with CompositeFuture.all, which succeeds only when all the futures complete; only then is the next poll issued.
If at least one of the futures fails, the composite fails and the polling stops.
There are various solutions to make the service handle such a failure, e.g. sending the failed message to a Dead Letter Queue; we will not cover this case in depth, but a rough sketch follows the code below.
The problem with this code is that if a message fails we lose it, because the consumer is configured with auto-commit, so it is Vert.x that commits the topic offsets.
public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());
        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .onSuccess(composite -> {
        System.out.println("All messages persisted");
        // poll again only once the whole batch has been persisted
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting messages: " + cause));
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }

  ...
}
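As a side note, the Dead Letter Queue option mentioned above could be sketched by wrapping persist with a recovery step. This is only an illustration under assumptions that are not part of the original code: a KafkaProducer<String, String> created elsewhere and a hypothetical topic.name.dlq topic.

private Future<UpdateResult> persistOrDeadLetter(JDBCClient jdbc,
                                                 KafkaProducer<String, String> dlqProducer,
                                                 KafkaConsumerRecord<String, String> record) {
  // hypothetical sketch: dlqProducer and the "topic.name.dlq" topic are assumptions
  return persist(jdbc, record)
    .recover(cause -> {
      // when the database write fails, park the original payload on the DLQ
      // instead of failing the whole batch
      Promise<RecordMetadata> sent = Promise.promise();
      dlqProducer.send(
        KafkaProducerRecord.create("topic.name.dlq", record.key(), record.value()),
        sent);
      // the record is considered handled once it sits on the DLQ
      return sent.future().<UpdateResult>mapEmpty();
    });
}

In the poll loop, persistOrDeadLetter would replace persist inside the map step, so a single bad record would no longer stop the whole batch.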
Manual commit
Setting the ENABLE_AUTO_COMMIT_CONFIG property to false, the service takes ownership of committing the topic offsets.
The commit is performed only once every message has been persisted; with this trick, at least once delivery is achieved.
public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());
        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .compose(composite -> {
        // commit the offsets only after every record of the batch has been persisted
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause));
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }

  ...
}
Bonus feature: achieve ordering
With a little effort it is possible to achieve ordering:
future composition allows forcing every persist operation to wait for the completion of the previous one.
This is achieved by chaining the async computations one to another, so each one is executed only when the preceding future succeeds.
It is a handy pattern to use when serialization of the operations is needed.
public class MainVerticle extends AbstractVerticle {

  ...

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> IntStream.range(0, records.size())
        .mapToObj(records::recordAt)
        // chain the persist operations so each record waits for the previous one
        .reduce(
          Future.<UpdateResult>succeededFuture(),
          (acc, record) -> acc.compose(it -> persist(jdbc, record)),
          (a, b) -> a
        )
      )
      .compose(composite -> {
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause));
  }

  ...
}
Conclusion
Backpressure is a fundamental topic to cover when working with async programming.
It does not come for free out of the Vert.x box, but it can be achieved with some simple tricks.