본문 바로가기
공부/데이터

[airflow] 테이블 유무 확인 BigQueryTableSensor

by demonic_ 2020. 9. 10.
반응형

간단하게 알아보자.

 

airflow에는 bigquery를 수행할때 BigQueryHook을 이용하는데 여기서 제공하는 메서드 중에 hook.table_exists 라는게 있다.

해당 코드를 보면 다음과 같다

...
    def table_exists(self, project_id, dataset_id, table_id):
        """
        Checks for the existence of a table in Google BigQuery.

        :param project_id: The Google cloud project in which to look for the
            table. The connection supplied to the hook must provide access to
            the specified project.
        :type project_id: str
        :param dataset_id: The name of the dataset in which to look for the
            table.
        :type dataset_id: str
        :param table_id: The name of the table to check the existence of.
        :type table_id: str
        """
        service = self.get_service()
        try:
            service.tables().get(
                projectId=project_id, datasetId=dataset_id,
                tableId=table_id).execute(num_retries=self.num_retries)
            return True
        except HttpError as e:
            if e.resp['status'] == '404':
                return False
            raise
...

 

 

project_id, dataset_id, table_id 를 조합해 테이블이 있는지 확인한다.

 

이것을 쉽게 호출 및 추가기능을 가진것이 BigQueryTableSensor 이다.

 

옵션을 살펴보자.

class BigQueryTableSensor(BaseSensorOperator):
...
    @apply_defaults
    def __init__(self,
                 project_id,
                 dataset_id,
                 table_id,
                 bigquery_conn_id='bigquery_default_conn',
                 delegate_to=None,
                 *args, **kwargs):

        super(BigQueryTableSensor, self).__init__(*args, **kwargs)
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def poke(self, context):
        table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
        self.log.info('Sensor checks existence of table: %s', table_uri)
        hook = BigQueryHook(
            bigquery_conn_id=self.bigquery_conn_id,
            delegate_to=self.delegate_to)
        return hook.table_exists(self.project_id, self.dataset_id, self.table_id)

 

 

내용을 보면 project_id, dataset_id, table_id 를 각각 받도록 되어있다.

하나 더 유심히 봐야할 것은 bigquery_id의 기본값이 bigquery_default_conn라는 점이다.

이전까지 bigquery관련 operator를 이용한 사람은 이 값이 대부분 bigquery_default 로 구성된 것을 봤을 것이다. 예를들어 쿼리를 조회하는 BigQueryOperator를 보면 다음처럼 되어있다.

