11/24/2023

Example Airflow DAG

Two departments, two processes

This would be the DAG code and its representation in the Airflow UI:

    from datetime import timedelta
    from airflow.operators.dummy import DummyOperator

    calculate_revenue = DummyOperator(task_id='operations_calculate_revenue', ...)
    income_bookkeep = DummyOperator(task_id='finances_income_bookkeep', ...)
    validate_income = DummyOperator(task_id='finances_validate_income', ...)
    calculate_expenses = DummyOperator(task_id='operations_calculate_expenses', ...)
    outcome_bookkeep = DummyOperator(task_id='finances_outcome_bookkeep', ...)
    operations_a_report = DummyOperator(task_id='operations_a_report', ...)
    finance_a_report = DummyOperator(task_id='finance_a_report', ...)

    calculate_revenue >> income_bookkeep >> validate_income

Here we can see that we have, in fact, two processes with dependencies in the same DAG.

The objective of this exercise is to divide this DAG in two while maintaining the dependencies. To do this we have to follow a specific strategy: in this case, we have selected the operations DAG as the main one and the finance DAG as the secondary one, because the finance DAG depends first on the operational tasks.

To develop the solution, we are going to use two Airflow operators: TriggerDagRunOperator, which launches the execution of an external DAG, and ExternalTaskSensor, which waits for a task of an external DAG.

Operations Process

    # operations_a.py
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.sensors.external_task import ExternalTaskSensor

    calculate_revenue = DummyOperator(task_id='calculate_revenue', ...)
    trigger_finances_a = TriggerDagRunOperator(task_id='trigger_finances_a', ...)
    calculate_expenses = DummyOperator(task_id='calculate_expenses', ...)
    wait_finances_a_expenses_bookkept = ExternalTaskSensor(
        task_id='wait_finances_a_outcome_bookeep', ...)

    calculate_revenue >> trigger_finances_a >> calculate_expenses
    calculate_expenses >> wait_finances_a_expenses_bookkept >> operations_a_report

Here, we can observe that the operators in charge of launching an external DAG are shown in pink, and the external task sensor operators in dark blue.

The finance DAG keeps its own tasks, for example:

    income_bookkeep = DummyOperator(task_id='income_bookkeep', ...)
    validate_income = DummyOperator(task_id='validate_income', ...)
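To make the reasoning behind the split easier to check, the sketch below models the tasks of both DAGs as a plain dependency graph using only the Python standard library (no Airflow required). The two operations chains come from the text. The DAG names operations_a and finances_a, the finance-side chain, and the choice of outcome_bookkeep as the task the sensor waits for are assumptions made for illustration, and runs_before is a hypothetical helper.

```python
from graphlib import TopologicalSorter

OPS = "operations_a"  # assumed dag_id of the main (operations) DAG
FIN = "finances_a"    # assumed dag_id of the secondary (finance) DAG

# Map each task to the set of tasks that must finish before it.
predecessors = {
    # Chains taken from the operations DAG in the text.
    f"{OPS}.trigger_finances_a": {f"{OPS}.calculate_revenue"},
    f"{OPS}.calculate_expenses": {f"{OPS}.trigger_finances_a"},
    f"{OPS}.operations_a_report": {f"{OPS}.wait_finances_a_expenses_bookkept"},
    # ASSUMED chain inside the finance DAG.
    f"{FIN}.validate_income": {f"{FIN}.income_bookkeep"},
    f"{FIN}.outcome_bookkeep": {f"{FIN}.validate_income"},
    # Cross-DAG link 1: the trigger task starts the finance DAG.
    f"{FIN}.income_bookkeep": {f"{OPS}.trigger_finances_a"},
    # Cross-DAG link 2: the sensor is modelled as finishing only after its
    # upstream task and the (ASSUMED) external finance task are both done.
    f"{OPS}.wait_finances_a_expenses_bookkept": {
        f"{OPS}.calculate_expenses",
        f"{FIN}.outcome_bookkeep",
    },
}

# static_order() raises graphlib.CycleError if the split introduced a cycle.
order = list(TopologicalSorter(predecessors).static_order())


def runs_before(first: str, second: str) -> bool:
    """Return True if `first` comes before `second` in the computed order."""
    return order.index(first) < order.index(second)


if __name__ == "__main__":
    for task in order:
        print(task)
```

With this model, a call such as runs_before("operations_a.calculate_revenue", "finances_a.income_bookkeep") confirms that the finance work still starts after the operational task it depends on, even though the two now live in separate DAGs. It only checks ordering constraints. It does not simulate how Airflow schedules or matches DAG runs.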