Airflow: save (HDFS), create (Hive table), process (Spark)
Download forex rates with Python
import csv
import requests
import json

from airflow.operators.python import PythonOperator


def download_rates():
    BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
    ENDPOINTS = {
        'USD': 'api_forex_exchange_usd.json',
        'EUR': 'api_forex_exchange_eur.json'
    }
    with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for idx, row in enumerate(reader):
            base = row['base']
            with_pairs = row['with_pairs'].split(' ')
            # Fetch the rates for this base currency from the gist endpoint
            indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
            outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
            for pair in with_pairs:
                outdata['rates'][pair] = indata['rates'][pair]
            # Append one JSON document per line (JSON Lines format)
            with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\n')

downloading_rates = PythonOperator(
    task_id="downloading_rates",
    python_callable=download_rates
)
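These operator snippets don't stand on their own; in a DAG file each of them is instantiated inside a DAG context. A minimal sketch of that wrapper (the dag_id, schedule, and start date here are assumptions, not from this post):

from airflow import DAG
from datetime import datetime

# Illustrative DAG wrapper; every operator in this post is meant
# to be created inside this "with" block so it attaches to the DAG.
with DAG("forex_data_pipeline",
         start_date=datetime(2022, 3, 1),
         schedule_interval="@daily",
         catchup=False) as dag:
    ...  # downloading_rates, saving_rates, and the tasks below go here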
Save the forex rates in HDFS
from airflow.operators.bash import BashOperator

saving_rates = BashOperator(
    task_id="saving_rates",
    bash_command="""
        hdfs dfs -mkdir -p /forex && \
        hdfs dfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
    """
)
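To confirm the file actually landed in HDFS, one option is to query the namenode's WebHDFS REST API. A small sketch, assuming WebHDFS is enabled and listening on port 9870 (the Hadoop 3 default; Hadoop 2 uses 50070):

import requests

# GETFILESTATUS returns the file's metadata as JSON if the path exists,
# and a 404 if the upload failed (host and port are assumptions).
resp = requests.get(
    "http://namenode:9870/webhdfs/v1/forex/forex_rates.json",
    params={"op": "GETFILESTATUS"},
)
print(resp.status_code, resp.json())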
Create a Hive table from HDFS
from airflow.providers.apache.hive.operators.hive import HiveOperator

creating_forex_rates_table = HiveOperator(
    task_id="creating_forex_rates_table",
    hive_cli_conn_id="hive_conn",
    hql="""
        CREATE EXTERNAL TABLE IF NOT EXISTS forex_rates(
            base STRING,
            last_update DATE,
            eur DOUBLE,
            usd DOUBLE,
            nzd DOUBLE,
            gbp DOUBLE,
            jpy DOUBLE,
            cad DOUBLE
        )
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
        STORED AS TEXTFILE
    """
)
---hive_conn---
conn type : Hive Server 2 Thrift
host : hive-server
login/password : hive / hive
port : 10000
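To sanity-check these settings outside of Airflow, you can open the same Thrift endpoint directly. A quick sketch using the pyhive package (an extra dependency, not part of the original setup):

from pyhive import hive

# Same host, port, and credentials as the hive_conn connection above
conn = hive.Connection(host="hive-server", port=10000, username="hive")
cursor = conn.cursor()
cursor.execute("SHOW TABLES")
print(cursor.fetchall())  # forex_rates should appear once the task has run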
Processing forex rates with Spark
---forex_processing.py---
from os.path import abspath
from pyspark.sql import SparkSession

warehouse_location = abspath('spark-warehouse')

# Initialize the Spark session with Hive support so that
# insertInto can target the Hive metastore table
spark = SparkSession \
    .builder \
    .appName("Forex processing") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# Read the file forex_rates.json from HDFS
df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json')

# Flatten the nested rates and drop rows duplicated on base and last_update.
# insertInto matches columns by position, so select them in the
# table's column order (base, last_update, eur, usd, nzd, gbp, jpy, cad).
forex_rates = df.select('base', 'last_update', 'rates.eur', 'rates.usd', 'rates.nzd', 'rates.gbp', 'rates.jpy', 'rates.cad') \
    .dropDuplicates(['base', 'last_update']) \
    .fillna(0, subset=['eur', 'usd', 'nzd', 'gbp', 'jpy', 'cad'])

# Append the dataframe into the Hive table forex_rates
forex_rates.write.mode("append").insertInto("forex_rates")
---task---
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

forex_processing = SparkSubmitOperator(
    task_id="forex_processing",
    application="/opt/airflow/dags/scripts/forex_processing.py",
    conn_id="spark_conn",
    verbose=False
)
---add connection---
conn_id : spark_conn
conn_type : Spark
host : spark://spark-master
port : 7077
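Finally, the four tasks have to run in the order the sections above describe. The dependency line at the bottom of the DAG file (inside the same DAG context sketched earlier):

# Each task runs only after the previous one succeeds
downloading_rates >> saving_rates >> creating_forex_rates_table >> forex_processing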