Cada día la programación funcional está más en boga y hay que ponerse las pilas, así que me he inventado este caso de uso para jugar un poco con ella y sobre todo por practicar con Mono
y Flux
de Reactor (esta vez en Java)
- WARNING
-
As usual, este post se basa en lo que yo he buscado y probado, pudiendo estar mal e incluso rematadamente mal, pero … en mi local funciona.
Caso de Uso
Supongamos que tenemos dos sistemas independientes (dos microservicios en tu arquitectura o dos servicios de dos proveedores diferentes, o uno tuyo y otro externo, por ejemplo):
-
Stock Service. Gestiona la cantidad de artículos que tenemos en el almacén
-
Ecommerce Service. Gestiona algunos de artículos junto con su cantidad disponible más el precio de venta (por ejemplo querríamos tener un catálogo de artículos y para cada uno querer tener en almacén 100 unidades pero en eCommerce solamente 10)
Ambos sistemas son independientes, con sus bases de datos correspondientes, etc y son accesibles a través de unos endpoints diferentes.
La idea es ejecutar un proceso que busque el stock actual de todos los artículos, llamando a Stock Service, aplicar una regla de negocio para cada uno, e invocar al endpoint de eCommerce para actualizar la cantidad de cada uno.
Además, como eCommerce dispone de un subconjunto de artículos, deberemos consultar en eCommerce la lista de artículos para no recorrer todo el catálogo que tenemos en Stock
No parece muy difícil …
Stock Service
public record Stock(String sku, int cantidad) {
}
public record StockPage(int total, Collection<Stock> stocks) {
}
@Get("/")
Mono<StockPage> get(@QueryValue int page)
@Post("/update")
void update(@Body List<Stock> stocks)
Enter fullscreen mode Exit fullscreen mode
Simplificando mucho vamos a trabajar con un Stock
identificado por su sku y del que mantenemos la cantidad que tenemos en stock
StockPage nos sirve para hacer una paginación básica devolviendo el total de artículos en la base de datos, y unos cuantos stocks. El servicio devolverá páginas según se le pida por parámetro.
También añadimos un endpoint para poder actualizar el stock (que nos servirá para jugar a cambiar cantidades del stock y volver a ejecutar nuestro proceso)
- WARNING
-
obviamente es un ejemplo muy simplificado
eCommerce Service
public record Articulo(String sku, int cantidad, double precio) {
}
public record ArticuloPage(int total, Collection<Articulo> articulos) {
}
@Get("/articulos")
public Mono<ArticuloPage> get(@QueryValue int page)
@Post("/update")
public Mono<List<UpdateStock>> update(@Body List<UpdateStock> update)
Enter fullscreen mode Exit fullscreen mode
Parecido al Stock Service, el eCommerce service nos permite paginar los artículos y (lo más importante en este caso) actualizar la cantidad de stock de articulos en ecommmerce
Aproximación tradicional
Siguiendo el diagrama anterior una posible implementación sería:
int offset = 0;
while( offset < maxArticles ){
// obtener n articulos en ecomerce
var articuloPage = ecommerceClient.get(offset).block();
// extraer los ids
var articulos = new ArrayList<String>();
for(var articulo : articuloPage.articulos()){
articulos.add(articulo.sku());
}
// ver el stock de estos articulos
StockPage stockPage = stockClient.get( String.join(",",articulos) ).block();
// logica de negocio para estos articulos
List<UpdateStock> updateStocks = new ArrayList<>();
for (Stock s : stockPage.stocks()) {
updateStocks.add(new UpdateStock(s.sku(), s.cantidad()));
}
// actualizar ecommerce
ecommerceClient.update(updateStocks).block();
offset+=batchSize;
}
Enter fullscreen mode Exit fullscreen mode
Bueno, un poco engorroso pero (semi)fácil de seguir. Pedidos unos pocos articulos a ecommerce, pedimos la cantidad de estos al servicio de stock, y actualizamos ecommerce con la cantidad que tenemos en stock
El “detalle” de esta implementación es que estamos haciendo llamadas a servicios externos y estamos esperando a que se completen (por eso el uso de block()
al final de las llamadas a endpoints)
Si este bucle se está ejecutando en el hilo de un Controller, por ejemplo, este hilo se queda bloqueado mientras se resuelven las llamadas pudiendo quedarse congelado si no disponemos de los hilos necesarios.
Implementación Reactiva (no bloqueante)
La implementación reactiva (y funcional) va a hacer uso de los objetos Mono
y Flux
(si usas reactor) y otros, los cuales en realidad NO ejecutan en ese momento la lógica que le indicamos, sino que las van “encadenando” y ejecutando en hilos diferentes coordinados por la implementación reactiva que usemos
Mono<ArrayList<UpdateStock>> reactive() {
return Flux
.generate(() -> 0, (offset, emitter) -> {
if (offset < maxArticles) {
emitter.next(offset);
} else {
emitter.complete();
}
return offset + batchSize;
})
.concatMap(page -> ecommerceClient.get((Integer) page))
.map(ArticuloPage::articulos)
.filter(items -> !items.isEmpty())
.flatMap(items -> stockClient.get(items.stream().map(Articulo::sku).collect(Collectors.joining()))
.map(stocks -> stocks.stocks()
.stream()
.map(s -> new UpdateStock(s.sku(), s.cantidad()))
.toList()))
.concatMap(ecommerceClient::update)
.reduce(new ArrayList<>(), (prev, items) -> {
prev.addAll(items);
return prev;
});
}
Enter fullscreen mode Exit fullscreen mode
- INFO
-
No he conseguido resolver de forma satisfactoria el hacer una primera llamada que me de el numero de páginas que hay en eCommerce por lo que voy a hacer un número de llamadas “fijas” y luego filtrar las respuestas que no contengan datos.
Esta implementación:
-
genera un Flux de números entre 0 y N
-
cada
page
es mapeado a una llamada asíncrona al eCommerce -
De lo que devuelva (en un futuro) el eCommerce, nos quedamos con la lista de artículos
-
filtramos aquellas respuestas que vienen vacía
-
Cada lista de artículos en eCommerce es convertida a una llamada al servicio de stock
-
Cada lista de stocks que nos devolverá (en un futuro) el stock la usaremos para actualizar eCommerce
-
Por último, juntaremos todas las respuestas (futuras) de eCommerce en un List para devolver la lista de articulos actualizados
Repositorio
He creado un repositorio con una aplicación Micronaut por si quieres jugar con estos servicios y las implementaciones.
https://github.com/jagedn/reactor-demo
Para simplificar la implementación los dos servicios (stock y ecommerce) se encuentran en la misma aplicación, pero de forma independiente. Cada uno se accede por unos endpoints diferentes y mantienen una “base de datos” independiente (la base de datos es un simple List en memoria)
Una vez clonado el repositorio podremos ejecutar la aplicacion:
./gradlew run
y acceder a los servicios
http localhost:8080/ecommerce/articulos
, para ver la situacion actual de ecommerce
http localhost:8080/stock/
, para ver la situacion actual del stock
http -v localhost:8080/stock/update [0][sku]=1 [0][cantidad]=1
para actualizar la cantidad del artículo “1” en la base de datos del stock
En este punto, eCommerce tiene una cantidad diferente que Stock así que ejecutaremos la lógica de negocio que las “sincroniza”
http localhost:8080/ejemplo/block
o
http localhost:8080/ejemplo/reactive
Si todo ha ido bien, la cantidad disponible en stock habrá sido actualizada en eCommerce
暂无评论内容