Dynamic Task Mapping (Airflow)

1. Contexto

Durante o desenvolvimento de uma DAG no Airflow surgiu uma necessidade de criar múltiplas tasks de forma dinâmica, as tasks deveriam ser criadas em tempo de execução e não se sabe previamente a quantidade de tasks.

Um caso prático seria ter um conjunto com n números e esses números devem passar por um processamento. Supondo que esse conjunto de valores tem 3 elementos, portanto seria a necessária a declaração de 3 tasks. Para valores pré-definidos a declaração dessas tasks são simples.

Abaixo um exemplo de código que processa números multiplicando-os por 2:

# imports
def get_values():
return [10, 20, 30]
def double_value(value):
return int(value) * 2
# DAG code
get_values_task = PythonOperator(
task_id=f"get_values",
python_callable=get_values
)
double_value_1_task = PythonOperator(
task_id=f"double_value_1",
python_callable=double_value,
op_args=["{{ task_instance.xcom_pull('get_values')[0] }}"]
)
double_value_2_task = PythonOperator(
task_id=f"double_value_2",
python_callable=double_value,
op_args=["{{ task_instance.xcom_pull('get_values')[1] }}"]
)
double_value_3_task = PythonOperator(
task_id=f"double_value_3",
python_callable=double_value,
op_args=["{{ task_instance.xcom_pull('get_values')[2] }}"]
)
get_values_task >> [double_value_1_task, double_value_2_task, double_value_3_task]
# imports

def get_values():
    return [10, 20, 30]

def double_value(value):
    return int(value) * 2

# DAG code
get_values_task = PythonOperator(
    task_id=f"get_values",
    python_callable=get_values
)

double_value_1_task = PythonOperator(
    task_id=f"double_value_1",
    python_callable=double_value,
    op_args=["{{ task_instance.xcom_pull('get_values')[0] }}"]
)

double_value_2_task = PythonOperator(
    task_id=f"double_value_2",
    python_callable=double_value,
    op_args=["{{ task_instance.xcom_pull('get_values')[1] }}"]
)
double_value_3_task = PythonOperator(
    task_id=f"double_value_3",
    python_callable=double_value,
    op_args=["{{ task_instance.xcom_pull('get_values')[2] }}"]
)

get_values_task >> [double_value_1_task, double_value_2_task, double_value_3_task]
# imports def get_values(): return [10, 20, 30] def double_value(value): return int(value) * 2 # DAG code get_values_task = PythonOperator( task_id=f"get_values", python_callable=get_values ) double_value_1_task = PythonOperator( task_id=f"double_value_1", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[0] }}"] ) double_value_2_task = PythonOperator( task_id=f"double_value_2", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[1] }}"] ) double_value_3_task = PythonOperator( task_id=f"double_value_3", python_callable=double_value, op_args=["{{ task_instance.xcom_pull('get_values')[2] }}"] ) get_values_task >> [double_value_1_task, double_value_2_task, double_value_3_task]

Enter fullscreen mode Exit fullscreen mode

Entretanto há 2 limitações:

  1. A criação de tasks no Airflow é declarativa. Necessariamente se precisa declarar um Operator e definir a ordem de execução dessa task. Se surgir a necessidade de criar tasks que tem a mesma finalidade com valores dos parâmetros diferentes é necessário declarar um Operator individualmente.
    É possível a criação das tasks com um loop, entretanto é necessário saber previamente quantas tasks é necessário criar.

  2. Resgatar os valores via XCOM não são acessáveis fora de uma Task Instance. Esse fato acontece pois os valores via XCOM são serealizados/deserealizados e renderizados em tempo de execução ao inicializar a execução da task, a renderização faz parte da execução da task.
    Reutilizando o exemplo acima da função double_value, se os valores necessários para esse processamento estiver na XCOM então só seria possível acessá-los durante a execução de outra task. Criar uma task dentro de uma outra task não é recomendável.

Então como posso criar tasks dinamicamente baseado nos valores de tasks anteriores? No Airflow há um recurso para resolver esse problema: Dynamic Task Mapping.

2. O que é Dynamic Task Mapping?

O Dynamic Task Mapping permite criar um conjuntos de tasks em tempo de execução baseado em parâmetros, sem o autor da DAG saber quantas tasks são necessárias previamente. É similar a definir as tasks num loop, entretanto o scheduler do Airflow usa como base o output da task anterior.

Com esse recurso é possível paralelizar a execução da task, onde a task declarada terá uma lista de subtasks, cada substasks será criada dinamicamente baseado no parâmetro passado.

Esse recurso no Airflow tornou-se disponível a partir da versão 2.3.0.

3. Um exemplo prático

A ideia do Dynamic Task Mapping é definir os parâmetros onde um desses parâmetros será utilizado para a criação das tasks dinâmicas. A sintaxe de declaração do Operator é semelhante a um Operator convencional, haverá a adição de 2 métodos: partial e expand.

  • expand

