본문 바로가기
공부/프로그래밍

[airflow] GCS(Google Cloud Storage) 파일을 BigQuery에 저장하기

by demonic_ 2020. 9. 9.
반응형

지난 글에 이어 살펴보자.

 

lemontia.tistory.com/957

 

[airflow] Mysql 데이터를 GCS(Google Cloud Storage)로 저장하기(mysql_to_gcs, 한글깨짐, 날짜포멧 수정)

Data lake 를 구축하기 위한 1단계인 원본데이터를 GCS로 이동하는 것을 다뤄보려 한다. embulk를 쓸까도 했었는데, JAVA 1.8 버전 이후부터는 지원을 안하기도 했고, 관리포인트를 늘리는 것도 좋아보��

lemontia.tistory.com

 

이전 글에서 환경변수에 AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT 를 설정하여 GCS를 접근하게 했는데, 이 정보를 그대로 사용해도 되고 또는 BigQuery만의 설정을 따로 만들어도 좋다. 다만 airflow에서 제공하는 플러그인들은 별도의 bigquery_default 를 conn_id의 기본값으로 하는 경우가 많기 때문에 나역시 환경변수를 추가했다.

export AIRFLOW_CONN_BIGQUERY_DEFAULT='google-cloud-platform://?extra__google_cloud_platform__key_path=[키경로]&extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&extra__google_cloud_platform__project=[프로젝트 ID]'

 

지난번 글에 설정한 AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT 와는 다른점이 있는데 바로 extra__google_cloud_platform__num_retries 를 뺐다. 신기하게도 이 옵션을 넣으면 다음과 같은 에러가 발생한다.

ERROR - can only concatenate str (not "int") to str
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 288, in execute
    encryption_configuration=self.encryption_configuration)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1297, in run_load
    return self.run_with_configuration(configuration)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1318, in run_with_configuration
    .execute(num_retries=self.num_retries)
  File "/usr/local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/googleapiclient/http.py", line 901, in execute
    headers=self.headers,
  File "/usr/local/lib/python3.7/site-packages/googleapiclient/http.py", line 159, in _retry_request
    for retry_num in range(num_retries + 1):
TypeError: can only concatenate str (not "int") to str

 

아마 값이 5로 되어있는게 문제인거 같은데, bigquery를 접근하는데 별 문제가 안되기에 빼도 무방하다.

 

그럼 이제 설정을 추가해보자.

default_args 는 지난 포스팅에 설정방법을 설명했다.

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

...
dag = DAG('mysql_to_gcs_dag', default_args=default_args)
...
# GCS => BigQuery
gcsToBigQuery = GoogleCloudStorageToBigQueryOperator(
		task_id = 'gcs_to_bq'
		, destination_project_dataset_table='[데이터셋.테이블명]'
		, bucket=bucket
		, source_objects=[filename]
		, write_disposition='WRITE_TRUNCATE'
		, create_disposition='CREATE_IF_NEEDED'
		, source_format='NEWLINE_DELIMITED_JSON'
		, dag=dag
	)

 

안에 내용을 보면 bigquery_conn_id 의 기본값을 bigquery_default로 되어 있다. 여기서는 이미 환경설정을 완료했으니 별 문제 없다. 아래 파일은 GoogleCloudStorageToBigQueryOperator의 내용이다.

