En esta serie de post acerca de programación reactiva voy a ir contando los pasos que estoy dando para ir practicando con esta forma de programación y más en concreto su uso en aplicaciones HTTP con Micronaut
En el post anterior nos centramos en la parte servidora y vimos cómo crear un Single
(y devolverlo como retorno de la función) para así delegar en el subsistema la ejecución de nuestro código de forma asíncrona.
En este post nos vamos a centrar en la parte consumidora viendo cómo podemos usar el modelo reactivo en la construcción de un API que consuma endpoints remotos. Para ello vamos a usar los siguientes recursos públicos:
http://api.open-notify.org/astros.json que devuelve un detalle de los astronautas que hay ahora mismo en el espacio
http://api.open-notify.org/iss-now.json que devuelve la posición de la estación ISS en el momento de la petición
Modelos
Vamos a construir el modelo que representa a cada uno de los endpoints
public class Astronaut {
public String name;
public String craft;
@Override
public String toString(){
return name+" "+craft;
}
}
public class Astros {
public String message;
public int number;
public List<Astronaut>people;
}
public class IssPosition {
public double latitude;
public double longitude;
public String toString(){
return "lat:"+latitude+" log:"+longitude;
}
}
public class Iss {
public IssPosition iss_position;
public String toString(){
return "current position "+iss_position;
}
}
Enter fullscreen mode Exit fullscreen mode
Así como el modelo que va a devolver nuestra API, que no es más que la unión de ambos
public class Nasa {
public Astros astros;
public Iss iss;
public Nasa astros(Astros astros){
this.astros=astros;
return this;
}
public Nasa iss(Iss iss){
this.iss=iss;
return this;
}
}
Enter fullscreen mode Exit fullscreen mode
ClientS
Por otra parte vamos a definir nuestros interfaces Client Micronaut que nos permiten recuperar dichos modelos
@Client("http://api.open-notify.org/")
public interface AstrosClient {
@Get("/astros.json")
Single<Astros>getAstros();
}
@Client("http://api.open-notify.org/")
public interface IssClient {
@Get("iss-now.json")
Single<Iss> getIss();
}
Enter fullscreen mode Exit fullscreen mode
Service
Para recuperar ambos modelos (ISS y Astros) y devolverlos como uno sólo (Nasa) vamos a crear un servicio NasaService
el cual va a ofrecer 3 formas diferentes de hacerlo:
block
, primero va a llamar a uno de los endpoints de forma bloqueante y después al otro de la misma forma
chain
, primero va a llamar a uno de lso endpoints de forma reactiva y cuando se complete la llamada se llamará al segundo endpoint de la misma forma
zip
, vamos a llamar a ambos endpoints a la vez de forma reactiva y cuando se completen ambos devolveremos el resultado
@Singleton
public class NasaService {
@Inject
IssClient issClient;
@Inject
AstrosClient astrosClient;
public Single<Nasa> blockingGet() {
...
}
public Single<Nasa>chain() {
...
}
public Single<Nasa>zip(){
...
}
}
Enter fullscreen mode Exit fullscreen mode
Blocking
public Single<Nasa> blockingGet() {
return Single.create(emitter -> {
Nasa nasa = new Nasa();
nasa.astros = astrosClient.getAstros().blockingGet();
nasa.iss = issClient.getIss().blockingGet();
emitter.onSuccess(nasa);
});
}
Enter fullscreen mode Exit fullscreen mode
En este modo, el servicio construye un objeto Nasa de respuesta y lo va completando según se van obteniendo las respuestas de los endpoints remotos. Una vez terminados se emite el objeto resultante
Como podemos intuir, este método será el más lento y propenso a errores pues en el mejor de los casos el total del tiempo necesario será la suma de ambas llamadas. Además, al esperar la respuesta de ambas llamadas estamos bloqueando el hilo por lo que otras solicitudes se quedarán encoladas.
Chain
BiFunction<Nasa, SingleEmitter<Nasa>, Disposable> issFunc = (nasa, emitter)->
issClient.getIss().subscribe(
iss ->
emitter.onSuccess(nasa.iss(iss))
,
err->
emitter.onError(err)
);
BiFunction<Nasa, SingleEmitter<Nasa>, Disposable> astrosFunc = (nasa, emitter)->
astrosClient.getAstros().subscribe(
astros ->
issFunc.apply(nasa.astros(astros), emitter)
,
err->
emitter.onError(err)
);
public Single<Nasa>chain() {
return Single.create(
emitter -> astrosFunc.apply(new Nasa(), emitter)
);
}
Enter fullscreen mode Exit fullscreen mode
- NOTE
-
Para ayudar en la legibilidad de este método vamos a usar
BiFunction
aunque podríamos seguir usando clases anónimas lambdas
Básicamente definimos un método chain
que crea un Single
(como en caso block) y lo que hace es aplicar una funciónastrosFunc
. Esta función simplemente se subscribe a la llamada asíncrona que recupera los astronautas del espacio y cuando se complete encadena a su vez otra subscripción al endpoint de la ISS. Cuando este segundo endpoint se completa se devuelve el resultado al emitter
“original”
La idea es muy parecida al método anterior (aunque el código parece un poco más complicado) en el sentido de que vamos a llamar a dos funciones de forma seguida, por lo que el tiempo total será aproximadamente igual. Sin embargo, al no bloquear el hilo podremos aceptar más solicitudes.
Zip
public Single<Nasa>zip(){
return Single.zip(astrosClient.getAstros(), issClient.getIss(),
(astros, iss) ->
new Nasa().astros(astros).iss(iss));
}
Enter fullscreen mode Exit fullscreen mode
Mediante zip
un Single
es capaz de poder llamar de forma simultánea a un número de observables y ejecutar una función con el resultado de todos ellos.
- INFO
-
La lista de observables a ejecutar puede ser “fija”, como en este caso, o una lista de ellos. En caso de fija podemos especificar hasta 10 (creo) observables y en la función a ejecutar recibiremos los resultados de forma explícita. En el caso de una lista, lo que obtendremos en la función es un array de Object y nos toca a nosotros interpretarlos
Controller
Básicamente el controller es un simple punto de entrada para cada tipo que llama directamente al servicio
@Controller("/nasa")
public class NasaController {
@Inject
NasaService service;
@Get("/blocking")
Single<Nasa>blockingGet() {
return service.blockingGet();
}
@Get("/chain")
Single<Nasa>chain() {
return service.chain();
}
@Get("/zip")
Single<Nasa>zip(){
return service.zip();
}
}
Enter fullscreen mode Exit fullscreen mode
JMeter
Con estos 3 endpoints he preparado una prueba de carga usando JMeter y he ejecutado 100 peticiones simultáneas contra cada uno.
Blocking
Obtenemos una tasa de error muy elevada
Chain
Conseguimos no bloquear el hilo de llamada reduciendo los errores a cero
Zip
No sólo conseguimos reducir los errores a cero sino que la diferencia de tiempo respecto de chain es significativa (casi la mitad)
Resumen
Por el lado del consumidor vemos que utilizar programación reactiva nos ayuda a optimizar los recursos y mejorar los tiempos de respuesta
暂无评论内容