본문 바로가기
공부/프로그래밍

[airflow] Mysql 데이터를 GCS(Google Cloud Storage)로 저장하기(mysql_to_gcs, 한글깨짐, 날짜포멧 수정)

by demonic_ 2020. 8. 27.
반응형

Data lake 를 구축하기 위한 1단계인 원본데이터를 GCS로 이동하는 것을 다뤄보려 한다.

 

embulk를 쓸까도 했었는데, JAVA 1.8 버전 이후부터는 지원을 안하기도 했고, 관리포인트를 늘리는 것도 좋아보이지 않아 찾아보다가 airflow에서 자체적으로 할 수 있는것을 확인해 이걸로 하기로 결정했다.

 

아래 github을 들어가면 mysql_to_gcs.py파일이 있는데, 이걸 이용하면 mysql에 질의한 쿼리대로 나온 결과를 GCS로 저장이 가능하다.

https://github.com/apache/airflow/blob/1e79dae06e/airflow/contrib/operators/mysql_to_gcs.py

 

apache/airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows - apache/airflow

github.com

 

 

참고로 파이썬은 3.x 버전을 사용한다.

시작.

 

 

다음 데이터를 추출하여 GCS에 저장할 예정이다.

 

output 유형은 json으로 나오는데 다음과 같다.

{"AGE": 25, "ID": 1, "NAME": "아이유", "REG_DT": 1598304038.0}
{"AGE": 33, "ID": 2, "NAME": "김수현", "REG_DT": 1598304338.0}
{"AGE": 18, "ID": 3, "NAME": "이수영", "REG_DT": 1598304638.0}

 

json이긴하나 row별로 나눠져 있는데, 아래에서도 다룰거니 그때 자세히 살펴보자.

 

 

mysql_to_gcs 파일을 보고 사용할 파라미터를 알 수 있다.

우선 airflow DAG를 생성하기 위한 코드를 작성해보자.

from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

default_args = {
    'owner': 'airflow'
    , 'depends_on_past': False
    , 'start_date': datetime(2020, 8, 24)
    , 'email': ['이메일주소']
    , 'email_on_faiure': False
    , 'email_on_retry': False
    , 'retries': 1
    , 'retry_delay': timedelta(minutes=5)
}

...

 

DAG를 수행하다가 실패하면 등록한 이메일로 전송해주는거 같다. 다만 이것도 별도 설정하지 않으면 다음과 같은 에러를 보여준다. 추후 기회가되면 메일을 등록하는 것도 포스팅해보겠다.

WARNING - section/key [smtp/smtp_user] not found in config
WARNING - section/key [smtp/smtp_user] not found in config
ERROR - Failed to send email to: ['이메일주소']
ERROR - [Errno 61] Connection refused
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 288, in execute
    encryption_configuration=self.encryption_configuration)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1297, in run_load
    return self.run_with_configuration(configuration)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1318, in run_with_configuration
    .execute(num_retries=self.num_retries)
  File "/usr/local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/googleapiclient/http.py", line 901, in execute
    headers=self.headers,
  File "/usr/local/lib/python3.7/site-packages/googleapiclient/http.py", line 159, in _retry_request
    for retry_num in range(num_retries + 1):
TypeError: can only concatenate str (not "int") to str

 

계속 코드를 추가해보자.

...

dag = DAG('mysql_to_gcs_dag', default_args=default_args)

# 쿼리작성
sql="select ID, NAME, AGE from member "; 
# bucket 이름
bucket='test-member-20200824'
# filename(폴더포함)
filename="test/member.json"

mysqlToGCS = MySqlToGoogleCloudStorageOperator(
	task_id='mysql_to_gcs'
	, dag=dag
	, sql=sql
	, bucket=bucket
	, filename=filename
	)

 

 

이정도만 등록하면 실행하는데 설정은 모두 다한 샘이다.

그럼 Mysql 접속정보와 GCS접근 권한을 위한 설정은 어떻게 해야할까?(사실 이부분을 가장 애먹었다....)

 

mysql_to_gcs.py 파일을 보면 다음과 같은 부분이 있다.

