간단하게 알아보자.
airflow에는 bigquery를 수행할때 BigQueryHook을 이용하는데 여기서 제공하는 메서드 중에 hook.table_exists 라는게 있다.
해당 코드를 보면 다음과 같다
...
def table_exists(self, project_id, dataset_id, table_id):
"""
Checks for the existence of a table in Google BigQuery.
:param project_id: The Google cloud project in which to look for the
table. The connection supplied to the hook must provide access to
the specified project.
:type project_id: str
:param dataset_id: The name of the dataset in which to look for the
table.
:type dataset_id: str
:param table_id: The name of the table to check the existence of.
:type table_id: str
"""
service = self.get_service()
try:
service.tables().get(
projectId=project_id, datasetId=dataset_id,
tableId=table_id).execute(num_retries=self.num_retries)
return True
except HttpError as e:
if e.resp['status'] == '404':
return False
raise
...
project_id, dataset_id, table_id 를 조합해 테이블이 있는지 확인한다.
이것을 쉽게 호출 및 추가기능을 가진것이 BigQueryTableSensor 이다.
옵션을 살펴보자.
class BigQueryTableSensor(BaseSensorOperator):
...
@apply_defaults
def __init__(self,
project_id,
dataset_id,
table_id,
bigquery_conn_id='bigquery_default_conn',
delegate_to=None,
*args, **kwargs):
super(BigQueryTableSensor, self).__init__(*args, **kwargs)
self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
self.bigquery_conn_id = bigquery_conn_id
self.delegate_to = delegate_to
def poke(self, context):
table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
self.log.info('Sensor checks existence of table: %s', table_uri)
hook = BigQueryHook(
bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
return hook.table_exists(self.project_id, self.dataset_id, self.table_id)
내용을 보면 project_id, dataset_id, table_id 를 각각 받도록 되어있다.
하나 더 유심히 봐야할 것은 bigquery_id의 기본값이 bigquery_default_conn라는 점이다.
이전까지 bigquery관련 operator를 이용한 사람은 이 값이 대부분 bigquery_default 로 구성된 것을 봤을 것이다. 예를들어 쿼리를 조회하는 BigQueryOperator를 보면 다음처럼 되어있다.
...
@apply_defaults
def __init__(self,
bql=None,
sql=None,
destination_dataset_table=None,
write_disposition='WRITE_EMPTY',
allow_large_results=False,
flatten_results=None,
bigquery_conn_id='bigquery_default',
...
...
그래서 bigquery_conn_id 도 직접 지정해주는 것이 좋다.
관련내용은 이전 포스팅을 보면 좋을듯 하다.
DAG를 다음과 같이 설정한다.
...
checker = BigQueryTableSensor(
dag=dag
, task_id='bq_check_exist_table'
, bigquery_conn_id='bigquery_default'
, project_id='[PROJECT ID]'
, dataset_id='[DATASET ID]'
, table_id='[테이블 ID]'
)
...
이렇게만 두면 잘 수행된다. 문제는 테이블이 없을때 재점검을 생각보다 오래한다는 점이다.
기본값을 살펴보자.
BigQueryTableSensor클래스를 보면 BaseSensorOperator를 상속받는다. timeout을 설정하는 곳은 여기에 있다.
class BaseSensorOperator(BaseOperator, SkipMixin):
...
@apply_defaults
def __init__(self,
poke_interval=60,
timeout=60 * 60 * 24 * 7,
soft_fail=False,
mode='poke',
*args,
**kwargs):
super(BaseSensorOperator, self).__init__(*args, **kwargs)
self.poke_interval = poke_interval
self.soft_fail = soft_fail
self.timeout = timeout
self.mode = mode
self._validate_input_values()
보면 poke_interval과 timeout만 조절하면 시간조절이 가능하다.
poke_interval과 timeout의 설명을 보면 다음과 같다.
...
:param poke_interval: Time in seconds that the job should wait in
between each tries
:type poke_interval: int
:param timeout: Time, in seconds before the task times out and fails.
:type timeout: int
...
poke_interval 은 한번 요청할때 기다리는 시간을 의미한다.
그리고 timeout 은 전체 요청시간을 체크한다. 즉 fail 여부를 판단하는 곳이 바로 여기다.
실제로 프로세스를 수행하는 코드를 보면 다음과 같이 되어있다.
...
def execute(self, context):
started_at = timezone.utcnow()
if self.reschedule:
# If reschedule, use first start date of current try
task_reschedules = TaskReschedule.find_for_task_instance(context['ti'])
if task_reschedules:
started_at = task_reschedules[0].start_date
while not self.poke(context):
if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
# If sensor is in soft fail mode but will be retried then
# give it a chance and fail with timeout.
# This gives the ability to set up non-blocking AND soft-fail sensors.
if self.soft_fail and not context['ti'].is_eligible_to_retry():
self._do_skip_downstream_tasks(context)
raise AirflowSkipException('Snap. Time is OUT.')
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
if self.reschedule:
reschedule_date = timezone.utcnow() + timedelta(
seconds=self.poke_interval)
raise AirflowRescheduleException(reschedule_date)
else:
sleep(self.poke_interval)
self.log.info("Success criteria met. Exiting.")
...
그래서 내 경우는 다음과 같이 설정하여 시간을 단축시켰다.
...
checker = BigQueryTableSensor(
dag=dag
, task_id='bq_check_exist_table'
, bigquery_conn_id='bigquery_default'
, project_id='[PROJECT ID]'
, dataset_id='[DATASET ID]'
, table_id='[테이블 ID]'
, poke_interval=3
, timeout=8
)
...
계산대로라면 총 3번 시도를 할 것이고 3번째가 종료되기 전에 전체 Fail로 종료될 것이다.
--------------------------------------------------------------------------------
{taskinstance.py:900} INFO - Executing <Task(BigQueryTableSensor): bq_check_exist_table> on 2020-09-01T00:00:00+09:00
{bigquery_sensor.py:66} INFO - Sensor checks existence of table: [project id]:[dateset id]
{bigquery_sensor.py:66} INFO - Sensor checks existence of table: [project id]:[dateset id]
{bigquery_sensor.py:66} INFO - Sensor checks existence of table: [project id]:[dateset id]
{taskinstance.py:1207} ERROR - Snap. Time is OUT.
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/sensors/base_sensor_operator.py", line 116, in execute
raise AirflowSensorTimeout('Snap. Time is OUT.')
airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.
[2020-09-01 12:26:47,616] {taskinstance.py:1187} INFO - All
실제로 3번 호출하였고 이후 에러가 발생했다. 에러문구는 Time is OUT.
끝.
'공부 > 데이터' 카테고리의 다른 글
[airflow] Mysql to gcs to bigquery 할 때 CSV로 저장하기 (0) | 2020.10.08 |
---|---|
[airflow] DAG schedule_interval에 timezone 세팅 (0) | 2020.08.31 |
[pandas] 특정 key를 기준으로 groupby 한 후 해당목록 배열(dict)로 변경하기 (0) | 2020.05.30 |
[pandas] read_csv 사용 시 날짜를 datetime 형태로 지정하기 (0) | 2019.12.10 |
[데이터분석] 장바구니 분석(apriori 알고리즘) 사용 및 해석하기 (2) | 2019.12.04 |
댓글