...
    @apply_defaults
    def __init__(self,
                 bql=None,
                 sql=None,
                 destination_dataset_table=None,
                 write_disposition='WRITE_EMPTY',
                 allow_large_results=False,
                 flatten_results=None,
                 bigquery_conn_id='bigquery_default',
                 ...
...

 

 

그래서 bigquery_conn_id 도 직접 지정해주는 것이 좋다.

관련내용은 이전 포스팅을 보면 좋을듯 하다.

lemontia.tistory.com/957

 

[airflow] Mysql 데이터를 GCS(Google Cloud Storage)로 저장하기(mysql_to_gcs, 한글깨짐, 날짜포멧 수정)

Data lake 를 구축하기 위한 1단계인 원본데이터를 GCS로 이동하는 것을 다뤄보려 한다. embulk를 쓸까도 했었는데, JAVA 1.8 버전 이후부터는 지원을 안하기도 했고, 관리포인트를 늘리는 것도 좋아보��

lemontia.tistory.com

 

DAG를 다음과 같이 설정한다.

...
checker = BigQueryTableSensor(
    dag=dag
    , task_id='bq_check_exist_table'
    , bigquery_conn_id='bigquery_default'
    , project_id='[PROJECT ID]'
    , dataset_id='[DATASET ID]'
    , table_id='[테이블 ID]'
)
...

 

이렇게만 두면 잘 수행된다. 문제는 테이블이 없을때 재점검을 생각보다 오래한다는 점이다.

기본값을 살펴보자.

BigQueryTableSensor클래스를 보면 BaseSensorOperator를 상속받는다. timeout을 설정하는 곳은 여기에 있다.

class BaseSensorOperator(BaseOperator, SkipMixin):
    ...
    @apply_defaults
    def __init__(self,
                 poke_interval=60,
                 timeout=60 * 60 * 24 * 7,
                 soft_fail=False,
                 mode='poke',
                 *args,
                 **kwargs):
        super(BaseSensorOperator, self).__init__(*args, **kwargs)
        self.poke_interval = poke_interval
        self.soft_fail = soft_fail
        self.timeout = timeout
        self.mode = mode
        self._validate_input_values()

 

보면 poke_interval과 timeout만 조절하면 시간조절이 가능하다.

 

poke_interval과 timeout의 설명을 보면 다음과 같다.

...
    :param poke_interval: Time in seconds that the job should wait in
        between each tries
    :type poke_interval: int
    :param timeout: Time, in seconds before the task times out and fails.
    :type timeout: int
...

 

poke_interval 은 한번 요청할때 기다리는 시간을 의미한다.

그리고 timeout 은 전체 요청시간을 체크한다. 즉 fail 여부를 판단하는 곳이 바로 여기다.

 

실제로 프로세스를 수행하는 코드를 보면 다음과 같이 되어있다.

...
    def execute(self, context):
        started_at = timezone.utcnow()
        if self.reschedule:
            # If reschedule, use first start date of current try
            task_reschedules = TaskReschedule.find_for_task_instance(context['ti'])
            if task_reschedules:
                started_at = task_reschedules[0].start_date
        while not self.poke(context):
            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                # If sensor is in soft fail mode but will be retried then
                # give it a chance and fail with timeout.
                # This gives the ability to set up non-blocking AND soft-fail sensors.
                if self.soft_fail and not context['ti'].is_eligible_to_retry():
                    self._do_skip_downstream_tasks(context)
                    raise AirflowSkipException('Snap. Time is OUT.')
                else:
                    raise AirflowSensorTimeout('Snap. Time is OUT.')
            if self.reschedule:
                reschedule_date = timezone.utcnow() + timedelta(
                    seconds=self.poke_interval)
                raise AirflowRescheduleException(reschedule_date)
            else:
                sleep(self.poke_interval)
        self.log.info("Success criteria met. Exiting.")
...

 

그래서 내 경우는 다음과 같이 설정하여 시간을 단축시켰다.

...
checker = BigQueryTableSensor(
    dag=dag
    , task_id='bq_check_exist_table'
    , bigquery_conn_id='bigquery_default'
    , project_id='[PROJECT ID]'
    , dataset_id='[DATASET ID]'
    , table_id='[테이블 ID]'
    , poke_interval=3
    , timeout=8
)
...

 

계산대로라면 총 3번 시도를 할 것이고 3번째가 종료되기 전에 전체 Fail로 종료될 것이다.

--------------------------------------------------------------------------------
{taskinstance.py:900} INFO - Executing <Task(BigQueryTableSensor): bq_check_exist_table> on 2020-09-01T00:00:00+09:00
{bigquery_sensor.py:66} INFO - Sensor checks existence of table: [project id]:[dateset id]
{bigquery_sensor.py:66} INFO - Sensor checks existence of table: [project id]:[dateset id]
{bigquery_sensor.py:66} INFO - Sensor checks existence of table: [project id]:[dateset id]
{taskinstance.py:1207} ERROR - Snap. Time is OUT.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/sensors/base_sensor_operator.py", line 116, in execute
    raise AirflowSensorTimeout('Snap. Time is OUT.')
airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.
[2020-09-01 12:26:47,616] {taskinstance.py:1187} INFO - All

 

실제로 3번 호출하였고 이후 에러가 발생했다. 에러문구는 Time is OUT.

 

 

끝.

 

반응형

댓글