...
    @apply_defaults
    def __init__(self,
                 sql,
                 bucket,
                 filename,
                 schema_filename=None,
                 approx_max_file_size_bytes=1900000000,
                 mysql_conn_id='mysql_default',    # 이부분
                 google_cloud_storage_conn_id='google_cloud_default',
                 schema=None,
                 delegate_to=None,
                 *args,
                 **kwargs):
...

이것을 설정하는 방법을 이제 설명하려 한다.

 

 

# Mysql Connection 추가하기(접속정보 설정)

airflow는 접속정보를 파일로 관리하지 않는다. 대신 리눅스의 환경변수로 정보를 입력받는다.

우선 공식문서를 확인해보자

https://airflow.apache.org/docs/stable/howto/connection/mysql.html

 

MySQL Connection — Airflow Documentation

 

airflow.apache.org

문서 내 For example을 확인하면 다음처럼 설정을 하라고 나온다.

export AIRFLOW_CONN_MYSQL_DEFAULT='mysql://mysql_user:XXXXXXXXXXXX@1.1.1.1:3306/mysqldb?ssl=%7B%22cert%22%3A+%22%2Ftmp%2Fclient-cert.pem%22%2C+%22ca%22%3A+%22%2Ftmp%2Fserver-ca.pem%22%2C+%22key%22%3A+%22%2Ftmp%2Fclient-key.pem%22%7D'

 

위의 내용을 다음과 같이 치환하면 된다

export AIRFLOW_CONN_MYSQL_DEFAULT='mysql://[username]:[password]@[host]:[port]/[database]

 

참고로 비밀번호에 특수문자가 들어가는 경우 종종 다음과 같은 에러가 발생할 수 있다.

(Url 인코더에 의미있게 쓰이는 특수문자일 경우 그렇다. => 비밀번호를 'sample12#$' 로 할 경우)

invalid literal for int() with base 10: "'sample12"
Traceback (most recent call last):

 

이럴땐 특수문자인 '#$'을 URL-encoded 로 변경하여 등록해야 한다. 따라서 다음처럼 등록해야 한다.

---

export AIRFLOW_CONN_MYSQL_DEFAULT='mysql://[username]:sample12%23%24@[host]:[port]/[database]

---

 

URL-encoded로 변환하는 건 인터넷에 많으니 찾아보거나 혹은 아래사이트에서 변환 가능하다

https://www.urlencoder.org/

 

URL Encode and Decode - Online

Encode to or Decode from URL encoded (also known as Percent-encoded) format with advanced options. Enter our site for an easy-to-use online tool.

www.urlencoder.org

 

그리고 환경변수 이름에서 AIRFLOW_CONN_ 을 뺀 나머지가 바로 위에서 본 'mysql_default'로 대조되는 것이다. 만약 이름을 바꿀꺼라면 AIRFLOW_CONN_[변경할 이름] 으로 지정한 뒤에, 위의 설정에서 추가해주어야 한다.

mysqlToGCS = MySqlToGoogleCloudStorageOperator(
	task_id='membership_to_gcs'
	, dag=dag
	, sql=sql
	, bucket=bucket
	, filename=filename
    , mysql_conn_id='[설정한 이름]'
	)

 

 

# GCS 설정하기

GCS는 구글 클라우드 콘솔에 접속하여 설정해야 하기 때문에 해야할 것이 많다.

우선 서비스계정 생성과 권한을 추가한다.

 

GCP 콘솔 => IAM 및 관리자 => 서비스계정 클릭

 

 

상단에 서비스계정 만들기를 클릭한다.

입력창에 필요한 정보를 입력한다.

 

엑세스 권한 부여는 GCS 권한, 그리고 추후 BigQuery 권한을 같이 넣는다.

보안에 신경써야 한다면 BigQuery쪽은 계정을 따로 생성해도 된다. 나중에 진행할 GcsToBigQuery를 할때 각각 계정으로 설정이 가능하다.

 

만약 기존 계정에다 서비스계정 권한을 추가해야 한다면 이 메뉴가 아닌

GCP 콘솔 => IAM 및 관리자 => IAM 으로 접속하면 권한을 수정할 수 있다.

 

마지막 화면은 별다른 설정없이 완료를 누른다.

 

그럼 이제 생성된 계정에 키를 다운받아야 한다. 우측의 아이콘을 클릭하면 키 만들기 가 있다.

 

json으로 다운로드 받는다.

 

