공부하고 기록하는, 경제학과 출신 개발자의 노트

학습일지/architecture

Airflow Summit 2021 - Deep Dive into the airflow scheduler

inspirit941 2024. 12. 18. 10:22
반응형

사족: 발표자 진짜 개패고싶음. 지금까지 봤던 모든 발표 중 단연 최악.

중언부언에 용어 거꾸로 설명하다가 뒤늦게 바로잡는다던가,
뇌정지 온 것처럼 몇 초 가만히 있다가 대충 수습하고 넘어가는 게 한두번이 아님.

발표만 깔끔하게 잘했어도 훨씬 좋은 세션이 되었을 거다.

 

 

https://youtu.be/DYC4-xElccE?si=lW5prwBguU_MqwQy

 

 

 

Ash Berlin-Taylor

  • PMC member on Airflow
  • Director of Airflow Engineering

Scheduler의 HA를 위해 re-architecting 했던 것들 정리.

Responsibility of the Scheduler

스크린샷 2024-12-16 오후 12 08 16

 

단순히 Run Task만 수행하는 게 아님.

  • Check Dependencies
  • Retry Management
  • DST transition
  • SLAs
  • Success / Fail callback
  • Enforce Concurrency Limit
  • Emit Metrics
  • ...

스크린샷 2024-12-16 오후 12 27 43

 

세 개의 컴포넌트로 이루어져 있음.

  • SchedulerJob: 동작의 핵심인 state machine 관리. run task / retry...
  • Executor: 실제로 task를 실행하는 컴포넌트
  • DagFileProcessor: DAG file을 디스크에서 읽고, serialized DAG table에 저장

"The" Scheduler

airflow.jobs.scheduler_job

스크린샷 2024-12-16 오후 12 45 53

 

핵심 기능은 Manage the state machine

  • run task / task instances

스크린샷 2024-12-16 오후 12 49 48

 

Never load DAG code into a long-running Process

  • airflow에서는 DAG file을 변경하고 배포하더라도 scheduler / Web server를 재시작할 필요가 없는 구조.
  • Short-lived process에서 특히 유효한 장점.

스크린샷 2024-12-16 오후 12 53 22

 

DAG file의 parsing / serialized DAG Representation

  • Scheduler는 serialized된 결과물을 기준으로 scheduling을 수행한다. (big json blob with schema from Database)
  • 한쪽은 write만 하고, 다른 쪽은 read만 하는 구조
    • scheduler의 speed up 핵심 중 하나.

스크린샷 2024-12-16 오후 12 58 08

 

Scheduler의 loop 구조는 위와 같다.

  • scheduling 수행
  • DAG file processor가 정상인지 heartbeat. 문제가 있을 경우 재시작
    • 1 scheduler has 1 DAG file processor
  • heartbeat: 정상적으로 동작중인지
  • timed_events: 일종의 housekeeping 역할. every 15s 또는 30s 주기로 실행
    • detecting zombies
    • adopting tasks
    • check if DB is out of state / manually delete a DAG run...

스크린샷 2024-12-16 오후 12 58 58

 

create_dagruns_for_dags()

  • DAG 테이블에 있는 next_dagrun_create_after 컬럼을 확인. (what is the earliest date that i can create the next DAG run)
  • next_dagrun_create_after < NOW() 인 경우,
    • 신규 DAG Run을 생성하고
    • DAG 테이블의 next_dagrun에 serialized 정보 저장, next_dagrun_create_after 컬럼 업데이트
  • 생성된 DAGRun 정보는 queue로 저장된다.

start_queued_dagrun()

  • max_active_dagrun 체크.
  • 현재 실행중인 dagrun 개수가 max보다 낮을 경우, Queued -> Running으로 상태를 변경하는 함수.
    • 실제로 task를 실행하는 로직은 이 함수. (queued -> running 변경하는 부분)

get_next_dagruns_to_examine(State.RUNNING)

  • Get next n 'oldest' DagRuns in 'running' state.
  • oldest = DB 컬럼의 last_scheduling_decision 같은 이름의 필드 참고.

for문에 있는 schedule_dag_run(dag_run)

  • schedule dagrun..
    • handle callbacks
    • checks if DAG structure changes
      • 아직은 dynamic DAG를 On-the-fly로 반영하는 로직의 edge case 커버가 부족하지만, 기본적인 dynamic change에는 잘 반영됨
      • 없어진 Task, 신규 task 등...
    • Compute which taskInstances can now be 'schedueled' (via currently-misnamed DagRun.update_state method)
  • Pass pending callbacks to DagFileProcessorManager

critical_section_execute_task_instance()

  • scheduler는 treated as running에서 멈춘다. 나머지 작업은 executor가 처리하도록 넘겨줌.
  • queued_task 개수만 응답으로 받는다.

schedule_dag_run 과 critical_section_execute_task_instancce 메소드 두 개가 동작의 핵심.

  • schedule_dag_run은 task를 queue에 집어넣고
  • execute_task_instance는 executor가 queue에서 꺼내 실행하도록 돌려놓는다.

스크린샷 2024-12-16 오후 2 48 23

 

