Dag 시작하기
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"email_on_failure": False,
"email_on_retry": False,
"email": "admin@localhost.com", # if failure or retry, notify to email
"retries": 1, # limits retry
"retry_delay": timedelta(minutes=5)
}
# catch up 옵션이 true일 경우, start_date ~ 현재까지 실행 안됬던 모든 대그가 실행된다.
with DAG("forex_data_pipeline", start_date=datetime(2021, 1 ,1),
schedule_interval="@daily", default_args=default_args, catchup=False) as dag:
None
Operator
Operator는 task이다.
Operator의 3가지 유형
Action
- 로직을 가지고 실행된다.
Transfer
- 단순하게 데이터의 이동 등 흐름을 만든다.
Sensor
- 신뢰성있는 작업을 위해서, 다음 작업을 하기 위해 대기하는 동작을 한다.
provider 모듈
airflow.providers.{user_setting}
https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html
데이터파이프라인예시
- using http sensor → check availability api
# forex api를 이용할 것이기에, http센서(~.http.sensors.http)를 사용
# → 60초마다 확인이 가능하다.
is_forex_rates_available = HttpSensor(
task_id="is_forex_rates_available",
http_conn_id="forex_api", # create custom api
endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
response_check=lambda response: "rates" in response.text,
poke_interval=5, # 5초마다 api가 이용가능한지 체크한다.
timeout=20 # 응답이 없으면 20초 후에 자동으로 타임아웃한다.
)
# http_conn_id="forex_api"
# 이 부분은 따로 connection을 생성해준다.
- using file sensor → check availability file having
# 미리 웹서버에서 forex_path라는 conn_id를 생성해둔다.
# extras : {"path":"/opt/airflow/dags/files"}
is_forex_currencies_file_available = FileSensor(
task_id="is_forex_currencies_file_available",
fs_conn_id="forex_path", # 미리 지정해둔 곳
filepath="forex_currencies.csv", # 파일 경로
poke_interval=5,
timeout=20
)
Dag 시작하기
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"email_on_failure": False,
"email_on_retry": False,
"email": "admin@localhost.com", # if failure or retry, notify to email
"retries": 1, # limits retry
"retry_delay": timedelta(minutes=5)
}
# catch up 옵션이 true일 경우, start_date ~ 현재까지 실행 안됬던 모든 대그가 실행된다.
with DAG("forex_data_pipeline", start_date=datetime(2021, 1 ,1),
schedule_interval="@daily", default_args=default_args, catchup=False) as dag:
None
Operator
Operator는 task이다.
Operator의 3가지 유형
Action
- 로직을 가지고 실행된다.
Transfer
- 단순하게 데이터의 이동 등 흐름을 만든다.
Sensor
- 신뢰성있는 작업을 위해서, 다음 작업을 하기 위해 대기하는 동작을 한다.
provider 모듈
airflow.providers.{user_setting}
https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html
데이터파이프라인예시
- using http sensor → check availability api
# forex api를 이용할 것이기에, http센서(~.http.sensors.http)를 사용
# → 60초마다 확인이 가능하다.
is_forex_rates_available = HttpSensor(
task_id="is_forex_rates_available",
http_conn_id="forex_api", # create custom api
endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
response_check=lambda response: "rates" in response.text,
poke_interval=5, # 5초마다 api가 이용가능한지 체크한다.
timeout=20 # 응답이 없으면 20초 후에 자동으로 타임아웃한다.
)
# http_conn_id="forex_api"
# 이 부분은 따로 connection을 생성해준다.
- using file sensor → check availability file having
# 미리 웹서버에서 forex_path라는 conn_id를 생성해둔다.
# extras : {"path":"/opt/airflow/dags/files"}
is_forex_currencies_file_available = FileSensor(
task_id="is_forex_currencies_file_available",
fs_conn_id="forex_path", # 미리 지정해둔 곳
filepath="forex_currencies.csv", # 파일 경로
poke_interval=5,
timeout=20
)
'Data Engineering > Airflow' 카테고리의 다른 글
Airflow Send email & slack notification (0) | 2022.03.07 |
---|---|
Airflow save(hdfs) create(hive-table) processing(spark) (0) | 2022.03.07 |
Airflow Docker로 구성해보기 (0) | 2022.03.07 |
Airflow task group, xcom, trigger rule (0) | 2022.03.07 |
Airflow Scale up (0) | 2022.03.07 |