본문 바로가기

Data Engineering/Airflow

Airflow task group, xcom, trigger rule

TaskGroup으로 subdags 만들기parallel_dag.py

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag import SubDagOperator

from subdag.subdag_parallel_dag import subdag_parallel_dag
from airflow.utils.task_group import TaskGroup

from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1),
}

with DAG(dag_id='parallel_dag', schedule_interval='@daily', 
    default_args=default_args, catchup = False) as dag:
    
    # Tasks dynamically generated 
    task_1 = BashOperator(
        task_id='task_1', 
        bash_command='sleep 3'
    )
    
    with TaskGroup('processing_tasks') as processing_tasks: 
        task_2 = BashOperator(
            task_id='task_2', 
            bash_command='sleep 3'
        )
				# spark_tasks.task_3를 내부적으로 가지므로 중복가능하다.
        with TaskGroup('spark_tasks') as spark_tasks:
            task_3 = BashOperator(
                task_id='task_3', 
                bash_command='sleep 3'
            )
        
        with TaskGroup('flink_tasks') as flink_tasks:
            task_3 = BashOperator(
                task_id='task_3', 
                bash_command='sleep 3'
            ) 
            
    task_4 = BashOperator(
        task_id='task_4', 
        bash_command='sleep 3'
    )

    task_1 >> processing_tasks >> task_4

task 간에 data 공유 방법

  1. 외부 툴에서 push pull
  2. airflow 자체 db를 이용하는 xcom

xcom을 사용하여 push 및 pull → xcomdag.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.task_group import TaskGroup

from random import uniform
from datetime import datetime

default_args = {
    'start_date': datetime(2020, 1, 1)
}

def _training_model(ti):
    accuracy = uniform(0.1, 10.0)
    print(f'model\\'s accuracy: {accuracy}')
    ti.xcom_push(key='model_accuracy', value=accuracy)

def _choose_best_model(ti):
    print('choose best model')
    accuracies = ti.xcom_pull(key='model_accuracy', task_ids = [
        'processing_tasks.training_model_a',
        'processing_tasks.training_model_b',
        'processing_tasks.training_model_c'
    ])

with DAG('xcom_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:

    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3'
			#	do_xcom_push = False
    )

 
  • 기본적으로 몇 Operator는 자동적으로 xcom이 생성되고 옵션으로 끌 수 있다.
    • do_xcom_push = False

BranchPythonOperator & DummyOperator를 이용한 조건 Dag → xcomdag.py

from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator

def _choose_best_model(ti):
    print('choose best model')
    accuracies = ti.xcom_pull(key='model_accuracy', task_ids = [
        'processing_tasks.training_model_a',
        'processing_tasks.training_model_b',
        'processing_tasks.training_model_c'
    ])

    for accuracy in accuracies:
        if accuracy > 5:
            return 'accurate' 
    return 'inaccurate'  # ['accurate', 'inaccurate'] 로 multi tasks 가능

# accurate inaccurate는 조건에 의해 스킵되거나 실행된다.
downloading_data >> processing_tasks >> choose_model
    choose_model >>  [accurate, inaccurate]

trigger rule을 이용한 storing → xcomdag.py

storing = DummyOperator(
        task_id='storing'
				trigger_rule='none_failed_or_skipped'
    )

    downloading_data >> processing_tasks >> choose_model
    choose_model >>  [accurate, inaccurate] >> storing

# storing에 트리거룰이 없다면, 
# [accurate, inaccurate] 이 중 하나는 스킵되면서 storing도 스킵이된다.

트리거룰

  • 설명은 [a, b] >> c
  • 코드는 [1,2] >>3, 전부 3번이 성공하도록 짜여있다.
  • exit 0 → success , exit 1 → failed
  • all_success
    • a, b 성공 c 진행
    • a, b 중 하나라도 실패하면 c는 upstream_failed status
  • with DAG(dag_id='trigger_rule', schedule_interval='@daily', default_args=default_args, catchup = False) as dag: task_1 = BashOperator( task_id='task_1' bash_command='exit 0' do_xcom_push=False ) task_2 = BashOperator( task_id='task_2' bash_command='exit 0' do_xcom_push=False ) task_3 = BashOperator( task_id='task_3' bash_command='exit 0' do_xcom_push=False )
  • all_failed
    • a,b 실패 c 진행
    • a,b 둘 중 하나라도 성공하면 c는 skip
  • task_1 = BashOperator( task_id='task_1' bash_command='exit 1' ) task_2 = BashOperator( task_id='task_2' bash_command='exit 1' ) task_3 = BashOperator( task_id='task_3' bash_command='exit 0' trigger_rule='all_failed' )
  • all_done
    • a,b 상태와 상관없이 trigger되면 c는 실행된다.
  • task_1 = BashOperator( task_id='task_1' bash_command='exit 1' ) task_2 = BashOperator( task_id='task_2' bash_command='exit 0' ) task_3 = BashOperator( task_id='task_3' bash_command='exit 0' trigger_rule='all_done' )
  • one_success
    • a, b 둘 중 하나라도 성공하면 c는 실행
  • one_failed
    • a, b 둘 중 하나라도 실패하면 c는 실행
  • task_1 = BashOperator( task_id='task_1' bash_command='exit 1' ) task_2 = BashOperator( task_id='task_2' bash_command='sleep30' ) task_3 = BashOperator( task_id='task_3' bash_command='exit 0' trigger_rule='one_failed' ) # 2가 동작 중이더라도 1이 실패했기에 3번은 동작한다.
  • none_failed
    • a,b 둘 중 failed 상태만 아니라면 성공하거나 스킵하면 c 트리거
  • none_failed or skipped
    • none_failed는 a,b 상태가 통일되야되지만 해당 status는 상관없다. 하지만 하나라도 성공해야한다. ex) a성공 b스킵 되었을 때 filed가 없고 하나 이상이 통과되었기에 c는 트리거될 조건 만족한다.

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow Sensor Operator  (0) 2022.03.07
Airflow Docker로 구성해보기  (0) 2022.03.07
Airflow Scale up  (0) 2022.03.07
Airflow Pipeline Build 해보기(feat Operator & Scheduling)  (0) 2022.03.07
Airflow 설치와 구성요소  (0) 2022.03.07