파일을 열어보면 키정보가 담겨있다.

 

그럼 이번엔 버킷을 만들어보자

위에 설정한대로 test-member-20200824 이름의 버킷을 만든다.

 

Storage => 브라우저 로 들어가 '버킷 생성' 버튼을 클릭한다.

버킷이름은 지정한대로 하고 다음을 누른다.

 

 

위치는 아시아지역으로 했다.(상황에 따라 맞춰서 하면 될듯)

위치를 Region(단일 리전)선택 후 서울을 선택하길 추천한다.

나중에 BigQuery에서 데이터세트를 만들때 데이터 위치를 서울(asia-northeast3)으로 지정해 만들 경우, gcs_to_bq 를 실행할때 지역이 다르다고 실행이 안되는 문제를 발견했다.

 

아래는 그 에러문구(source 부분에 asia라고 되어있는게 아시아지역을 의미하는데 안되는건 버그인듯 싶다)

BigQuery job failed. Final error was: {'reason': 'invalid', 'message': 'Cannot read and write in different locations: source: asia, destination: asia-northeast3'}. The job was: {'kind': 'bigquery#job', 'etag': 'AxCC5WIAsurvnCOCAfJ4fQ==', 'id': 'test-project:asia-northeast3.job_ruRurG1ezUpDAgijkMCbPPfYaiRR', 'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/test-project/jobs/job_ruRurG1ezUpDAgijkMCbPPfYaiRR?location=asia-northeast3', 'user_email': 'data-storage-access@test-project.iam.gserviceaccount.com', 'configuration': {'load': {'sourceUris': ['gs://test-member-20200824/membership/customer_20200825.json'], 'destinationTable': {'projectId': 'test-project', 'datasetId': 'membership', 'tableId': 'tb_customer'}, 'createDisposition': 'CREATE_IF_NEEDED', 'writeDisposition': 'WRITE_TRUNCATE', 'sourceFormat': 'NEWLINE_DELIMITED_JSON', 'ignoreUnknownValues': False, 'autodetect': True}, 'jobType': 'LOAD'}, 'jobReference': {'projectId': 'test-project', 'jobId': 'job_ruRurG1ezUpDAgijkMCbPPfYaiRR', 'location': 'asia-northeast3'}, 'statistics': {'creationTime': '1598349389988', 'startTime': '1598349390121', 'endTime': '1598349390121'}, 'status': {'errorResult': {'reason': 'invalid', 'message': 'Cannot read and write in different locations: source: asia, destination: asia-northeast3'}, 'errors': [{'reason': 'invalid', 'message': 'Cannot read and write in different locations: source: asia, destination: asia-northeast3'}], 'state': 'DONE'}}

 

스토리지는 기본으로 선택, 그 외에 다른 설정도 기본값으로 두고 '만들기'를 누른다.

 

권한을 클릭하여 내가 만든 계정이 보이는지 확인한다.

 

그럼 이제 환경설정으로 접속권한을 설정하자.

 

 

# GCS Connection 설정하기

우선 airflow 의 공식문서를 참조해보자.

https://airflow.apache.org/docs/stable/howto/connection/gcp.html

 

Google Cloud Platform Connection — Airflow Documentation

 

airflow.apache.org

 

 

 

문서에 보면 다음 유형들을 인증설정 할 수 있다.

이중 여기서 필요한 것은 google_cloud_default 인데 그 안에 GoogleCloudStorageHook이 있기 때문이다.

 

문서 하단에 For example을 이용해 이전에 받은 생성키 파일을 연결해보자

export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='google-cloud-platform://?extra__google_cloud_platform__key_path=%2Fkeys%2Fkey.json&extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&extra__google_cloud_platform__project=airflow&extra__google_cloud_platform__num_retries=5'

 

 

이중에 수정해야 할 것은 2개다

1) extra__google_cloud_platform__project

2) extra__google_cloud_platform__key_path

 

그리고 나중에 bigquery를 연결하기 위해선extra__google_cloud_platform__num_retries 를 제거해주는게 좋다. 다음과 같은 에러가 나는데 관련 포스팅 시 다시한번 설명하겠다.

