본문 바로가기

Data Engineering/Data Platform

빅데이터를 지탱하는 기술 - Chapter6 빅데이터 분석 기반의 구축

스키마리스 데이터의 애드 혹 분석


주피터와 Spark에 의한 애드 혹 분석의 예시를 설명한다.

분산스토리지는 MongoDB로 JSON 데이터를 가공 집계 시각화한다.

 

스키마리스 데이터 수집

MongoDB에 JSON형식의 데이터를 구축 해놓는다

 

대화식 실행 환경의 준비

주피터 노트북을 가동하여, MongoDB에 있는 데이터를 가져와 데이터 프레임 형식으로 전환한다.

환경을 구축할 때 도커를 통해 가상머신을 구축하면 편하다.

 

Spark에 의한 분산 환경

  • 데이터양의 증가에 따라, Spark를 사용하여 분산 처리한다.
  • pyspark의 실행으로 대화식으로 실행.
  • Spark는 마스터/슬레이브 형의 분산 시스템으로 클라이언트로부터 마스터에 명령을 보냄으로서 프로그램 실행하며, 클라이언트를 드라이버 프로그램이라고 부른다.
  • 드라이버 프로그램은 지시만 할뿐이고, 실제 환경에서는 데이터 센터의 Spark 클러스터에 접속하여서 사용한다.

 

MongoDB의 애드 혹 집계

  • MongoDB에 Spark 세션을 통해 데이터 프레임을 작성한다. 해당 데이터프레임은 지연평가로 DAG만 조립한다.
  • Spark SQL을 통해 데이터 프레임을 SQL로 집계 할 수 있다. MongoDB의 경우 열 지향이 아니기에 고속 집계를 위해서는 한 차례 데이터를 추출 해야한다.
  • 매번 반복하는 것보다, 어느 정도 정리된 상태를 물리적인 테이블로 보관하는 방식으로, 구조화된 데이터를 열지향 스토리지로 저장한다 → 그 후 집계 최적화를 위함

 

데이터를 집계해서 데이터 마트 구축하기

  1. Spark에 ODBC/JDBC 접속
  2. MPP 데이터베이스에 비정규화 테이블 만들기
  3. 데이터 작게 집약하여 CSV파일에 출력하기
    1. 데이터가 충분히 작다면, spark데이터프레임이 아닌 pandas 데이터프레임으로 하는게 과하지 않다.

 

카디널리티의 삭감

word 칼럼 안에 단어 종류가 너무 많다면, 빈출 단어만 골라서 사용하자 → 디멘전 축소로 시각화 등 이점

 

BI 도구로 데이터 시각화하기

애드혹 분석 과정에서 일련의 대화적인 데이터 처리를 노트북 안에서 실행을 하였다.

BI를 도구를 쓰기 전 노트북 안에서 시각화하기 편한 상태로 만들자.

  • 시각화의 결과를 네트워크 경유로 공유 목적이라면 웹형의 BI도구를 고려하자.
  • 항상 최신 정보로 대시보드나 워크플로의 일환으로 자동화 하고 싶을 때도 웹 형이 사용된다

 

Hadoop에 의한 데이터 파이프라인


앞서의 흐름을 Hive와 Presto로 재구축하며, 데이터 처리를 멱등한 태스크로 구현하여 워크플로에 포함할 수 있도록 한다.

 

일일 배치 처리를 태스크화