Before Enqueueing a TaskInstance

  • task가 실행될 수 있는 open pool slot이 있는지.
  • max_active_task per DAG이 limit 초과하지는 않는지
  • DAG와 task가 task_concurrency limit을 초과하지는 않는지
  • Executor slots available (parallelism)

Executor

스크린샷 2024-12-16 오후 2 54 52

 

  • k8s Executor: pod 실행.
  • celery Executor: send message to celery broker. worker picks up.
  • local Executor: local process pool에 전달, local에서 실행. scheduler 내부에서 실행됨

it predominantly keeps the state in Memory.

  • drawback처럼 보일 수 있으나..
  • 애초에 scheduler와 executor가 같은 프로세스에 있고, 매번 DB에 조회할 필요가 없다는 점에서 성능면에서도 낫다

task가 실패할 경우, 99%는 report its own status to DB.

  • executor의 실패 / task의 실패는 완전히 다른 것.
  • task가 fail되었으나 정상적으로 상태를 보고하지 못하는 경우... executor가 monitor -> report status.

DAG Parsing

스크린샷 2024-12-16 오후 4 29 33스크린샷 2024-12-16 오후 4 51 55

 

airflow.dag_processing

  • 사용자의 DAG code 로딩하는 곳.

DagFileProcessorManager

  • scheduler에서 실행되는 subprocess.
  • subprocess pool을 관리하는 infinite loop
    • load DAG file
    • execute any Pending DAG level callbacks

큰 틀에서는 pass DAG file -> update the state in the Database -> exit

cf. reload hook같은 기능 구현이 안되어 있기 때문에, Long-running process로 실행할 경우 버그나 에러가 날 수 있다고.

  • 즉 불완전한 isolation 때문

스크린샷 2024-12-16 오후 5 20 12

 

  • check the list of processors (default = 2). 필요할 경우 processor를 추가로 생성한다
  • processor는 DAG Parsing 수행
    • dag file 파싱하고
    • DB 테이블에 저장
  • 일정 주기로 sends heartbeat back to scheduler.
  • 일정 주기로 refresh dag dir

cf. scheduler 자체는 does not execute the callbacks. (callback 수행하려면 python 함수가 필요. python 함수는 executor로 파싱해야 알 수 있기 때문)

High Availability: Use Existing Metadata DB for Sync

Airflow는 컴포넌트 간 direct communication을 지원하지 않음.

  • zookeeper나 leader election 등 정합성을 맞추는 여러 방법이 있지만, 구조가 복잡해진다.
  • airflow는 postgres를 주로 쓰는데, DB의 scale 기능을 써도 괜찮을 거라 생각했음.

Row Level Locking 사용.

 

스크린샷 2024-12-16 오후 5 32 47스크린샷 2024-12-16 오후 5 32 59스크린샷 2024-12-16 오후 5 42 06

 

  • 두 개의 scheduler가 동시에 특정 task instance를 조회할 경우... 조회 데이터의 정합성이 깨질 수 있다.
  • FOR UPDATE 명령어로 mutex 설정하는 경우... 뒤늦게 조회 시도한 쿼리는 대기 상태로 들어간다. (idle)
  • SKIP LOCKED 명령어 사용... lock 걸린 Row는 건너뛰고 나머지를 응답하는 식.
    • transaction 단위로 lock. scheduling loop 내에서는 transaction 걸 때 조심해야 한다.

스크린샷 2024-12-16 오후 5 45 09

 

sqlAlchemy의 context manager와 prohibit_commit 사용.

critical_section_execute_task_instance()

python의 concurrent programming 영역. 성능 개선을 위한 영역이라고 보면 됨.

  • scheduler가 여러 개 있고, 필요에 따라 task를 실행하는 process를 pool 내에서 생성한다.
  • 그러면, concurrently running process를 확인할 수 있는 유일한 방법은 DB에서 확인하는 것뿐.

스크린샷 2024-12-16 오후 5 49 40

 

DB lock 걸고

  • 모든 pool에서 running 상태인 task 조회
  • python 메모리 내에서 +1
  • DB에 저장

50ms / 100ms 단위로 위 쿼리 수행.

  • 만약 pool 조회 쿼리를 수행할 수 없는 경우 (lock걸려있다던가), NOWAIT
  • scheduler가 해야 할 다른 일들 (Create DagRun, schedule other task...)

스크린샷 2024-12-16 오후 5 50 00스크린샷 2024-12-16 오후 6 01 39

 

scheduler는 어느 순간 fail할 수 있다.

Active-Active 구조를 위한 기능 중 하나: task adoption.

  • 주기적으로 scheduler 상태를 체크한다. heartbeat 응답하지 않은 것들 확인.
  • dead executor에 있던 task는 다른 곳으로 adopt된다.

그 외에도 detecting zombie task 탐지해서 kill / restart 한다던가, SLAs 관리.

스크린샷 2024-12-16 오후 6 01 45

 

mini scheduler 역할을 하는 최적화 기능도 개발 중. 불필요한 loop 일부를 해소할 수 있다고 함.

반응형