해당 내용은 Datacamp의 Data engineering track을 정리했습니다.
Introduction to data engineering의 chapter 2에 대한 내용입니다.
해당 포스팅에는 아래의 내용을 포함하고 있습니다.
- 데이터베이스, SQL과 NoSQL, schema
- 병렬 처리란 무엇인가?
- 병렬 처리를 위한 프레임 워크
1. Databases
앞에서 많은 강의에서 데이터베이스에 대한 내용을 다뤘습니다. 한번 더 정리하면, 데이터베이스는 데이터 엔지니어에겐 필수적인 데이터 관리 도구입니다. 데이터베이스의 기본적인 뜻은 빠른 검색을 위해 구성된 데이터 모음이라고 표현할 수 있습니다. 많은 데이터를 가지고 있으며, DBMS를 통해 검색할 수 있습니다. 파일 시스템과는 규모적인 측면이나 업무를 하기 위한 방법들이 다릅니다. 데이터베이스를 이해하기 위해서는 데이터 종류(정형, 반정형, 비정형)에 대한 내용을 알아야 하는데 해당 내용은 이전 강의를 참고하시면 좋을 것 같습니다. 필요하시다면 여기를 클릭하세요.
강의에서는 SQL과 NoSQL을 아래와 같이 비교하고 있습니다.
SQL | NoSQL |
관계형 데이터베이스 | 관계성이 없는 데이터베이스 |
테이블 관계를 정의하는 스키마 | 구조화 되어 있지 않은 경우 존재 |
Table 형태의 데이터 | key:value 형태의 데이터, Document DB(json) |
MySQL, PostgreSQL | redis, MongoDB |
SQL을 통해서 테이블을 만들어서 두 개의 테이블을 foreign key를 활용해서 join을 해보려면 아래와 같은 코드로 구현할 수 있습니다. 이렇게 join을 하게 되면 foreign key(참조 키)를 통해서 두 개의 테이블을 한 개의 테이블로 확인할 수 있습니다.
# Create Customer Table
CREATE TABLE "Customer" (
"id" SERIAL NOT NULL,
"first_name" varchar,
"last_name" varchar,
PRIMARY KEY ("id")
);
# Create Order Table
CREATE TABLE "Order" (
"id" SERIAL NOT NULL,
"customer_id" integer REFERENCES "Customer",
"product_name" varchar,
"product_price" integer,
PRIMARY KEY ("id")
);
# join both tables on foreign key
SELECT * FROM "Customer"
INNER JOIN "Order"
ON "customer_id" = "Customer"."id";
스키마 중에는 스타 스키마가 존재하는 데, 스타 스키마는 다차원의 데이터를 표현하기 위한 데이터베이스 설계기법입니다. 테이블은 Fact와 Dimension으로 구성되고, Fact 테이블을 중심으로 Dimension 테이블이 뻗어져 나오는 형태를 가지고 있습니다. Fact는 주로 일어난 일(ex. 상품 구매)에 대해 작성하고, Dimension에는 Fact에서 발생한 일에 대해 부수적인 정보(ex. 구매자 정보, 판매점 정보, 상품 정보)들을 포함합니다.
2. What is parallel computing ?
데이터 엔지니어는 기본적으로 데이터를 가져와서 정리하고 결합하거나 집계하는 역할을 수행합니다. 이번 강의에서는 대용량 데이터를 다루기 위한 병렬 처리에 대해서 언급하고 있습니다.
병렬 처리는 메모리와 컴퓨팅 처리 속도로 인해 중요해지고 있습니다. 대용량의 데이터는 많아지고, 이에 따라 1개의 컴퓨터로는 감당하기 어려운 수준이기 때문에, 이를 다양한 컴퓨터를 활용해서 분산하면 컴퓨터 당 메모리 수는 줄어들면서 많은 컴퓨팅 자원을 활용할 수 있습니다.
병렬 처리는 3가지 단계로 구성되는 데, 하나의 작업을 여러 개의 작업들로 나눠줍니다. 나눠진 작업들을 여러 개의 컴퓨터로 전달해 작업을 진행합니다. 종료된 작업은 하나로 모아줍니다.
병렬 컴퓨팅을 적용했을 때는 1개의 컴퓨터를 활용했을 때보다 높은 처리 능력을 가지게 됩니다. 또한 데이터를 나누기 때문에 컴퓨터 당 차지하는 메모리 수가 감소하게 된다는 장점이 있습니다. 하지만, 병렬 컴퓨팅은 장점만 존재하지는 않습니다. 기본적으로 데이터를 보내주고받는 과정이 추가되기 때문에 그 사이에 발생하는 통신 비용이 발생하게 됩니다. Task가 크지 않거나 처리 장치가 적은 경우에는 병렬 컴퓨팅을 적용하지 않는 것이 좋습니다.
강의에서는 추가적으로 활용 가능한 라이브러리를 소개하고 있습니다. 대표적으로 multiprocessing.Pool과 dask라는 라이브러리를 활용할 수 있습니다. 사용 방법은 아래와 같습니다.
# multiprocessing.Pool
from multiprocessing import Pool
def take_mean_age(year_and_group):
year, group = year_and_group
return pd.DataFrame({"Age": group["Age"].mean()}, index=[year])
# 4개로 나눠서 작업을 진행
with Pool(4) as p:
results = p.map(take_mean_age, athlete_events.groupby("Year"))
# 나눠서 진행한 작업을 합치기
result_df = pd.concat(results)
# dask
import dask.dataframe as dd
# 데이터프레임을 4개로 나눕니다.
athlete_events_dask = dd.from_pandas(athlete_events, npartitions = 4)
# 각 파티션 별로 병렬 연산을 수행합니다.
result_df = athlete_events_dask.groupby('Year').Age.mean().compute()
multiprocessing.Pool은 low level에서 활용 가능한 라이브러리입니다. dask는 multiprocessing에 비해 조금 더 high level의 라이브러리입니다.
3. Parallel computation frameworks
이번 강의에서는 병렬 처리를 위한 프레임워크를 소개합니다.
하둡은 Apache Software Foundation에서 유지 관리하는 오픈 소스 프로젝트 모음입니다. 이 강의에서는 하둡 프로젝트인 MapReduce와 HDFS에 대해 다룹니다.
HDFS는 기존의 컴퓨터 파일 시스템과 굉장히 유사합니다. 다만, 다른 점이 있다면 파일이 여러 개의 컴퓨터로 저장되어 있습니다. 빅 데이터가 등장하면서 병렬 컴퓨팅에 HDFS는 필수적인 요소였습니다. 최근에는 Amazon S3 같은 클라우드 관리 저장 시스템이 HDFS를 대체하기도 합니다.
MapReduce는 위에서 말했던 병렬 처리와 동일하게 하위 작업으로 나누고 여러 개의 컴퓨터로 분배해주는 역할을 합니다. 위에 우측에 보이는 것처럼 큰 데이터를 4개로 split 해서 여러 가지의 컴퓨터로 전달합니다. MapReduce는 작업을 작성하기가 어렵다는 단점이 있었습니다. 이를 해결한 프레임워크가 Hive입니다.
Hive는 데이터를 생성하는 Hadoop ecosystem(?)의 최상위 계층입니다. Hive는 Hadoop에서 실행되며, 변형된 SQL을 활용합니다. 초기에는 MapReduce가 담당했으나, 최근에는 여러 데이터 처리 도구와 통합되었습니다.
Hive 말고 Spark를 중점적으로 다루려고 합니다. Spark는 디스크에 저장하기보다는 최대한 메모리를 활용하는 방식을 사용합니다. Spark는 MapReduce에서 탐색적 데이터 분석에서의 한계를 개선했습니다.
Spark의 구조는 Resilient distributed datasets(RDD)에 의존합니다. RDD는 여러 노드 간에 분산된 데이터를 유지 관리하는 구조입니다. RDD는 튜플의 리스트 형태를 가지고 있으며, 변환할 때에는 .map()이나 .filter()를 활용할 수 있고, 동작하기 위해서는 .count(), .first()를 사용합니다. 변환은 변환된 RDD를 생성하고, 동작은 단일 결과를 생성합니다. (여기서 동작은 mean()과 같은 작업을 말하는 것이 아닐까라고 추측해봅니다.)
Pyspark는 Spark를 파이썬으로 활용할 수 있는 라이브러리입니다. DataFrame 추상화를 지원하고 있어서 pandas와 매우 유사합니다.
아래는 Pyspark를 활용한 예제입니다.
# /home/repl/spark-script.py 파일
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession.builder.getOrCreate()
athlete_events_spark = spark.read.csv("/home/repl/datasets/athlete_events.csv", header=True, inferSchema=True, escape='"')
athlete_events_spark = athlete_events_spark.withColumn("Height", athlete_events_spark.Height.cast("integer"))
print(athlete_events.spark.groupBy('Year').mean('Height').orderBy('Year').show())
4. Workflow scheduling frameworks
이번 강의에서는 워크플로우 스케줄링과 워크플로우를 스케줄링하는 프레임워크인 Apache Airflow를 소개합니다.
일반적으로 csv파일이 들어오면 Spark를 통해서 작업을 진행하여 분석하기 위한 데이터베이스로 보내주는 작업을 진행합니다. 하지만 csv파일이 계속 업데이트된다고 했을 때, 어떻게 이것을 관리할 수 있을까요? 일반적으로 수동으로 직접 실행할 수 있습니다. 하지만, 주말의 경우에도 출근해서 수동으로 해주면 너무 힘들겠죠.
예를 들어, 작업 A, B, C가 존재합니다. 작업 간에는 순서가 존재합니다. A라는 작업이 진행된 상태에서 B라는 작업을 진행할 수 있다면, 이러한 부분들은 어떻게 스케줄링할 수 있을까요? Linux의 cron과 같은 도구로는 충분하지 않을 수 있습니다. 이러한 종속성을 표현하는 방법을 DAG라고 합니다.
DAG는 방향성 비순환 그래프로 방향이 지정된 edge로 노드들을 연결한 노드 집합입니다. 여기에는 그래프 Cycle은 존재하지 않습니다. Cycle이 존재하지 않는다는 것은 특정 노드를 두 번 이상 볼 수 없다는 것을 의미합니다.
위의 그림에서는 Job A에서 시작하여 Job B - Job C/Job D - None/Job E로 이뤄진 DAG를 가집니다. 반드시, 앞의 노드의 작업이 실행되어야 다음 작업이 진행할 수 있습니다. 이러한 작업들을 할 수 있는 툴에는 Linux의 cron과 Spotify의 Luigi, Apache Airflow가 존재합니다.
이 중 Apache Airflow는 Airbnb에서 만들었으며, python 기반으로 만들어져 있고, DAG를 활용하고 있습니다. 아래는 새로운 DAG 예시입니다. 이 예시를 Apache Airflow에서는 어떻게 구현하는지 알아봅시다.
일단 DAG를 먼저 해석해보면, 클러스터가 시작하면, ingest_customer_data와 ingest_product_data가 각각 작업이 진행됩니다. 그 후 두 개의 작업이 모두 끝나면 enrich_customer_data를 진행할 수 있습니다.
#Create the DAG object
dag = DAG(dag_id="example_dag", ..., schedule_interval="0 * * * *")
#Define operations
start_cluster = StartClusterOperator(task_id="start_cluster", dag=dag)
ingest_customer_data = SparkJobOperator(task_id="ingest_customer_data", dag=dag)
ingest_product_data = SparkJobOperator(task_id="ingest_product_data", dag=dag)
enrich_customer_data = PythonOperator(task_id="enrich_customer_data", ..., dag=dag)
# Set up dependency flow
start_cluster.set_downstream(ingest_customer_data)
ingest_customer_data.set_downstream(enrich_customer_data)
ingest_product_data.set_downstream(enrich_customer_data)
아래는 practice에서 나오는 문제에 대한 코드입니다.
# Airflow DAGs
# Create the DAG object
# assemble_frame - price_tires/assemble_body - none/apply_paint
# 매시간 0분일 때 실행하기 원함.
dag = DAG(dag_id="car_factory_simulation", default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2)}, schedule_interval="0 * * * *")
# Task definitions
assemble_frame = BashOperator(task_id="assemble_frame", bash_command='echo "Assembling frame"', dag=dag)
place_tires = BashOperator(task_id="place_tires", bash_command='echo "Assembling body"', dag=dag)
assemble_body = BashOperator(task_id="assemble_body", bash_command='echo "Assembling body"', dag=dag)
apply_paint = BashOperator(task_id="apply_paint", bash_command='echo "Applying paint"', dag=dag)
# Complete the downstream flow
assemble_frame.set_downstream(place_tires)
assemble_frame.set_downstream(assemble_body)
assemble_body.set_downstream(apply_paint)