해당 내용은 Datacamp의 Data engineering track을 정리했습니다.
Introduction to data engineering의 chapter 3에 대한 내용입니다.
해당 포스팅에는 아래의 내용을 포함하고 있습니다.
- 텍스트 파일, JSON 파일, Web, Databases에서 데이터 추출하기
- Transforms의 종류, Pyspark에서의 Transform, table join
- 분석용 Database와 어플리케이션용 Database의 차이, MPP Databases, 변환된 데이터 프레임을 SQL 프레임으로 변경하기
- Airflow에서 ETL function 구현하기
1. Extract
데이터를 추출한다는 것은 데이터 처리에 적합하지 않은 영구 저장소의 데이터를 메모리로 추출하는 것을 말합니다. 여기서 말하는 영구저장소는 Amazon S3나 SQL DB를 의미합니다.
데이터 추출은 다양한 데이터 형태로부터 가능합니다. 텍스트 형태의 비정형 데이터도 가능하고, row, column으로 이뤄진 tsv나 csv파일로부터도 추출이 가능합니다. 또한 key-value 형태로 이뤄진 반정형 데이터 JSON 파일도 활용 가능합니다. JSON에서 활용할 수 있는 데이터 형태는 number, string, boolean, null 외에도 array와 object도 가능합니다. 최근에는 많은 웹사이트가 JSON 형식으로 데이터를 전달하고 있습니다.
import json
result = json.loads('{"key_1": "value_1", "key_2": "value_2"}')
print(result["key_1"]) #value_1
추가적으로 웹에서 데이터를 가져오기 위해서 request로 가져오게 됩니다. 예를 들어, 구글을 통해 애플에 대한 정보를 얻고 싶다고 가정하면, 구글 검색창에서 애플을 치면, 해당 조건에 맞은 Request가 Google server로 보내지고, 보내진 Request에 맞는 웹페이지를 보내줍니다. 이 과정에서 JSON 형태의 데이터를 받게 됩니다. 일부 웹 서버의 경우 사람이 읽을 수 있는 웹페이지를 제공하지 않기도 합니다.
아래는 Twitter와 Hackernews API로 요청했을 때, 얻을 수 있는 데이터 형태입니다.
# Twitter API 데이터
{ "statuses": [{ "created_at": "Mon May 06 20:01:29 +0000 2019", "text": "this is a tweet"}]}
# Hackernews API
import requests
response = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")
print(response.json())
#{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': ...}
가장 일반적으로 데이터 추출하는 방법은 데이터베이스에서 추출하는 것입니다. 웹 서비스와 같은 대부분의 애플리케이션은 백업하고 데이터를 유지하기 위해 데이터베이스가 필요합니다. 애플리케이션 데이터베이스와 분석 데이터베이스를 구별하는 것이 중요합니다. 먼저, 애플리케이션 데이터베이스는 많은 트랙잭션을 갖도록 최적화되어 있습니다. 트랜잭션은 일반적으로 데이터베이스에서 행 또는 레코드를 변경하거나 삽입하는 것을 말합니다. 이러한 데이터베이스를 OLTP라고 부르기도 합니다. 또한 데이터가 행 단위로 추가되기 때문에 행 지향적인 데이터베이스입니다. 반면, 분석 데이터베이스의 경우 분석에 최적화되며, OLAP라고 합니다. 이것은 열 지향적인 데이터베이스입니다. 보통 데이터베이스로부터 데이터를 추출할 때에는 URI이 필요합니다. URI는 데이터베이스에 연결하는 방법에 대한 정보를 보유하는 문자열을 말합니다. 아래와 같은 형태를 갖는 것이 일반적입니다.
# URI 형태 - postgresql://user_name:password@host:port/
# Use in Python
import sqlalchemy
connection_uri = "postgresql://repl:password@localhost:5432/pagila"
db_engine = sqlalchemy.create_engine(connection_uri)
import pandas as pd
pd.read_sql("SELECT * FROM customer", db_engine)
Fetch from an API Source code
import requests
# Fetch the Hackernews post
resp = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")
# Print the response parsed as JSON
print(resp.json())
# {'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}
# Assign the score of the test to post_score
post_score = resp.json()["score"]
print(post_score)
# 17
Read from a database Source code
# Function to extract table to a pandas DataFrame
def extract_table_to_pandas(tablename, db_engine):
query = "SELECT * FROM {}".format(tablename)
return pd.read_sql(query, db_engine)
# Connect to the database using the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/pagila"
db_engine = sqlalchemy.create_engine(connection_uri)
# Extract the film table into a pandas DataFrame
extract_table_to_pandas("film", db_engine)
# Extract the customer table into a pandas DataFrame
extract_table_to_pandas("customer", db_engine)
2. Transform
transformations의 종류에는 5가지가 존재합니다. 첫 번째는 특정 속성을 선택합니다. 예를 들어, '이메일' 열만 선택할 수 있습니다. 두 번째, 특정 코드값을 바꿀 수 있습니다. 예를 들어, New York을 NY로 표현하는 것을 말할 수 있습니다. 세 번째는 데이터 검증입니다. 만약, created_at에 값이 존재하지 않는다면, 해당 레코드는 존재하지 않았다고 판단할 수 있습니다. 네 번째, 여러 개의 칼럼들을 나눌 수 있습니다. 다섯 번째, 다양한 소스로부터 합칠 수 있습니다.
customer_df # Pandas DataFrame with customer data
# Split email column into 2 columns on the '@' symbol
split_email = customer_df.email.str.split("@", expand=True)
# Create 2 new columns using the resulting DataFrame
customer_df = customer_df.assign(username=split_email[0], domain=split_email[1])
DataFrame 형태의 customer_df가 있다고 가정했을 때, split을 통해서 '@'를 기준으로 jane.doe인 user name과 theweb.com이라는 도메인 이름으로 나눌 수 있습니다. 여기서 expand=True는 split 했을 때, split_email에 인덱스로 다르게 split 하는 방법으로 활용되었습니다. 그렇게 나눠진 columns을 추가하기 위해 assign 함수를 활용할 수 있습니다.
위에서의 다양한 Transforming을 Pyspark에서 어떻게 할 수 있는지 알아보려고 합니다. 먼저, Pyspark에서 데이터를 추출해야 합니다. 보통 데이터를 추출할 때 데이터가 적으면 Pyspark말고 pandas를 활용할 수 있습니다.
import pyspark.sql
spark = pyspark.sql.SparkSession.builder.getOrCreate()
#jdbc : spark가 여러 관계형 데이터베이스에 연결하는데 도움을 주는 소프트웨어
spark.read.jdbc("jdbc:postqresql://localhost:5432/pagila", "customer", properties={"user":"repl", "password":"password"})
read.jdbc의 인자 중 첫 번째는 uri와 굉장히 유사하지만 user 이름과 비밀번호는 properties라는 인자로 입력해주고 있습니다. 또한 "customer"는 연결하고 싶은 Table명을 작성하시면 됩니다.
마지막으로 Pyspark에서 table join하는 코드는 아래와 같습니다.
customer_df # PySpark DataFrame with customer data
ratings_df # PySpark DataFrame with ratings data
# Groupby ratings
ratings_per_customer = ratings_df.groupBy("customer_id").mean("rating")
# Join on customer ID
customer_df.join(ratings_per_customer, customer_df.customer_id == ratings_per_customer.customer_id)
3. Loading
대규모 병렬 처리 데이터베이스(MPP Databases)는 분산 방식으로 실행되는 분석에 최적화된 columns oriented 데이터베이스입니다. 쿼리는 단일 컴퓨팅 노드이지만 하위 작업으로 분할되고 여러 노드로 분산됩니다. 대표적으로 Amazon Redshift, Azure SQL Data Warehouse, Google BigQuery가 있습니다.
Redshift에서 Load하는 예시 코드는 아래와 같습니다. 여기서 csv 파일은 좋은 형식은 아닙니다. parquet은 Apache Hadoop 에코 시스템의 무료 오픈 소스 열 지향 데이터 스토리지 형식입니다.
# Pandas .to_parquet() method
df.to_parquet("./s3://path/to/bucket/customer.parquet")
# Pyspark .write.parquet() method
df.write.parquet("./s3://path/to/bucket/customer.parquet")
다음은 변환 단계의 결과를 PostgreSQL 데이터베이스로 로드하는 코드입니다.
# Transformation on data
recommendations = transform_find_recommendations(ratings_df)
# Load into PostgreSQL database
recommendations.to_sql("recommendations", db_engine, schema="store", if_exists="replace")
4. Putting it all together
앞에서 배운 ETL을 모두 결합해서 최종적으로 ETL을 수행하는 함수를 만들어보고, Airflow를 활용해서 DAG로 스케줄링까지 해봅니다. 코드는 아래와 같습니다.
#1. ETL Function
def extract_table_to_df(tablename, db_engine):
return pd.read_sql("SELECT * FROM {}".format(tablename), db_engine)
def split_columns_transform(df, column, pat, suffixes):
# Converts column into str and splits it on pat...
def load_df_into_dwh(film_df, tablename, schema, db_engine):
return pd.to_sql(tablename, db_engine, schema=schema, if_exists="replace")
db_engines = { ... }
def etl():
# Extract
film_df = extract_table_to_df("film", db_engines["store"])
# Transform
film_df = split_columns_transform(film_df, "rental_rate", ".", ["_dollar", "_cents"])
# Load
load_df_into_dwh(film_df, "film", "store", db_engines["dwh"])
이전의 강의에서 DAGs를 활용한 Scheduling 할 때, schedule_interval을 보셨을 겁니다. schedule_interval에는 cron에서 활용하는 시간을 표시하는 방식입니다. "0 * * * *"에서 맨 앞에서부터 분, 시, 일, 월, 일주일 중 어떤 날인지에 대한 내용을 담을 수 있습니다. 위의 예제의 경우에는 매시간 0분일 때마다 실행하도록 하는 dag를 생성할 수 있습니다.
# 2. The DAG definition file
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(dag_id="etl_pipeline", schedule_interval="0 0 * * *")
etl_task = PythonOperator(task_id="etl_task", python_callable=etl, dag=dag)
etl_task.set_upstream(wait_for_this_task)
# wait_for_this_task.set_downstream(etl_task)와 동일