사족: 발표자 진짜 개패고싶음. 지금까지 봤던 모든 발표 중 단연 최악.
중언부언에 용어 거꾸로 설명하다가 뒤늦게 바로잡는다던가,
뇌정지 온 것처럼 몇 초 가만히 있다가 대충 수습하고 넘어가는 게 한두번이 아님.
발표만 깔끔하게 잘했어도 훨씬 좋은 세션이 되었을 거다.
https://youtu.be/DYC4-xElccE?si=lW5prwBguU_MqwQy
Ash Berlin-Taylor
- PMC member on Airflow
- Director of Airflow Engineering
Scheduler의 HA를 위해 re-architecting 했던 것들 정리.
Responsibility of the Scheduler
단순히 Run Task만 수행하는 게 아님.
- Check Dependencies
- Retry Management
- DST transition
- SLAs
- Success / Fail callback
- Enforce Concurrency Limit
- Emit Metrics
- ...
세 개의 컴포넌트로 이루어져 있음.
- SchedulerJob: 동작의 핵심인 state machine 관리. run task / retry...
- Executor: 실제로 task를 실행하는 컴포넌트
- DagFileProcessor: DAG file을 디스크에서 읽고, serialized DAG table에 저장
"The" Scheduler
airflow.jobs.scheduler_job
핵심 기능은 Manage the state machine
- run task / task instances
Never load DAG code into a long-running Process
- airflow에서는 DAG file을 변경하고 배포하더라도 scheduler / Web server를 재시작할 필요가 없는 구조.
- Short-lived process에서 특히 유효한 장점.
DAG file의 parsing / serialized DAG Representation
- Scheduler는 serialized된 결과물을 기준으로 scheduling을 수행한다. (big json blob with schema from Database)
- 한쪽은 write만 하고, 다른 쪽은 read만 하는 구조
- scheduler의 speed up 핵심 중 하나.
Scheduler의 loop 구조는 위와 같다.
- scheduling 수행
- DAG file processor가 정상인지 heartbeat. 문제가 있을 경우 재시작
- 1 scheduler has 1 DAG file processor
- heartbeat: 정상적으로 동작중인지
- timed_events: 일종의 housekeeping 역할. every 15s 또는 30s 주기로 실행
- detecting zombies
- adopting tasks
- check if DB is out of state / manually delete a DAG run...
create_dagruns_for_dags()
- DAG 테이블에 있는 next_dagrun_create_after 컬럼을 확인. (what is the earliest date that i can create the next DAG run)
- next_dagrun_create_after < NOW() 인 경우,
- 신규 DAG Run을 생성하고
- DAG 테이블의 next_dagrun에 serialized 정보 저장, next_dagrun_create_after 컬럼 업데이트
- 생성된 DAGRun 정보는 queue로 저장된다.
start_queued_dagrun()
- max_active_dagrun 체크.
- 현재 실행중인 dagrun 개수가 max보다 낮을 경우, Queued -> Running으로 상태를 변경하는 함수.
- 실제로 task를 실행하는 로직은 이 함수. (queued -> running 변경하는 부분)
get_next_dagruns_to_examine(State.RUNNING)
- Get next n 'oldest' DagRuns in 'running' state.
- oldest = DB 컬럼의 last_scheduling_decision 같은 이름의 필드 참고.
for문에 있는 schedule_dag_run(dag_run)
- schedule dagrun..
- handle callbacks
- checks if DAG structure changes
- 아직은 dynamic DAG를 On-the-fly로 반영하는 로직의 edge case 커버가 부족하지만, 기본적인 dynamic change에는 잘 반영됨
- 없어진 Task, 신규 task 등...
- Compute which taskInstances can now be 'schedueled' (via currently-misnamed DagRun.update_state method)
- Pass pending callbacks to DagFileProcessorManager
critical_section_execute_task_instance()
- scheduler는 treated as running에서 멈춘다. 나머지 작업은 executor가 처리하도록 넘겨줌.
- queued_task 개수만 응답으로 받는다.
schedule_dag_run 과 critical_section_execute_task_instancce 메소드 두 개가 동작의 핵심.
- schedule_dag_run은 task를 queue에 집어넣고
- execute_task_instance는 executor가 queue에서 꺼내 실행하도록 돌려놓는다.
Before Enqueueing a TaskInstance
- task가 실행될 수 있는 open pool slot이 있는지.
- max_active_task per DAG이 limit 초과하지는 않는지
- DAG와 task가 task_concurrency limit을 초과하지는 않는지
- Executor slots available (parallelism)
Executor
- k8s Executor: pod 실행.
- celery Executor: send message to celery broker. worker picks up.
- local Executor: local process pool에 전달, local에서 실행. scheduler 내부에서 실행됨
it predominantly keeps the state in Memory.
- drawback처럼 보일 수 있으나..
- 애초에 scheduler와 executor가 같은 프로세스에 있고, 매번 DB에 조회할 필요가 없다는 점에서 성능면에서도 낫다
task가 실패할 경우, 99%는 report its own status to DB.
- executor의 실패 / task의 실패는 완전히 다른 것.
- task가 fail되었으나 정상적으로 상태를 보고하지 못하는 경우... executor가 monitor -> report status.
DAG Parsing
airflow.dag_processing
- 사용자의 DAG code 로딩하는 곳.
DagFileProcessorManager
- scheduler에서 실행되는 subprocess.
- subprocess pool을 관리하는 infinite loop
- load DAG file
- execute any Pending DAG level callbacks
큰 틀에서는 pass DAG file -> update the state in the Database -> exit
cf. reload hook같은 기능 구현이 안되어 있기 때문에, Long-running process로 실행할 경우 버그나 에러가 날 수 있다고.
- 즉 불완전한 isolation 때문
- check the list of processors (default = 2). 필요할 경우 processor를 추가로 생성한다
- processor는 DAG Parsing 수행
- dag file 파싱하고
- DB 테이블에 저장
- 일정 주기로 sends heartbeat back to scheduler.
- 일정 주기로 refresh dag dir
cf. scheduler 자체는 does not execute the callbacks. (callback 수행하려면 python 함수가 필요. python 함수는 executor로 파싱해야 알 수 있기 때문)
High Availability: Use Existing Metadata DB for Sync
Airflow는 컴포넌트 간 direct communication을 지원하지 않음.
- zookeeper나 leader election 등 정합성을 맞추는 여러 방법이 있지만, 구조가 복잡해진다.
- airflow는 postgres를 주로 쓰는데, DB의 scale 기능을 써도 괜찮을 거라 생각했음.
Row Level Locking 사용.
- 두 개의 scheduler가 동시에 특정 task instance를 조회할 경우... 조회 데이터의 정합성이 깨질 수 있다.
- FOR UPDATE 명령어로 mutex 설정하는 경우... 뒤늦게 조회 시도한 쿼리는 대기 상태로 들어간다. (idle)
- SKIP LOCKED 명령어 사용... lock 걸린 Row는 건너뛰고 나머지를 응답하는 식.
- transaction 단위로 lock. scheduling loop 내에서는 transaction 걸 때 조심해야 한다.
sqlAlchemy의 context manager와 prohibit_commit 사용.
critical_section_execute_task_instance()
python의 concurrent programming 영역. 성능 개선을 위한 영역이라고 보면 됨.
- scheduler가 여러 개 있고, 필요에 따라 task를 실행하는 process를 pool 내에서 생성한다.
- 그러면, concurrently running process를 확인할 수 있는 유일한 방법은 DB에서 확인하는 것뿐.
DB lock 걸고
- 모든 pool에서 running 상태인 task 조회
- python 메모리 내에서 +1
- DB에 저장
50ms / 100ms 단위로 위 쿼리 수행.
- 만약 pool 조회 쿼리를 수행할 수 없는 경우 (lock걸려있다던가), NOWAIT
- scheduler가 해야 할 다른 일들 (Create DagRun, schedule other task...)
scheduler는 어느 순간 fail할 수 있다.
Active-Active 구조를 위한 기능 중 하나: task adoption.
- 주기적으로 scheduler 상태를 체크한다. heartbeat 응답하지 않은 것들 확인.
- dead executor에 있던 task는 다른 곳으로 adopt된다.
그 외에도 detecting zombie task 탐지해서 kill / restart 한다던가, SLAs 관리.
mini scheduler 역할을 하는 최적화 기능도 개발 중. 불필요한 loop 일부를 해소할 수 있다고 함.