ERROR - can only concatenate str (not "int") to str
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 288, in execute
    encryption_configuration=self.encryption_configuration)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1297, in run_load
    return self.run_with_configuration(configuration)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1318, in run_with_configuration
    .execute(num_retries=self.num_retries)
  File "/usr/local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/googleapiclient/http.py", line 901, in execute
    headers=self.headers,
  File "/usr/local/lib/python3.7/site-packages/googleapiclient/http.py", line 159, in _retry_request
    for retry_num in range(num_retries + 1):
TypeError: can only concatenate str (not "int") to str

 

프로젝트 이름은 콘솔 프로젝트 이름을 넣어주면 된다.

그리고 key_path는 다운로드 받은 경로에 FULL로 넣어주어야 한다

설정이 다 끝나면 환경변수에 등록한다.

 

 

# airflow test 실행하기

test를 실행하여 에러가 나는건 없는지 미리 확인해보자

airflow test mysql_to_gcs_dag mysql_to_gcs 2020-08-24

 

실행하다가 다음 에러가 난다면 위에서 말한 mysql 접속환경 변수를 등록하지 않은것이니 등록해주도록 하자.

ERROR - (2005, "Unknown MySQL server host 'mysql' (0)")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/sql_to_gcs.py", line 115, in execute
    cursor = self.query()
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/mysql_to_gcs.py", line 85, in query
    conn = mysql.get_conn()
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/mysql_hook.py", line 116, in get_conn
    conn = MySQLdb.connect(**conn_config)
  File "/usr/local/lib/python3.7/site-packages/MySQLdb/__init__.py", line 84, in Connect
    return Connection(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 166, in __init__
    super(Connection, self).__init__(*args, **kwargs2)
MySQLdb._exceptions.OperationalError: (2005, "Unknown MySQL server host 'mysql' (0)")
[2020-08-24 22:29:10,207] {taskinstance.py:1187} INFO - All retries failed; marking task as FAILED.dag_id=mysql_to_gcs_dag, task_id=mysql_to_gcs, execution_date=20200824T000000, start_date=20200824T132905, end_date=20200824T132910
[2020-08-24 22:29:10,232] {configuration.py:320} WARNING - section/key [smtp/smtp_user] not found in config
[2020-08-24 22:29:10,233] {configuration.py:320} WARNING - section/key [smtp/smtp_user] not found in config
[2020-08-24 22:29:10,234] {taskinstance.py:1206} ERROR - Failed to send email to: ['등록된 이메일']
[2020-08-24 22:29:10,234] {taskinstance.py:1207} ERROR - [Errno 61] Connection refused

 

실행하면 다음과 같은 메세지와 함께 완료되었다고 나온다.

{taskinstance.py:900} INFO - Executing <Task(MySqlToGoogleCloudStorageOperator): mysql_to_gcs> on 2020-08-24T00:00:00+09:00
{base_hook.py:87} INFO - Using connection to: id: mysql_default. Host: [host], Port: [port], Schema: [schema], Login: [username], Password: XXXXXXXX, extra: None
{mysql_to_gcs.py:92} INFO - Executing: select ID, NAME, AGE from member 
{gcs_hook.py:224} INFO - File /var/folders/qc/h1p2zsqj2dl1q9z709z9bsp00000gn/T/tmpjfvil710 uploaded to test/member.json in <Bucket: test-member-20200824> bucket
{taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=mysql_to_gcs_dag, task_id=mysql_to_gcs, execution_date=20200824T000000, start_date=20200824T134148, end_date=20200824T134153

 

실제로 저장되었는지 확인해보자

 

test 폴더 안에 member.json 파일로 저장된 것을 확인할 수 있다.

파일을 다운로드 받아서 열어보면,

{"AGE": 25, "ID": 1, "NAME": "\uc544\uc774\uc720", "REG_DT": 1598304038.0}
{"AGE": 33, "ID": 2, "NAME": "\uae40\uc218\ud604", "REG_DT": 1598304338.0}
{"AGE": 18, "ID": 3, "NAME": "\uc774\uc218\uc601", "REG_DT": 1598304638.0}

 

제대로 복사된 것을 확인할 수 있다. 다만 2가지 문제가 있다.

1) 한글이 깨진다는 것

2) 날짜가 timestamp로 변경된다는 점

 

이것을 해결하기 위해서는 airflow에서 제공한 mysql_to_gcs.py 파일을 다운로드 받아 커스터마이징 해야한다.

 

 

 