정기적인 데이터 전송 → 데이터 집계 → 데이터마트의 전형적인 파이프라인 구축

 

  1. 데이터 소스는 Mongo DB에서 Embulk를 이용해 JSON파일 벌크형 전송
    1. Embulk 대체 → MongoDB전용 커맨드 도구 혹은 Hive를 통한 직접 접속
    2. 해당 태스크의 파라미터는 지정된 기간으로, 워크플로 관리 도구에서 설정
    3. 파라미터만 같다면 항상 같은 결과가 덮여씌워지므로 멱등하다
    4. 실제 환경에서는 로컬이 아닌, 별도의 분산 스토리지 전송 혹은 Hive로부터 안정적으로 읽어낸다

 

  1. 장기적인 데이터 분석을 위해 Hive로 열 지향 스토리지 구축 → ORC파일 구축
    1. Hive를 셋업하며 Presto로 집계할 수 있도록 Hive메타 스토어 서비스를 기동한다
    2. JSON파일을 Hive를 사용해 시계열 테이블로 만들기 위해서 파티셔닝을 유효로 한 테이블을 만든다
    3. INSERT OVERWRITE TABLE 문으로 파티션을 덮어씌우면서 멱등한 태스크를 만든다.
    4. 마지막에 ORDER BY로 이벤트 시간으로 정렬하면서 조건절 푸쉬다운 최적화를 만든다

 

  1. Presto를 사용한 집계 → CSV산출
    1. 서버/클라이언트 형의 시스템이다.
    2. Presto 서버를 기동하면, Hive메타 스토어가 동작 중이라면 Hive로 작성한 테이블에 접속할 수 있다.
    3. 데이터 집계는 열 지향 스토리지 구축에 비해 빠르므로 매일 데이터 마트를 치환하는 것이 간단하다
    4. csv파일 만든 후에는 분산 스트로지 전송 혹은 BI 도구로 읽어들인다. 이후 파리미터 교체해서 매일 실행하도록 하면 자동화가 완성된다

 

워크플로 관리 도구에 의한 자동화


앞서 재구축한 흐름을 에어플로우를 이용하여 워크플로의 일부로 실행 시킬 수 있도록 한다.

데이터 파이프라인을 자동화하려면, 오류 발생 시의 복구를 염두에 두고 워크플로를 설계한다

 

Airflow

  • airflow 셋업 시, 서브 디렉토리 생성과 SQLite DB생성이 된다
  • 관리 콘솔이되는 웹 서버 세팅을 해준다. 관리화면은 개발 중에는 필요없고 운용 부터 사용한다

 

 

워크플로의 정의

  • Airflow에 의한 워크플로는 여러 태스크로 이루어진 DAG형태이다.
  • 의존 관계만 정의하면 나머지는 시스템에 의해 자동으로 결정된다.
  • 태스크의 실행 결과는 DB에 기록되고, 실패한 태스크를 재실행시 의존 태스크도 재개된다
  • 각 태스크는 데이터 처리의 대상이 되는 시간이 파라미터이며, 전달된 날짜의 데이터를 처리라도록 한다. 이것이 태스크를 올바로 재실행하기 위한 기초이다.

 

 

태스크의 정의

태스크를 만들기 위한 클래스는 Operator이다. 다양한 플로그인 존재하며, 그 이외에도 표준 Operator로 어떤 작업이든 가능하며, 전달되는 파라미터를 컨텍스트라고 부른다

  • BashOperator - 셸 스크립트 실행
  • PythonOperator - 함수 실행

 

 

워크플로를 터미널로부터 실행하기

airflow를 작성한 스크립트를 실행해도 DAG의 오류 체크가 실행될 뿐이다.

동작시키려면 단말로부터 airflow 명령어를 호출한다.

  • airflow test와 backfill로 인해 충분히 검증하고 실행한다.

 

 

스케줄러를 기동하여 DAG를 정기 실행하기

  • airflow scheduler명령어로 기동
  • DB상태 점검과 실행 가능한 태스크를 찾아 워크 프로세스에 전달하여 실행한다.
  • 오류 태스크 발생 시, 수동으로 클리어 한다면 그상태에서 재실행이 된다.

 

 

태스크가 소비하는 자원 제어하기

  • 동시 실행 가능한 태스크 수는 설정 가능하며, 적절한 태스크 크기를 유지해야 한다
  • 자원 풀 구조
    • 외부 서비스의 API가 오류를 반환하면 동시실행을 늘릴수록 문제가 악화되므로 자원풀을 사용하여 적절히 제어하는 것이 중요하다
  • 작거나 대량의 태스크는 Operator를 사용하여 적절한 태스크의 크기로 모아야한다.

 

 

태스크의 분산처리

Airflow는 데이터 처리가 아닌, 실행 관리를 철저히 해야한다.

시스템 자원을 많이 사용하는 부분을 워크플로의 일부로 실행하고 싶을 때는 Airflow의 워커를 분산할 수 있다. 이 경우 외부의 큐에 등록하여 원격의 워커에서 실행되며, 순서대로 처리해 나가는 타입의 데이터 처리에 적합하다.

 

 

클라우드 서비스에 의한 데이터 파이프라인


클라우드 서비스를 이용한 데이터 파이프라인에 관해 설명한다.