본문 바로가기

Data Engineering

(16)
Airflow Send email & slack notification email ---**airflow.cfg---** [smtp] # If you want airflow to send emails on retries, failure, and you want to use # the airflow.utils.email.send_email_smtp function, you have to configure an # smtp server here smtp_host = smtp.gmail.com smtp_starttls = True smtp_ssl = False # Example: smtp_user = airflow smtp_user = # Example: smtp_password = airflow smtp_password = smtp_port = 587 smtp_mail_from..
Airflow save(hdfs) create(hive-table) processing(spark) Download forex rates with Python import csv import requests import json from airflow.operators.python_operator import PythonOperator def download_rates(): BASE_URL = "" ENDPOINTS = { 'USD': 'api_forex_exchange_usd.json', 'EUR': 'api_forex_exchange_eur.json' } with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies: reader = csv.DictReader(forex_currencies, delimiter=';') fo..
Airflow Sensor Operator Dag 시작하기 from airflow import DAG from datetime import datetime, timedelta default_args = { "owner": "airflow", "email_on_failure": False, "email_on_retry": False, "email": "admin@localhost.com", # if failure or retry, notify to email "retries": 1, # limits retry "retry_delay": timedelta(minutes=5) } # catch up 옵션이 true일 경우, start_date ~ 현재까지 실행 안됬던 모든 대그가 실행된다. with DAG("forex_data_pipeline", st..
Airflow Docker로 구성해보기 Docker 작동방식 docker file → docker image → docker run → docker container(application) Docker image는 Docker hub에서 가져온다. Docker run을 통해 격리된 환경에서 실행한다. 격리된 환경은 같은 운영체제를 사용하기에 가상머신에 비해 컴팩트하다. airflow와 Docker airflow의 컴포넌트의 분리 웹서버 스케줄러 데이터베이스 각 컴포넌트를 분리하여 사용하면 하나의 구성요소가 에러나도 다른 컴포넌트에 영향을 안주기에 효율적이다. Docker compose 각 컴포넌트를 실행시키지 않고, compose 명령어를 통해 한번에 할수있다. 이 때 각 컨테이터는 같은 네트워크를 가지고 서로 통신할 수 있다. local 환..
Airflow task group, xcom, trigger rule TaskGroup으로 subdags 만들기 → parallel_dag.py from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.subdag import SubDagOperator from subdag.subdag_parallel_dag import subdag_parallel_dag from airflow.utils.task_group import TaskGroup from datetime import datetime default_args = { 'start_date': datetime(2020, 1, 1), } with DAG(dag_id='parallel_dag', ..
Airflow Scale up 스케일업과 페러렐 동작 scale up & parallel 를 위한 기본 구성 방법 parallel 조건 다중 접속이 가능한 타 DB를 이용한다. sqlite는 동시에 여러 권한을 허용하지 않는다. 그렇기에 하나의 작업이 끝나야만 해당 DB를 사용할 수 있다. SequentialExecutor가 아닌 LocalExcutor를 사용한다. t1 >> [t2, 53] postgresql setting sudo apt install postgresql sudo -u postgres psql pip install 'apache-airflow[postgres]' # airflow.cfg 파일 수정해서 metasotre을 sqlite에서 postgresql로 변경 sql_alchemy_conn = postgresql..
Airflow Pipeline Build 해보기(feat Operator & Scheduling) operator란? 파이프 라인 안에서 단일의 Task를 정의해야한다. Operator 종류 Action Operator Transfer Operator Sensors 파이프라인 순서 테이블 생성 → sqlite http sensor 통해 API가 사용가능한지 확인 http operator를 통해 데이터를 추출 python operator를 통해 processing 가공된 정보를 테이블에 저장 Start guide home/airflow/airflow # 접속 mkdir dags # 폴더 생성 ./dags/user_processing.py # py파일생성 SQLite Connection 세팅 source sandbox/bin/activate pip install apache-airflow-provider..
Airflow 설치와 구성요소 가상환경세팅 python3 -m venv sandbox source sandbox/bin/activate # 가상환경 진입 pip install wheel # airflow 종속성 모듈 pip3 install apache-airflow==2.1.0 --constraint airflow db init # 초기 db 설정 airflow webserver, scheduler # 포트에 서비스 실행 # 기타 명령어는 -h 옵션으로 보기 Airflow란? Orchestrator Task : 각 태스크들을 적절한 타이밍에 실행 시킬수 있다. 확장성이 좋다. -> 다양한 라이브러리 지원 필요한 만큼 인스턴스를 늘릴 수 있다. UI가 좋다 구성요소 Web Server Flask와 Gunicon으로 사용자와 상호작용 할..