airflow 버전은 1.10.10 을 사용중이다. (현재 기준 최신버전은 1.10.12)
유지보수를 용이하게 하기 위해 execution_date에 어떻게든 timezone에 맞춰 설정하려 했는데 몇가지 방법을 써도 전혀 되지 않았다. 알고보니 스케쥴을 실행할 때 run_id라는게 있는데, 이게 execution_date 과 달리 움직이기 때문이었다.(내부 문제인듯 싶은데....)
그래서 결론만 말하자면 UTF 기준으로 설정해야 하고, timezone은 설정되지 않는다.(화면에서 보이는 것 등은 설정이 되는거 같더만...)
아래는 과정이다. 그럼 시작.
테스트를 위해 schedule_interval 설정을 새벽 1시로 설정했다. 그래야 timezone 설정 테스트에 부합하기 때문이다. 예를들어 9월 1일 1시로 스케줄설정 했을 때 timezone설정이 적용되지 않았다면 8월 31일로 실행될 것이다.
dag = DAG(dagName, default_args=default_args, schedule_interval="0 1 * * *")
DAG가 실행되면 데이터베이스 중 JOB 테이블에 저장된다.
그래서 검색하면 다음과 같은 데이터 결과를 볼 수 있다.
select *
from dag_run
order by id desc;
결과
살펴보면 execution_date, state, run_id, start_date, end_date 등 어떤 파라미터를 가지고 실행했는지, 그리고 결과를 알 수 있다. 그럼 이제 timezone 을 설정해서 살펴보자.
# pendulum 를 이용한 timezone 설정
다음 옵션을 이용해 timezone을 설정했다.
from airflow.utils import timezone
import pendulum
local_tz = pendulum.timezone("Asia/Seoul")
default_args = {
...
start_date: datetime(2020, 8, 29, 1, 0, 0, tzinfo=local_tz)
...
}
위와같이 설정한 후 DAG를 수행해 보았다. 결과는 다음과 같다
데이터를 보면 execution_date는 제대로 들어가는 것이 확인됐다. 그런데 run_id는 9시간 전으로 들어가 있다. timezone 설정이 적용되지 않은 것이다.
즉 execution_date를 아무리 잘 설정해도 run_id가 다르게 들어가버리니 의미가 없다.
또한 실행하면서 받을 수 있는 변수들 (예를들어 ds, dS_nodash 등) 또한 run_id 기준으로 실행된다. 다음을 통해 테스트를 진행해보았다.
{{ts}} 설명은 다음과 같다(관련 사이트 주소는 아래 참조에 추가)
{{ts}} => same as execution_date.isoformat(). Example: 2018-01-01T00:00:00+00:00 |
...
def testParam():
now_ts = "{{ts}}"
return now_ts
test1 = BashOperator(
dag = dag
, bash_command='echo 1 testParam: ' + testParam() + ' && date'
, task_id='test1'
)
...
어제날짜를 변수로 설정해보았다. 실행결과 다음과 같은 로그를 확인할 수 있다.
INFO - Task exited with return code 0 |
date로 찍은 날짜는 timezone 이 적용되었지만 Macros 로 받은 래퍼런스는 run_id를 따라가는 듯 하다.(-9시 적용)
그래서 timezone 적용 없이 utc 시간 기준으로 설정하는게 좋을듯 하다.
# airflow.cfg 파일을 이용한 timezone 설정
해당 파일을 열어 timezone을 설정했다.
...
# 이부분 설정
# default_timezone =utf
default_timezone = Asia/Seoul
...
# 혹시몰라 아래도 수정
default_ui_timezone = Asia/Seoul
...
수행해본 결과 위와 동일한 결과가 나왔다.
끝.
참조:
https://airflow.apache.org/docs/stable/macros-ref.html
'공부 > 데이터' 카테고리의 다른 글
[airflow] Mysql to gcs to bigquery 할 때 CSV로 저장하기 (0) | 2020.10.08 |
---|---|
[airflow] 테이블 유무 확인 BigQueryTableSensor (0) | 2020.09.10 |
[pandas] 특정 key를 기준으로 groupby 한 후 해당목록 배열(dict)로 변경하기 (0) | 2020.05.30 |
[pandas] read_csv 사용 시 날짜를 datetime 형태로 지정하기 (0) | 2019.12.10 |
[데이터분석] 장바구니 분석(apriori 알고리즘) 사용 및 해석하기 (2) | 2019.12.04 |
댓글