Como o próprio nome diz significa expandir o parâmetro utilizado. Dado a lista do parâmetro definido então sera expandido essa lista. Para cada expansão é criado uma nova task.

É possível definir 1 ou mais parâmetros para expandir. Caso haja mais de 1 parâmetro então será feito o produto cartesiano dos parâmetros.

PythonOperator.partial(...).expand(parameter_1, parameter_2,...)
PythonOperator.partial(...).expand(parameter_1, parameter_2,...)
PythonOperator.partial(...).expand(parameter_1, parameter_2,...)

Enter fullscreen mode Exit fullscreen mode

  • partial

Valores intermediários onde serão fixos para cada expansão dos parâmetros do expand, esses valores não são expandidos.

No partial terá os parâmetros da task como task_id, python_callable, parâmetros definidos e etc.

PythonOperator.partial(task_id="task_id", python_callable=...).expand(...)
PythonOperator.partial(task_id="task_id", python_callable=...).expand(...)
PythonOperator.partial(task_id="task_id", python_callable=...).expand(...)

Enter fullscreen mode Exit fullscreen mode

Abrindo um parênteses, há uma forma de pegar o output de uma task da seguinte maneira:

# Task arbitraty
arbitraty_task = PythonOperator(...)
arbitraty_task.output
# Task arbitraty
arbitraty_task = PythonOperator(...)
arbitraty_task.output
# Task arbitraty arbitraty_task = PythonOperator(...) arbitraty_task.output

Enter fullscreen mode Exit fullscreen mode

Juntando essas informações e definindo a Dynamic Task Mapping para o primeiro exemplo dado de processa os números multiplicando-os por 2:

double_value_task = PythonOperator.partial(
task_id=f"double_value",
python_callable=double_value,
).expand(op_args=get_values_task.output)
double_value_task = PythonOperator.partial(
    task_id=f"double_value",
    python_callable=double_value,
).expand(op_args=get_values_task.output)
double_value_task = PythonOperator.partial( task_id=f"double_value", python_callable=double_value, ).expand(op_args=get_values_task.output)

Enter fullscreen mode Exit fullscreen mode

Na prática não se torna necessário declarar explicitamente a task para cada valor retornado da task anterior.

Como se interpreta o PythonOperator criado com Dynamic Task Mapping?

Para cada valor retornado da task get_values será invocado a função double_value.

Como o Airflow mostra essa informação na UI?

O grafo mostrado será diferente, onde terá apenas 1 PythonOperator declarado.

Para acessar a informação de cada task criada dinamicamente será necessário clicar na task e acessar o campo Mapped Tasks. O Map Index é criado de 0 até n, onde n é n-ésimo índice que indica a quantidade de elementos retornados da task anterior.

Por fim a DAG descrita no primeiro exemplo, que processa números multiplicando-os por 2, pode ser reduzida para:

# imports
def get_values():
return [[10], [20], [30]]
def double_value(value):
return int(value) * 2
# DAG code
get_values_task = PythonOperator(
task_id=f"get_values",
python_callable=get_values
)
double_value_task = PythonOperator.partial(
task_id=f"double_value",
python_callable=double_value,
).expand(op_args=get_values_task.output)
get_values_task >> double_value_task
# imports

def get_values():
    return [[10], [20], [30]]

def double_value(value):
    return int(value) * 2

# DAG code
get_values_task = PythonOperator(
    task_id=f"get_values",
    python_callable=get_values
)

double_value_task = PythonOperator.partial(
    task_id=f"double_value",
    python_callable=double_value,
).expand(op_args=get_values_task.output)

get_values_task >> double_value_task
# imports def get_values(): return [[10], [20], [30]] def double_value(value): return int(value) * 2 # DAG code get_values_task = PythonOperator( task_id=f"get_values", python_callable=get_values ) double_value_task = PythonOperator.partial( task_id=f"double_value", python_callable=double_value, ).expand(op_args=get_values_task.output) get_values_task >> double_value_task

Enter fullscreen mode Exit fullscreen mode

Em resumo o Dynamic Task Mapping pode ser utilizado para criar tasks de forma dinâmica e paralelizar processamentos. Abaixo é possível acessar a documentação para entender com mais detalhes.

4. Referências

[1] Airflow. Dynamic Task Mapping. Disponível em: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html. Acessado em: 09/12/2024.

原文链接:Dynamic Task Mapping (Airflow)

© 版权声明
THE END
喜欢就支持一下吧
点赞12 分享
Fight for the things you love no matter what you may face, it will be worth it.
不管你面对的是什么,为你所爱的而奋斗都会是值得的
评论 抢沙发

请登录后发表评论

    暂无评论内容