Saya ingin membuat sebuah bersyarat tugas di aliran Udara seperti dijelaskan dalam skema di bawah ini. Skenario yang diharapkan adalah sebagai berikut:
Semua tugas di atas adalah SSHExecuteOperator. I'm menebak saya harus menggunakan ShortCircuitOperator dan / atau XCom untuk mengelola kondisi ini, tapi saya tidak jelas tentang bagaimana untuk melaksanakan itu. Bisa anda jelaskan solusinya?
Aliran udara yang memiliki BranchPythonOperator yang dapat digunakan untuk menyatakan percabangan ketergantungan yang lebih langsung.
The docs menjelaskan penggunaannya:
BranchPythonOperator jauh seperti PythonOperator kecuali bahwa ia mengharapkan python_callable yang kembali task_id. Yang task_id kembali diikuti, dan semua jalan-jalan lain yang dilewati. Yang task_id yang dikembalikan oleh fungsi Python telah menjadi referensi tugas ini diatas hilir dari BranchPythonOperator tugas.
...
Jika anda ingin melewati beberapa tugas, perlu diingat bahwa anda tidak dapat memiliki kosong di jalan, jika demikian membuat dummy tugas.
def dummy_test():
return 'branch_a'
A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)
branch_task = BranchPythonOperator(
task_id='branching',
python_callable=dummy_test,
dag=dag,
)
branch_task >> A_task
branch_task >> B_task
Jika anda're memasang aliran Udara versi >=1.10.3, anda juga dapat kembali daftar tugas id, yang memungkinkan anda untuk melewatkan beberapa hilir jalan di satu Operator dan don't menggunakan dummy tugas sebelum bergabung.
Anda harus menggunakan aliran udara yang memicu aturan
Semua operator memiliki trigger_rule argumen yang mendefinisikan aturan yang dihasilkan tugas terpicu.
Pemicu aturan kemungkinan:
ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
Berikut ini adalah ide untuk memecahkan masalah anda:
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook
sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
task_1 = SSHExecuteOperator(
task_id='task_1',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2 = SSHExecuteOperator(
task_id='conditional_task',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2a = SSHExecuteOperator(
task_id='task_2a',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2b = SSHExecuteOperator(
task_id='task_2b',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_FAILED,
ssh_hook=sshHook,
dag=dag)
task_3 = SSHExecuteOperator(
task_id='task_3',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ONE_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)