본문 바로가기

Data Engineering/Airflow

Airflow save(hdfs) create(hive-table) processing(spark)

Download forex rates with Python

import csv
import requests
import json
from airflow.operators.python_operator import PythonOperator

def download_rates():
    BASE_URL = "<https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/>"
    ENDPOINTS = {
        'USD': 'api_forex_exchange_usd.json',
        'EUR': 'api_forex_exchange_eur.json'
    }
    with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for idx, row in enumerate(reader):
            base = row['base']
            with_pairs = row['with_pairs'].split(' ')
            indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
            outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
            for pair in with_pairs:
                outdata['rates'][pair] = indata['rates'][pair]
            with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\\n')

downloading_rates = PythonOperator(
            task_id="downloading_rates",
            python_callable=download_rates
    )

Save the forex rates in HDFS

from airflow.operators.bash import BashOperator

saving_rates = BashOperator(
        task_id="saving_rates",
        bash_command="""
            hdfs dfs -mkdir -p /forex && \\
            hdfs dfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
            """
    )

Create to hive table from hdfs

creating_forex_rates_table = HiveOperator(
        task_id="creating_forex_rates_table",
        hive_cli_conn_id="hive_conn",
        hql="""
            CREATE EXTERNAL TABLE IF NOT EXISTS forex_rates(
                base STRING,
                last_update DATE,
                eur DOUBLE,
                usd DOUBLE,
                nzd DOUBLE,
                gbp DOUBLE,
                jpy DOUBLE,
                cad DOUBLE
                )
            ROW FORMAT DELIMITED
            FIELDS TERMINATED BY ','
            STORED AS TEXTFILE
        """
    )

---hive_conn---
conn type : hive server 2 Thrift
host : hive-server
login/password : hive
port : 10000

Processing forex rates with spark

---forex_processing.py---
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json

warehouse_location = abspath('spark-warehouse')

# Initialize Spark Session
spark = SparkSession \\
    .builder \\
    .appName("Forex processing") \\
    .config("spark.sql.warehouse.dir", warehouse_location) \\
    .enableHiveSupport() \\
    .getOrCreate()

# Read the file forex_rates.json from the HDFS
df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json')

# Drop the duplicated rows based on the base and last_update columns
forex_rates = df.select('base', 'last_update', 'rates.eur', 'rates.usd', 'rates.cad', 'rates.gbp', 'rates.jpy', 'rates.nzd') \\
    .dropDuplicates(['base', 'last_update']) \\
    .fillna(0, subset=['EUR', 'USD', 'JPY', 'CAD', 'GBP', 'NZD'])

# Export the dataframe into the Hive table forex_rates
forex_rates.write.mode("append").insertInto("forex_rates")

---task---
forex_processing = SparkSubmitOperator(
        task_id="forex_processing",
        application="/opt/airflow/dags/scripts/forex_processing.py",
        conn_id="spark_conn",
        verbose = False
    )

---add connection---
conn_id = spark_conn
conn_type = spark
host = spark://spark-master
7077

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow Send email & slack notification  (0) 2022.03.07
Airflow Sensor Operator  (0) 2022.03.07
Airflow Docker로 구성해보기  (0) 2022.03.07
Airflow task group, xcom, trigger rule  (0) 2022.03.07
Airflow Scale up  (0) 2022.03.07