class GoogleCloudStorageToBigQueryOperator(BaseOperator):
...
    @apply_defaults
    def __init__(self,
...
                 bigquery_conn_id='bigquery_default',
                 google_cloud_storage_conn_id='google_cloud_default',
...

 

bigquery에서 update는 가능하긴 하지만 일반 RDB에서 하는 방식은 아니다. 덮어씌운다고 표현해야 하나. 테이블을 생성할 때도 Primary Key를 잡지 않고 하다보니 임의로 찾아서 덮어씌우는게 아니라면 똑같은 값이 몇번이고 들어갈 수 있다. 그건 위의 설정에서도 잘 드러나는데 write_disposition 옵션이 그렇다.

 

여기에는 총 3가지 옵션이 있다.

- WRITE_EMPTY: 테이블이 비어있어야지만 저장함

- WRITE_TRUNCATE: 테이블이 내용을 없애고 새로 저장함

- WRITE_APPEND: 마지막부터 붙여넣음

 

이 옵션에 대한 설명은 아래 링크가 있으니 참조하자.

https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition

 

WriteDisposition  |  BigQuery  |  Google Cloud

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trade

cloud.google.com

 

 

그레서 데이터를 1개의 테이블에 쭉 쌓던지, 아니면 테이블 이름을 구분하여 저장할 것인지를 판단해야 한다.

1개 테이블을 이용할 경우 파티셔닝을, 아니면 [테이블_날짜] 형태로 생성이 가능한데 테이블을 여러개로 쪼개서 써도 와일드카드를 이용해 한번에 조회할 수 있다.

예를들어 temp.data_20200901, temp.data_20200902, temp.data_20200903 이라고 한다면 이것들을 묶어 한번에 조회할 땐 다음의 명령어로 가능하다.

 

select * from `temp.data_202009*`

 

파티셔닝 되어있지 않은 테이블 하나에 데이터가 너무 크게 있으면 where절을 이용한 필터를 할 때도 매번 큰 파일을 읽어 비용을 발생시킨다. 파티셔닝이 되어있다면 해당구간만 로딩하기 때문에 비용부담이 적다

참고로 빅쿼리는 처리하는 양에 따라 금액이 정해진다.

 

 

와일드 카드 테이블 사용방법은 아래 공식문서를 참조하자

https://cloud.google.com/bigquery/docs/querying-wildcard-tables?hl=ko

 

와일드 카드 테이블을 사용하여 여러 테이블 쿼리  |  BigQuery  |  Google Cloud

와일드 카드 테이블을 사용하면 간단한 SQL 문으로 여러 테이블을 쿼리할 수 있습니다. 와일드 카드 테이블은 표준 SQL에서만 사용할 수 있습니다. legacy SQL의 해당 기능은 테이블 와일드 카드 함��

cloud.google.com

 

create_disposition 옵션은 2가지 옵션만 있는데 아래와 같다

- CREATE_NEVER : 테이블을 생성하지 않음

- CREATE_IF_NEEDED: 필요하면 테이블을 생성함

상황에 따라 설정해 쓰면 되겠다.

 

참조: https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/CreateDisposition

 

CreateDisposition  |  BigQuery  |  Google Cloud

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trade

cloud.google.com

 

마지막으로 source_format의 옵션이 NEWLINE_DELIMITED_JSON 으로 되어있는데, gcs의 파일을 보면 다음과 같이 되어있기 때문이다.

{"AGE": 25, "ID": 1, "NAME": "아이유", "REG_DT": "2020-08-25 06:20:38"}
{"AGE": 33, "ID": 2, "NAME": "김수현", "REG_DT": "2020-08-25 06:25:38"}
{"AGE": 18, "ID": 3, "NAME": "이수영", "REG_DT": "2020-08-25 06:30:38"}

 

MySql 에서 가져온 데이터인데, json이 row별로 끊겨있는것을 확인할 수 있다. 그래서 NEWLINE_DELIMITED_JSON 를 통해 1줄이 한개의 row임을 지정하는 것이다.

만약 이 설정이 빠졌다면 다음과 같은 에러가 발생한다.

'errorResult': {'reason': 'invalid', 'location': 'gs://[지정한 GCS 경로]', 'message': 'Error while reading data, error message: CSV table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.'}, 'errors': [{'reason': 'invalid', 'location': 'gs://[지정한 GCS 경로]', 'message': 'Error while reading data, error message: CSV table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.'}, {'reason': 'invalid', 'location': 'gs://[지정한 GCS 경로]', 'message': 'Error while reading data, error message: Error detected while parsing row starting at position: 0. Error: Data between close double quote (") and field separator.'}], 'state': 'DONE'}

 

source_format 의 기본값이 CSV로 되어있기 때문이다. GoogleCloudStorageToBigQueryOperator 안을 보면 다음과 같이 설정되어 있다.

class GoogleCloudStorageToBigQueryOperator(BaseOperator):
...
    @apply_defaults
    def __init__(self,
...
                 bigquery_conn_id='bigquery_default',
                 google_cloud_storage_conn_id='google_cloud_default',
                 source_format='CSV',
...

 


그럼 이제 테스트를 해보자.

GCS에 있는 파일은 아래의 내용을 담고 있는 member.json 의 파일이다.

 

{"AGE": 25, "ID": 1, "NAME": "아이유", "REG_DT": "2020-08-25 06:20:38"}
{"AGE": 33, "ID": 2, "NAME": "김수현", "REG_DT": "2020-08-25 06:25:38"}
{"AGE": 18, "ID": 3, "NAME": "이수영", "REG_DT": "2020-08-25 06:30:38"}

 

실행 전 다음의 환경설정을 추가했는지 확인하자

- GOOGLE_APPLICATION_CREDENTIALS

- AIRFLOW_CONN_BIGQUERY_DEFAULT

그럼 테스트를 실행한다.

 

여기서는 temp 데이터셋의 temp_table 에 저장하도록 설정했다.

(위 설정 중 destination_project_dataset_table='temp.temp_table'로 설정)

 

airflow test mysql_to_gcs_dag gcs_to_bq 2020-09-07

 

수행하면 다음과 같이 로그가 나오며 성공한다.

{__init__.py:50} INFO - Using executor SequentialExecutor
{dagbag.py:417} INFO - Filling up the DagBag from /Users/dgpark/airflow/dags
{taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs_dag.gcs_to_bq 2020-09-07T00:00:00+09:00 [None]>
{taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs_dag.gcs_to_bq 2020-09-07T00:00:00+09:00 [None]>
{taskinstance.py:880} INFO - 
--------------------------------------------------------------------------------
{taskinstance.py:881} INFO - Starting attempt 1 of 2
{taskinstance.py:882} INFO - 
--------------------------------------------------------------------------------
{taskinstance.py:901} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): gcs_to_bq> on 2020-09-07T00:00:00+09:00
{crypto.py:77} WARNING - cryptography not found - values will not be stored encrypted.
{bigquery_hook.py:2238} INFO - Project not included in destination_project_dataset_table: temp.temp_table; using project "[프로젝트 ID]"
{bigquery_hook.py:1348} INFO - Waiting for job to complete : [프로젝트 ID], job_lbQZeumGH3g28yO1YYsFEVDe6T4h
{taskinstance.py:1057} INFO - Marking task as SUCCESS.dag_id=mysql_to_gcs_dag, task_id=gcs_to_bq, execution_date=20200907T000000, start_date=20200907T104100, end_date=20200907T104107

 

Bigquery 콘솔로 접속해서 테이블을 확인하면 생성된 것이 확인된다.

 

마지막으로 쿼리를 실행하여 결과를 확인해보자.

 

 

끝.

반응형

댓글