# mysql_to_gcs.py 파일 수정

해당 문제를 수정하는데 아래 사이트의 도움을 받았다. 아래 링크

https://chang12.github.io/airflow-postgres-to-s3/

 

 

시작해보자.

 

파일을 다운받아 파이썬으로 import 할 수 있게 하고, 파일 내용 중 다음을 수정한다.

 

 

파일 다운로드 주소:

https://github.com/apache/airflow/blob/1e79dae06e/airflow/contrib/operators/mysql_to_gcs.py

 

 

1) 한글문제 수정

환경변수에 접속정보 등록 시 charset을 지정하면 된다.

export AIRFLOW_CONN_MYSQL_TEST='mysql://[username]:[password]@[hostname]:[port]/[schema]?charset=utf8'

마지막에 설정에 charset=utf8를 추가.

 

 

아래는 더이상 사용하지 않음

다음 위치를 찾아 주석으로 변경한 후 수정한다.

...
    def _write_local_data_files(self, cursor):
        ...
        # tmp_file_handle = NamedTemporaryFile(delete=True) << 주석
        # 변경
        tmp_file_handle = NamedTemporaryFile(mode='w', encoding='utf-8', delete=True)
        tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}

        for row in cursor:
            ...
            # s = json.dumps(row_dict)  << 주석
            # if PY3:  << 주석
                # s = s.encode('utf-8')  << 주석
            # tmp_file_handle.write(s)  << 주석
            # tmp_file_handle.write(b'\n')  << 주석
            # 변경
            tmp_file_handle.write(json.dumps(row_dict, ensure_ascii=False, sort_keys=True))
            tmp_file_handle.write('\n')
            ...
...

 

2) 날짜를 String foramt(yyyy-mm-dd hh:mm:ss)로 변형

반드시 위와같은 포멧을 지켜야하는게 아니라면 굳이 설정하지 말고 추후 bigquery에 넣을때 timestamp 로 타입지정하길 추천

...
    @staticmethod
    def _convert_types(schema, col_type_dict, row):
        converted_row = []
        for col_name, col_val in zip(schema, row):
            if type(col_val) in (datetime, date):
                # col_val = time.mktime(col_val.timetuple()) << 주석
                # 변경
                col_val = col_val.strftime('%Y-%m-%d %H:%M:%S')
            elif isinstance(col_val, Decimal):
                col_val = float(col_val)
            elif col_type_dict.get(col_name) == "BYTES":
                col_val = base64.standard_b64encode(col_val)
                if PY3:
                    col_val = col_val.decode('ascii')
            else:
                col_val = col_val
            converted_row.append(col_val)
        return converted_row
...

 

 

 

수정한 파일을 여기 첨부파일로 넣겠다(글자수가 많아 에디터에서 지원을 안함)

MysqlToGCS.py
0.01MB

이제 다시 실행하여 GCS에서 보면 다음과 같이 저장되어 있음을 확인할 수 있다.

 

테스트 실행

 

airflow test mysql_to_gcs_dag mysql_to_gcs 2020-08-24

결과

{"AGE": 25, "ID": 1, "NAME": "아이유", "REG_DT": "2020-08-25 06:20:38"}
{"AGE": 33, "ID": 2, "NAME": "김수현", "REG_DT": "2020-08-25 06:25:38"}
{"AGE": 18, "ID": 3, "NAME": "이수영", "REG_DT": "2020-08-25 06:30:38"}

 

다음 포스팅에선 이 파일을 BigQuery 에 저장하는 작업을 포스팅 하겠다.

 

 

끝.

 

 

 

참조:

https://chang12.github.io/airflow-postgres-to-s3/

 

Airflow 에 PostgresToS3Operator 만들기

ETL 작업을 어떻게 관리하는게 좋을지 고민입니다. 검색했을 때나 주변 분들 얘기를 들어봤을 때 Apache Airflow 가 많이 쓰이는 것 같습니다. 매니지드 서비스로는 GCP 에서는 Cloud Composer, AWS 에서는 A

chang12.github.io

 

airflow github

https://github.com/apache/airflow/blob/1e79dae06e/airflow/contrib/operators/mysql_to_gcs.py

 

apache/airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows - apache/airflow

github.com

 

반응형

댓글