1. Contexto
Durante o desenvolvimento de uma DAG no Airflow surgiu uma necessidade de criar múltiplas tasks de forma dinâmica, as tasks deveriam ser criadas em tempo de execução e não se sabe previamente a quantidade de tasks.
Um caso prático seria ter um conjunto com n números e esses números devem passar por um processamento. Supondo que esse conjunto de valores tem 3 elementos, portanto seria a necessária a declaração de 3 tasks. Para valores pré-definidos a declaração dessas tasks são simples.
Abaixo um exemplo de código que processa números multiplicando-os por 2:
# importsdef get_values():return [10, 20, 30]def double_value(value):return int(value) * 2# DAG codeget_values_task = PythonOperator(task_id=f"get_values",python_callable=get_values)double_value_1_task = PythonOperator(task_id=f"double_value_1",python_callable=double_value,op_args=["{{ task_instance.xcom_pull('get_values')[0] }}"])double_value_2_task = PythonOperator(task_id=f"double_value_2",python_callable=double_value,op_args=["{{ task_instance.xcom_pull('get_values')[1] }}"])double_value_3_task = PythonOperator(task_id=f"double_value_3",python_callable=double_value,op_args=["{{ task_instance.xcom_pull('get_values')[2] }}"])get_values_task >> [double_value_1_task, double_value_2_task, double_value_3_task]# imports def get_values(): return [10, 20, 30] def double_value(value): return int(value) * 2 # DAG code get_values_task = PythonOperator( task_id=f"get_values", python_callable=get_values ) double_value_1_task = PythonOperator( task_id=f"double_value_1", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[0] }}"] ) double_value_2_task = PythonOperator( task_id=f"double_value_2", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[1] }}"] ) double_value_3_task = PythonOperator( task_id=f"double_value_3", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[2] }}"] ) get_values_task >> [double_value_1_task, double_value_2_task, double_value_3_task]# imports def get_values(): return [10, 20, 30] def double_value(value): return int(value) * 2 # DAG code get_values_task = PythonOperator( task_id=f"get_values", python_callable=get_values ) double_value_1_task = PythonOperator( task_id=f"double_value_1", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[0] }}"] ) double_value_2_task = PythonOperator( task_id=f"double_value_2", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[1] }}"] ) double_value_3_task = PythonOperator( task_id=f"double_value_3", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[2] }}"] ) get_values_task >> [double_value_1_task, double_value_2_task, double_value_3_task]
Enter fullscreen mode Exit fullscreen mode
Entretanto há 2 limitações:
-
A criação de tasks no Airflow é declarativa. Necessariamente se precisa declarar um Operator e definir a ordem de execução dessa task. Se surgir a necessidade de criar tasks que tem a mesma finalidade com valores dos parâmetros diferentes é necessário declarar um Operator individualmente.
É possível a criação das tasks com um loop, entretanto é necessário saber previamente quantas tasks é necessário criar. -
Resgatar os valores via XCOM não são acessáveis fora de uma Task Instance. Esse fato acontece pois os valores via XCOM são serealizados/deserealizados e renderizados em tempo de execução ao inicializar a execução da task, a renderização faz parte da execução da task.
Reutilizando o exemplo acima da função double_value, se os valores necessários para esse processamento estiver na XCOM então só seria possível acessá-los durante a execução de outra task. Criar uma task dentro de uma outra task não é recomendável.
Então como posso criar tasks dinamicamente baseado nos valores de tasks anteriores? No Airflow há um recurso para resolver esse problema: Dynamic Task Mapping.
2. O que é Dynamic Task Mapping?
O Dynamic Task Mapping permite criar um conjuntos de tasks em tempo de execução baseado em parâmetros, sem o autor da DAG saber quantas tasks são necessárias previamente. É similar a definir as tasks num loop, entretanto o scheduler do Airflow usa como base o output da task anterior.
Com esse recurso é possível paralelizar a execução da task, onde a task declarada terá uma lista de subtasks, cada substasks será criada dinamicamente baseado no parâmetro passado.
Esse recurso no Airflow tornou-se disponível a partir da versão 2.3.0.
3. Um exemplo prático
A ideia do Dynamic Task Mapping é definir os parâmetros onde um desses parâmetros será utilizado para a criação das tasks dinâmicas. A sintaxe de declaração do Operator é semelhante a um Operator convencional, haverá a adição de 2 métodos: partial e expand.
- expand
Como o próprio nome diz significa expandir o parâmetro utilizado. Dado a lista do parâmetro definido então sera expandido essa lista. Para cada expansão é criado uma nova task.
É possível definir 1 ou mais parâmetros para expandir. Caso haja mais de 1 parâmetro então será feito o produto cartesiano dos parâmetros.
PythonOperator.partial(...).expand(parameter_1, parameter_2,...)PythonOperator.partial(...).expand(parameter_1, parameter_2,...)PythonOperator.partial(...).expand(parameter_1, parameter_2,...)
Enter fullscreen mode Exit fullscreen mode
- partial
Valores intermediários onde serão fixos para cada expansão dos parâmetros do expand, esses valores não são expandidos.
No partial terá os parâmetros da task como task_id, python_callable, parâmetros definidos e etc.
PythonOperator.partial(task_id="task_id", python_callable=...).expand(...)PythonOperator.partial(task_id="task_id", python_callable=...).expand(...)PythonOperator.partial(task_id="task_id", python_callable=...).expand(...)
Enter fullscreen mode Exit fullscreen mode
Abrindo um parênteses, há uma forma de pegar o output de uma task da seguinte maneira:
# Task arbitratyarbitraty_task = PythonOperator(...)arbitraty_task.output# Task arbitraty arbitraty_task = PythonOperator(...) arbitraty_task.output# Task arbitraty arbitraty_task = PythonOperator(...) arbitraty_task.output
Enter fullscreen mode Exit fullscreen mode
Juntando essas informações e definindo a Dynamic Task Mapping para o primeiro exemplo dado de processa os números multiplicando-os por 2:
double_value_task = PythonOperator.partial(task_id=f"double_value",python_callable=double_value,).expand(op_args=get_values_task.output)double_value_task = PythonOperator.partial( task_id=f"double_value", python_callable=double_value, ).expand(op_args=get_values_task.output)double_value_task = PythonOperator.partial( task_id=f"double_value", python_callable=double_value, ).expand(op_args=get_values_task.output)
Enter fullscreen mode Exit fullscreen mode
Na prática não se torna necessário declarar explicitamente a task para cada valor retornado da task anterior.
Como se interpreta o PythonOperator criado com Dynamic Task Mapping?
Para cada valor retornado da task get_values será invocado a função double_value.
Como o Airflow mostra essa informação na UI?
O grafo mostrado será diferente, onde terá apenas 1 PythonOperator declarado.
Para acessar a informação de cada task criada dinamicamente será necessário clicar na task e acessar o campo Mapped Tasks. O Map Index é criado de 0 até n, onde n é n-ésimo índice que indica a quantidade de elementos retornados da task anterior.
Por fim a DAG descrita no primeiro exemplo, que processa números multiplicando-os por 2, pode ser reduzida para:
# importsdef get_values():return [[10], [20], [30]]def double_value(value):return int(value) * 2# DAG codeget_values_task = PythonOperator(task_id=f"get_values",python_callable=get_values)double_value_task = PythonOperator.partial(task_id=f"double_value",python_callable=double_value,).expand(op_args=get_values_task.output)get_values_task >> double_value_task# imports def get_values(): return [[10], [20], [30]] def double_value(value): return int(value) * 2 # DAG code get_values_task = PythonOperator( task_id=f"get_values", python_callable=get_values ) double_value_task = PythonOperator.partial( task_id=f"double_value", python_callable=double_value, ).expand(op_args=get_values_task.output) get_values_task >> double_value_task# imports def get_values(): return [[10], [20], [30]] def double_value(value): return int(value) * 2 # DAG code get_values_task = PythonOperator( task_id=f"get_values", python_callable=get_values ) double_value_task = PythonOperator.partial( task_id=f"double_value", python_callable=double_value, ).expand(op_args=get_values_task.output) get_values_task >> double_value_task
Enter fullscreen mode Exit fullscreen mode
Em resumo o Dynamic Task Mapping pode ser utilizado para criar tasks de forma dinâmica e paralelizar processamentos. Abaixo é possível acessar a documentação para entender com mais detalhes.
4. Referências
[1] Airflow. Dynamic Task Mapping. Disponível em: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html. Acessado em: 09/12/2024.
暂无评论内容