지난 글에 이어 살펴보자.
이전 글에서 환경변수에 AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT 를 설정하여 GCS를 접근하게 했는데, 이 정보를 그대로 사용해도 되고 또는 BigQuery만의 설정을 따로 만들어도 좋다. 다만 airflow에서 제공하는 플러그인들은 별도의 bigquery_default 를 conn_id의 기본값으로 하는 경우가 많기 때문에 나역시 환경변수를 추가했다.
export AIRFLOW_CONN_BIGQUERY_DEFAULT='google-cloud-platform://?extra__google_cloud_platform__key_path=[키경로]&extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&extra__google_cloud_platform__project=[프로젝트 ID]' |
지난번 글에 설정한 AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT 와는 다른점이 있는데 바로 extra__google_cloud_platform__num_retries 를 뺐다. 신기하게도 이 옵션을 넣으면 다음과 같은 에러가 발생한다.
ERROR - can only concatenate str (not "int") to str
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 288, in execute
encryption_configuration=self.encryption_configuration)
File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1297, in run_load
return self.run_with_configuration(configuration)
File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1318, in run_with_configuration
.execute(num_retries=self.num_retries)
File "/usr/local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
return wrapped(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/googleapiclient/http.py", line 901, in execute
headers=self.headers,
File "/usr/local/lib/python3.7/site-packages/googleapiclient/http.py", line 159, in _retry_request
for retry_num in range(num_retries + 1):
TypeError: can only concatenate str (not "int") to str
아마 값이 5로 되어있는게 문제인거 같은데, bigquery를 접근하는데 별 문제가 안되기에 빼도 무방하다.
그럼 이제 설정을 추가해보자.
default_args 는 지난 포스팅에 설정방법을 설명했다.
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
...
dag = DAG('mysql_to_gcs_dag', default_args=default_args)
...
# GCS => BigQuery
gcsToBigQuery = GoogleCloudStorageToBigQueryOperator(
task_id = 'gcs_to_bq'
, destination_project_dataset_table='[데이터셋.테이블명]'
, bucket=bucket
, source_objects=[filename]
, write_disposition='WRITE_TRUNCATE'
, create_disposition='CREATE_IF_NEEDED'
, source_format='NEWLINE_DELIMITED_JSON'
, dag=dag
)
안에 내용을 보면 bigquery_conn_id 의 기본값을 bigquery_default로 되어 있다. 여기서는 이미 환경설정을 완료했으니 별 문제 없다. 아래 파일은 GoogleCloudStorageToBigQueryOperator의 내용이다.
class GoogleCloudStorageToBigQueryOperator(BaseOperator):
...
@apply_defaults
def __init__(self,
...
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='google_cloud_default',
...
bigquery에서 update는 가능하긴 하지만 일반 RDB에서 하는 방식은 아니다. 덮어씌운다고 표현해야 하나. 테이블을 생성할 때도 Primary Key를 잡지 않고 하다보니 임의로 찾아서 덮어씌우는게 아니라면 똑같은 값이 몇번이고 들어갈 수 있다. 그건 위의 설정에서도 잘 드러나는데 write_disposition 옵션이 그렇다.
여기에는 총 3가지 옵션이 있다.
- WRITE_EMPTY: 테이블이 비어있어야지만 저장함
- WRITE_TRUNCATE: 테이블이 내용을 없애고 새로 저장함
- WRITE_APPEND: 마지막부터 붙여넣음
이 옵션에 대한 설명은 아래 링크가 있으니 참조하자.
https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
그레서 데이터를 1개의 테이블에 쭉 쌓던지, 아니면 테이블 이름을 구분하여 저장할 것인지를 판단해야 한다.
1개 테이블을 이용할 경우 파티셔닝을, 아니면 [테이블_날짜] 형태로 생성이 가능한데 테이블을 여러개로 쪼개서 써도 와일드카드를 이용해 한번에 조회할 수 있다.
예를들어 temp.data_20200901, temp.data_20200902, temp.data_20200903 이라고 한다면 이것들을 묶어 한번에 조회할 땐 다음의 명령어로 가능하다.
select * from `temp.data_202009*`
파티셔닝 되어있지 않은 테이블 하나에 데이터가 너무 크게 있으면 where절을 이용한 필터를 할 때도 매번 큰 파일을 읽어 비용을 발생시킨다. 파티셔닝이 되어있다면 해당구간만 로딩하기 때문에 비용부담이 적다
참고로 빅쿼리는 처리하는 양에 따라 금액이 정해진다.
와일드 카드 테이블 사용방법은 아래 공식문서를 참조하자
https://cloud.google.com/bigquery/docs/querying-wildcard-tables?hl=ko
create_disposition 옵션은 2가지 옵션만 있는데 아래와 같다
- CREATE_NEVER : 테이블을 생성하지 않음
- CREATE_IF_NEEDED: 필요하면 테이블을 생성함
상황에 따라 설정해 쓰면 되겠다.
참조: https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/CreateDisposition
마지막으로 source_format의 옵션이 NEWLINE_DELIMITED_JSON 으로 되어있는데, gcs의 파일을 보면 다음과 같이 되어있기 때문이다.
{"AGE": 25, "ID": 1, "NAME": "아이유", "REG_DT": "2020-08-25 06:20:38"}
{"AGE": 33, "ID": 2, "NAME": "김수현", "REG_DT": "2020-08-25 06:25:38"}
{"AGE": 18, "ID": 3, "NAME": "이수영", "REG_DT": "2020-08-25 06:30:38"}
MySql 에서 가져온 데이터인데, json이 row별로 끊겨있는것을 확인할 수 있다. 그래서 NEWLINE_DELIMITED_JSON 를 통해 1줄이 한개의 row임을 지정하는 것이다.
만약 이 설정이 빠졌다면 다음과 같은 에러가 발생한다.
'errorResult': {'reason': 'invalid', 'location': 'gs://[지정한 GCS 경로]', 'message': 'Error while reading data, error message: CSV table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.'}, 'errors': [{'reason': 'invalid', 'location': 'gs://[지정한 GCS 경로]', 'message': 'Error while reading data, error message: CSV table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.'}, {'reason': 'invalid', 'location': 'gs://[지정한 GCS 경로]', 'message': 'Error while reading data, error message: Error detected while parsing row starting at position: 0. Error: Data between close double quote (") and field separator.'}], 'state': 'DONE'} |
source_format 의 기본값이 CSV로 되어있기 때문이다. GoogleCloudStorageToBigQueryOperator 안을 보면 다음과 같이 설정되어 있다.
class GoogleCloudStorageToBigQueryOperator(BaseOperator):
...
@apply_defaults
def __init__(self,
...
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='google_cloud_default',
source_format='CSV',
...
그럼 이제 테스트를 해보자.
GCS에 있는 파일은 아래의 내용을 담고 있는 member.json 의 파일이다.
{"AGE": 25, "ID": 1, "NAME": "아이유", "REG_DT": "2020-08-25 06:20:38"}
{"AGE": 33, "ID": 2, "NAME": "김수현", "REG_DT": "2020-08-25 06:25:38"}
{"AGE": 18, "ID": 3, "NAME": "이수영", "REG_DT": "2020-08-25 06:30:38"}
실행 전 다음의 환경설정을 추가했는지 확인하자
- GOOGLE_APPLICATION_CREDENTIALS
- AIRFLOW_CONN_BIGQUERY_DEFAULT
그럼 테스트를 실행한다.
여기서는 temp 데이터셋의 temp_table 에 저장하도록 설정했다.
(위 설정 중 destination_project_dataset_table='temp.temp_table'로 설정)
airflow test mysql_to_gcs_dag gcs_to_bq 2020-09-07
수행하면 다음과 같이 로그가 나오며 성공한다.
{__init__.py:50} INFO - Using executor SequentialExecutor
{dagbag.py:417} INFO - Filling up the DagBag from /Users/dgpark/airflow/dags
{taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs_dag.gcs_to_bq 2020-09-07T00:00:00+09:00 [None]>
{taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs_dag.gcs_to_bq 2020-09-07T00:00:00+09:00 [None]>
{taskinstance.py:880} INFO -
--------------------------------------------------------------------------------
{taskinstance.py:881} INFO - Starting attempt 1 of 2
{taskinstance.py:882} INFO -
--------------------------------------------------------------------------------
{taskinstance.py:901} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): gcs_to_bq> on 2020-09-07T00:00:00+09:00
{crypto.py:77} WARNING - cryptography not found - values will not be stored encrypted.
{bigquery_hook.py:2238} INFO - Project not included in destination_project_dataset_table: temp.temp_table; using project "[프로젝트 ID]"
{bigquery_hook.py:1348} INFO - Waiting for job to complete : [프로젝트 ID], job_lbQZeumGH3g28yO1YYsFEVDe6T4h
{taskinstance.py:1057} INFO - Marking task as SUCCESS.dag_id=mysql_to_gcs_dag, task_id=gcs_to_bq, execution_date=20200907T000000, start_date=20200907T104100, end_date=20200907T104107
Bigquery 콘솔로 접속해서 테이블을 확인하면 생성된 것이 확인된다.
마지막으로 쿼리를 실행하여 결과를 확인해보자.
끝.
'공부 > 프로그래밍' 카테고리의 다른 글
[react] checkbox 전체선택하게 하기(장바구니 상품삭제 기능) (0) | 2020.10.07 |
---|---|
[springboot] jpa 2개이상 DB사용(querydsl 설정 포함) (1) | 2020.09.17 |
[airflow] Mysql의 캐릭터셋(charset)이 euc-kr 일 때(혹은 로드시 can't decode byte 0xb5에러가 날때) (0) | 2020.09.07 |
[python3] 기존에 되던것이 No module named 뜰 때 (0) | 2020.09.03 |
[springboot, bigquery] 데이터 조회 (0) | 2020.09.02 |
댓글