HwangHub
Airflow Scheduler (Airflow 3.0 기준) 본문
작성하던 글이 날아가버린 상태에서 처음부터 다시 작성합니다... 그냥 뭐, 속상하다는 말이 하고 싶었습니다.
📌 TL;DR — Airflow Scheduler 코드 레벨 분석
Airflow 스케줄러는 CLI 명령어로 시작되며, SchedulerJobRunner가 DAG 파싱, TaskInstance 스케줄링, Executor 상태 관리 등을 수행하는 중심 객체다. _run_scheduler_loop() 내에서 반복적으로 DAG 상태를 확인하고 실행 가능한 태스크를 큐에 추가하며, Executor를 통해 병렬로 실행된다.
스케줄러는 여러 Executor와 병렬 작업을 지원하지만, TaskInstance 중복 실행을 방지하기 위해 PostgreSQL의 advisory lock을 활용한 Critical Section을 도입했다. 이를 통해 HA 환경에서 스케줄러 간의 충돌 없이 안정적인 TaskInstance 스케줄링이 가능하다.
결론적으로, Airflow Scheduler는 다양한 내부 타이머, 멀티 Executor, DB 기반 락 관리를 통해 유연하고 안전한 스케줄링을 제공하며, 실무에서 안정성과 확장성을 고려한 시스템 설계에 중요한 인사이트를 제공한다.
Airflow는 ETL, ELT, Reverse ETL 등의 데이터 파이프라인에서 많이 사용되는 워크플로우 툴이며, 최근에는 MLOps, GenAI 워크플로우로 사용 사례가 빠르게 확장되고 있습니다. 2025년 4월 23일 3.0.0 버전이 릴리스되었으며, 현재 기준 매월 3천만 다운로드(2020년 대비 30배 증가)와 80,000개 조직(2020년 기준 55,000 증가)이 사용하는 등 Airflow 2.0 release 이래로 강력한 성장을 이뤄가고 있다고 합니다. 출처: airflow 3.0.0 release
제 개인적인 생각으로, MLOps 시스템을 구축할 때 Airflow가 강력하다 생각한 이유는 두 가지 입니다.
- Airflow는 Kubernetes에 종속적이지 않으면서도, Kubernetes 환경과의 호환성이 매우 뛰어납니다. 이를 바탕으로 확장성과 격리성을 확보할 수 있습니다.
- 이미 많은 조직에서 널리 사용되고 있으며, 워크플로우 오케스트레이션 도구 중 가장 큰 커뮤니티와 생태계를 보유하고 있습니다. 이는 안정성 검토에 큰 도움이 되고, 문제 해결에 있어서도 레퍼런스를 참고하여 빠르게 해결할 수 있게 됩니다.
비즈니스와 연결되는 기술 선택에 있어서 안정성이 가장 중요하다고 생각했고, 그 다음이 확장성으로 연결되는 유연함이라고 생각했기에 Airflow는 충분히 좋은 선택지라 판단했습니다.
그렇다면, 이제 저에게 남은 것은 Airflow를 이해하는 것이었습니다. 엔지니어의 입장에서 Airflow를 제대로 이해하기 위해서는 코어 엔진을 Code Level에서 분석해보는 것이 필요하다 판단했고, 결국 Airflow Scheduler 부분의 코드를 탐구하기로 했습니다. 이 글을 읽으시는 분 께서는 아마 저와 비슷한 이유로 이 글을 보고 있으실 텐데, 부디 조금이나마 도움이 되시길 바랍니다.
아래에서 제공되는 모든 코드는 apache/airflow 공식 레포지토리에서 발췌하였음을 밝혔으며, 이해를 돕기 위해 사소한 편집이 있을 수 있습니다.
airflow scheduler
@cli_utils.action_cli
@providers_configuration_loaded
def scheduler(args: Namespace):
"""Start Airflow Scheduler."""
print(settings.HEADER)
run_command_with_daemon_option(
args=args,
process_name="scheduler",
callback=lambda: _run_scheduler_job(args),
should_setup_logging=True,
)
def _run_scheduler_job(args) -> None:
job_runner = SchedulerJobRunner(job=Job(), num_runs=args.num_runs)
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
run_job(job=job_runner.job, execute_callable=job_runner._execute)
Airflow의 스케줄러는 `airflow scheduler` 라는 cli command를 실행하여 스케줄러 프로세스를 시작합니다. 이는 _run_scheduler_job() 함수를 실행하여 SchedulerJobRunner 객체를 생성하여 스케줄러 실행 환경을 구성하고, run_job()을 사용하여 스케줄링을 수행합니다.
SchedulerJobRunner
class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
job_type = "SchedulerJob"
active_spans = ThreadSafeDict()
def __init__(
self,
job: Job,
num_runs: int = conf.getint("scheduler", "num_runs"),
scheduler_idle_sleep_time: float = conf.getfloat("scheduler", "scheduler_idle_sleep_time"),
log: logging.Logger | None = None,
):
super().__init__(job)
self.num_runs = num_runs
self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
self._task_instance_heartbeat_timeout_secs = conf.getint(
"scheduler", "task_instance_heartbeat_timeout"
)
self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")
self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")
self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc")
self._num_stuck_queued_retries = conf.getint(
section="scheduler",
key="num_stuck_in_queued_retries",
fallback=2,
)
if self._enable_tracemalloc:
import tracemalloc
tracemalloc.start()
if log:
self._log = log
self.scheduler_dag_bag = SchedulerDagBag()
SchedulerJobRunner 객체는 DAG와 TASK의 실행 시점을 결정하고 적절한 시간에 작업이 실행되도록 하는 역할을 담당하는 'Airflow Scheduler의 심장'입니다. 기본 작업 실행 기능을 가진 BaseJobRunner와 로깅 기능을 제공하는 LoggingMixin 클래스를 상속받아 구성됩니다.
SchedulerJobRunner 객체 생성이 "스케줄링 실행 환경 구성"이 되는 이유는 속성을 살펴보면 알 수 있습니다. 주요 속성만 살펴 보겠습니다.
- num_runs : 스케줄링 루프를 실행할 횟수
- _scheduler_idle_sleep_time : 프로세서 폴링 사이의 대기 시간(초)
- _task_instance_heartbeat_timeout_secs : 태스크 인스턴스가 타임아웃되기 전 하트비트를 기다리는 시간(초)
- _dag_stale_not_seen_duration : DAG가 오래되었다고 간주되기 전에 보이지 않아도 되는 시간
- _task_queued_timeout : 큐에 있는 태스크의 타임아웃 기준
- _enable_tracemalloc : 메모리 추적 기능(tracemalloc)을 활성화할지 여부
- _num_stack_queued_retries : 큐에 갇힌 작업에 대한 재시도 횟수. (기본값 2회)
- scheduler_dag_bag : 파싱된 DAG들을 관리하는 인스턴스
보면 알 수 있듯, 타임아웃 기준 등 스케줄링에 필요한 값들을 관리하는 객체가 SchedulerJobRunner이므로, 인스턴스 생성이 곧 스케줄링 실행 환경 구성이라고 말할 수 있습니다.
SchedulerJobRunner()._execute()
# 이해를 위해 코드 일부를 편집하였습니다.
def _execute(self) -> int | None:
callback_sink = DatabaseCallbackSink() # 태스크 실행 상태 관련 콜백을 처리하기 위해 DB콜백싱크 설정
for executor in self.job.executors: # multi executor에 job_id와 callback_sink 설정하고 start
executor.job_id = self.job.id
executor.callback_sink = callback_sink
executor.start()
self._run_scheduler_loop() # 스케줄러 루프 실행
settings.Session.remove() # DB 세션 정리
for executor in self.job.executors: # multi executor 종료
executor.end()
return None # 언제나 None 반환
스케줄러 작업의 핵심 실행 로직이 바로 SchedulerJobRunner의 _execute() 입니다. 이 중에서 _run_scheduler_loop() 함수가 우리가 아는 스케줄링 동작을 수행하는 부분이라고 이해하면 됩니다.
해당 작업 실행에 앞서, 스케줄러 수행 간에 Airflow DB를 통해 TaskInstance의 상태값을 관리하기 때문에 DB콜백싱크 객체를 설정하고 있으며, multi executor(Airflow 2.10.0부터 지원)와 관련된 설정도 위 로직에서 처리되고 있습니다.
SchedulerJobRunner()._run_scheduler_loop()
여기서부터는 함수 하나하나가 무거워서 조각내어 분석해 보겠습니다.
def _run_scheduler_loop(self) -> None:
"""
Harvest DAG parsing results, queue tasks, and perform executor heartbeat; the actual scheduler loop.
The main steps in the loop are:
#. Harvest DAG parsing results through DagFileProcessorAgent
#. Find and queue executable tasks
#. Change task instance state in DB
#. Queue tasks in executor
#. Heartbeat executor
#. Execute queued tasks in executor asynchronously
#. Sync on the states of running tasks
"""
_run_scheduler_loop() 메서드는 Airflow Scheduler의 핵심 루프를 구현하는 부분입니다. 이 메서드는 DAG 파싱 결과를 수집하고, 실행 가능한 태스크를 찾아 큐에 추가하며, Executor의 하트비트를 처리하는 등의 핵심 작업을 수행합니다.
timers = EventScheduler()
# Check on start up, then every configured interval
self.adopt_or_reset_orphaned_tasks()
timers.call_regular_interval(
conf.getfloat("scheduler", "orphaned_tasks_check_interval", fallback=300.0),
self.adopt_or_reset_orphaned_tasks,
)
timers.call_regular_interval(
conf.getfloat("scheduler", "trigger_timeout_check_interval", fallback=15.0),
self.check_trigger_timeouts,
)
timers.call_regular_interval(
30,
self._mark_backfills_complete,
)
timers.call_regular_interval(
conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0),
self._emit_pool_metrics,
)
timers.call_regular_interval(
conf.getfloat("scheduler", "running_metrics_interval", fallback=30.0),
self._emit_running_ti_metrics,
)
timers.call_regular_interval(
conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0),
self._find_and_purge_task_instances_without_heartbeats,
)
timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)
timers.call_regular_interval(
conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
self._handle_tasks_stuck_in_queued,
)
timers.call_regular_interval(
conf.getfloat("scheduler", "parsing_cleanup_interval"),
self._update_asset_orphanage,
)
먼저 EventScheduler 인스턴스를 생성하여 timers 변수에 할당합니다. EventScheduler 클래스는 일정 간격으로 함수를 반복 실행시키기 위한 유틸리티 메서드인 call_regular_interval()를 정의하고 있으며, 이를 이용하여 일정 delay마다 실행시키고 싶은 callable 객체(함수 객체)를 실행할 수 있습니다.
이후에는 태스크 상태 체크를 통한 비정상 TaskInstance 정리 등 Clean-up 작업이 주기적으로 이뤄질 수 있도록 세팅하는 코드로 이어집니다. 이를 통해 스케줄러가 주기적으로 Airflow 시스템 상태를 모니터링하고 정상으로 유지합니다. 작업들을 간단히 소개하면 다음과 같습니다.
- adopt_or_reset_orphanted_tasks : 고아 태스크 스케줄러 재할당 또는 상태를 none 또는 retry로 돌림. (기본 300초마다)
- check_trigger_timeouts : 트리거 타임아웃 확인 (기본 15초마다)
- _mark_backfills_complete : 백필 작업 완료 처리 (고정 30초마다)
- _emit_pool_metrics : Pool 메트릭 출력 (기본 5초마다)
- _emit_running_ti_metrics : 실행 중인 TaskInstance 메트릭 출력 (기본 30초마다)
- _find_and_purge_task_instances_without_heartbeats : 하트비트 없는 TaskInstance 제거 (기본 10초마다)
- _update_dag_run_state_for_paused_dags : 일시 중지된 DAG의 상태 업데이트 (고정 60초마다)
- _handle_tasks_stuck_in_queued : 타임아웃 기준이 넘도록 Queued된 Task 처리
- _update_asset_orphanage : 사용되지 않는 Asset 처리
[용어 정리]
- Orphanted Task : 스케줄러와 연결이 끊긴 채 'Running' 상태로 남은 TaskInstance
- Pool과 Slot : 병렬적으로 수행되는 Task간의 자원 충돌 문제를 방지할 수 있는 장치
- Asset : DAG 파싱 결과 등 실행에 사용되는 metadata 정보 등을 관리하는 단위
- Backfill : 특정 시간 구간에 대해 실행되지 않은 DAG Run들을 수동으로 채워넣는 것
- Trigger : 비동기 이벤트를 기반으로 Task를 실행할 수 있게 하는 장치 (Triggerer라는 별도 프로세스에서 관리)
그리고 나서 스케줄러의 핵심인 무한 루프 로직이 등장합니다.
for loop_count in itertools.count(start=1):
# 트레이싱 및 타이머 설정
with (
Trace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span,
Stats.timer("scheduler.scheduler_loop_duration") as timer,
):
# ... 코드 생략 ...
각 루프 반복에서는 스케줄링 작업 수행, Executor 하트비트 처리, Executor 이벤트 처리 등이 진행됩니다.
# 1. 스케줄링 작업 수행
with create_session() as session:
self._end_spans_of_externally_ended_ops(session)
# 가능한 모든 실행자에 대해 스케줄링 수행
num_queued_tis = self._do_scheduling(session)
# 메모리 해제
session.expunge_all()
스케줄링 로직 중에서 실제 스케줄링 작업을 수행하는 _do_scheduling() 메서드가 수행되는 부분입니다. 이는 DAG Run 객체 생성, TaskInstance 상태 업데이트, Queueing 등의 작업이 수행됩니다. 이는 이따가 집중해서 보겠습니다.
# 2. Executor 하트비트 처리
for executor in self.job.executors:
# 새 Task 할당 여부와 관계 없이, 모든 Executor의 하트비트 수행
executor.heartbeat()
모든 Executor의 하트비트를 처리합니다. 즉, 현재 실행 중인 Task의 상태를 확인하고 'Executor 이벤트'를 발생시킬 수 있습니다.
# 3. Executor 이벤트 처리
with create_session() as session:
num_finished_events = 0
for executor in self.job.executors:
num_finished_events += self._process_executor_events(
executor=executor, session=session
)
Executor에서 발생한 Task 완료 or 실패 등의 Executor 이벤트를 처리합니다. 발생한 이벤트에 따라 Task의 상태를 갱신하고, 필요한 처리를 수행합니다.
# 4. Task 이벤트 로그 처리
for executor in self.job.executors:
try:
with create_session() as session:
self._process_task_event_logs(executor._task_event_logs, session)
except Exception:
self.log.exception("Something went wrong when trying to save task event logs.")
Task 이벤트 로그를 DB에 저장합니다.
# 5. 예약된 타이머 이벤트 실행 및 대기
next_event = timers.run(blocking=False)
self.log.debug("Next timed event is in %f", next_event)
# ... 코드 생략 ...
# 유휴 상태인 경우 대기
if not is_unit_test and not num_queued_tis and not num_finished_events:
# 스케줄러가 작업 중이면 대기하지 않음
time.sleep(min(self._scheduler_idle_sleep_time, next_event or 0))
타이머(EventScheduler)에 등록된 callable 이벤트를 실행하고, 스케줄러의 queue에 TaskInstance가 없고 완료된 Executor 이벤트가 없으면 스케줄러가 작업중이지 않다고 보고 CPU 점유를 해제하고 정해진 시간 만큼 잠시 스케줄러를 대기합니다.
# 6. 종료 조건 확인
if loop_count >= self.num_runs > 0:
self.log.info(
"Exiting scheduler loop as requested number of runs (%d - got to %d) has been reached",
self.num_runs,
loop_count,
)
# ... 코드 생략 ...
break
num_runs가 설정된 경우에만 동작하며, 요청된 실행 횟수에 도달한 경우 루프를 종료합니다. 만약 최대 반복 수를 설정하지 않으면 무한으로 동작합니다.
SchedulerJobRunner()._do_scheduling()
그러면, 이번에는 스케줄러 루프의 핵심인 _do_scheduling()을 들여다 보겠습니다.
def _do_scheduling(self, session: Session) -> int:
"""
Make the main scheduling decisions.
It:
- Creates any necessary DAG runs by examining the next_dagrun_create_after column of DagModel
Since creating Dag Runs is a relatively time consuming process, we select only 10 dags by default
(configurable via ``scheduler.max_dagruns_to_create_per_loop`` setting) - putting this higher will
mean one scheduler could spend a chunk of time creating dag runs, and not ever get around to
scheduling tasks.
- Finds the "next n oldest" running DAG Runs to examine for scheduling (n=20 by default, configurable
via ``scheduler.max_dagruns_per_loop_to_schedule`` config setting) and tries to progress state (TIs
to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
By "next oldest", we mean hasn't been examined/scheduled in the most time.
We don't select all dagruns at once, because the rows are selected with row locks, meaning
that only one scheduler can "process them", even it is waiting behind other dags. Increasing this
limit will allow more throughput for smaller DAGs but will likely slow down throughput for larger
(>500 tasks.) DAGs
- Then, via a Critical Section (locking the rows of the Pool model) we queue tasks, and then send them
to the executor.
See docs of _critical_section_enqueue_task_instances for more.
:return: Number of TIs enqueued in this iteration
"""
_do_scheduling() 메서드는 크게 세 부분으로 나눌 수 있습니다.
- DAG Run 생성 및 시작
- 실행 중인 DAG Run의 상태 진행
- Critical Section에서의 태스크 큐잉
각 부분을 자세히 살펴보겠습니다.
# 1. DAG Run 생성 및 시작
with prohibit_commit(session) as guard:
if settings.USE_JOB_SCHEDULE:
self._create_dagruns_for_dags(guard, session)
self._start_queued_dagruns(session)
guard.commit()
여기서 두 가지 주요 작업이 수행됩니다.
- DAG Run 생성 : _create_dagruns_for_dags() 메서드는 DagModel의 next_dagrun_create_after 컬럼을 검사하여 새로운 DAG Run을 생성합니다. (docs에 따르면) DAG Run 생성은 상대적으로 시간이 많이 소요되는 프로세스이므로, 기본적으로 한 번에 10개의 DAG만 선택합니다(scheduler.max_dagruns_to_create_per_loop 설정으로 구성 가능). 이 값을 높이면 하나의 스케줄러가 DAG Run 생성에 많은 시간을 소비하게 되어 태스크 스케줄링에 충분한 시간을 할애하지 못할 수 있습니다.
- 대기 중인 DAG Run 시작 : _start_queued_dagruns() 메서드는 QUEUED 상태의 DAG Run을 찾아 실행을 시작합니다.
# 2-1. 실행 중인 DAG Run의 상태 진행 :: 상태 조회 및 변경
# 검사할 현재 활성 상태의 DAG Run을 일괄적으로 가져옴
dag_runs = DagRun.get_running_dag_runs_to_examine(session=session)
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
이 부분에서는
- 실행 중인 DAG Run 조회 : get_running_dag_runs_to_examine() 메서드는 "다음 n개의 가장 오래된" 실행 중인 DAG Run을 조회합니다(기본값 n=20, scheduler.max_dagruns_per_loop_to_schedule 설정으로 구성 가능). "가장 오래된"이란 가장 오랫동안 검사/스케줄링되지 않은 DAG Run을 의미합니다.
- DAG Run 상태 진행 : _schedule_all_dag_runs() 메서드는 조회된 DAG Run의 상태를 진행시킵니다. 이 과정에서 TI 상태를 SCHEDULED로 변경하거나, DAG Run을 SUCCESS/FAILURE 등으로 변경합니다.
모든 DAG Run을 한 번에 선택하지 않는 이유는 행이 행 잠금과 함께 선택되기 때문입니다. 이는 여러 스케줄러 중 하나씩 각 DAG의 TASK를 처리할 수 있게 하여 분산 환경에서의 실행 간 동시성 이슈를 방지하면서, 처리량을 향상시키기 위해서임을 추측할 수 있습니다.
# 2-2. 실행 중인 DAG Run의 상태 진행 :: DAG 콜백 처리
# 컨텍스트가 최신 상태인지 확인하기 위해 commit 후 콜백 전송
cached_get_dag: Callable[[DagRun], DAG | None] = lru_cache()(
partial(self.scheduler_dag_bag.get_dag, session=session)
)
for dag_run, callback_to_run in callback_tuples:
dag = cached_get_dag(dag_run)
if dag:
# 데이터베이스로 콜백을 전송하므로 prohibit_commit 외부에서 수행해야 합니다.
self._send_dag_callbacks_to_processor(dag, callback_to_run)
else:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
이 부분에서는 _schedule_all_dag_runs()에서 반환된 콜백 튜플을 처리합니다. 각 콜백은 해당 DAG와 함께 프로세서로 전송됩니다. 이 작업은 세션 commit 후에 수행되어 컨텍스트가 최신 상태인지 확인합니다.
# 3. Critical Section에서의 태스크 큐잉
with prohibit_commit(session) as guard:
# 세션이 DB의 유효한 뷰를 갖도록 함
session.expunge_all()
# 일부 Executor만 가득 차 있고 모두 가득 차 있지 않은 경우에도 스케줄링 시도
total_free_executor_slots = sum([executor.slots_available for executor in self.job.executors])
if total_free_executor_slots <= 0:
# 여기서는 아무것도 할 수 없으므로 시도조차 하지 않음
self.log.debug("All executors are full, skipping critical section")
num_queued_tis = 0
else:
try:
timer = Stats.timer("scheduler.critical_section_duration")
timer.start()
# SCHEDULED 상태의 TaskInstance를 찾아 QUEUED로 변경 -> Executor에게 전송
num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
# Lock을 획득한 경우에만 이 메트릭 전송
timer.stop(send=True)
except OperationalError as e:
timer.stop(send=False)
if is_lock_not_available_error(error=e):
self.log.debug("Critical section lock held by another Scheduler")
Stats.incr("scheduler.critical_section_busy")
session.rollback()
return 0
raise
guard.commit()
return num_queued_tis
이 부분은 Critical Section으로, Pool 모델의 행을 잠그고 태스크를 큐에 추가한 다음 실행자에게 전송합니다.
- Executor 슬롯 확인 : 사용 가능한 Executor 슬롯이 있는지 확인합니다. 모든 Executor의 슬롯이 가득 찬 경우, Critical Section을 건너뜁니다.
- 태스크 큐잉: _critical_section_enqueue_task_instances() 메서드를 통해 SCHEDULED 상태의 TI를 찾아 QUEUED 상태로 변경하고 실행자에게 전송합니다.
- 잠금 관리: Critical Section은 잠금 메커니즘을 사용하여 한 번에 하나의 스케줄러만 이 작업을 수행할 수 있도록 합니다. 다른 스케줄러가 이미 잠금을 보유하고 있는 경우, 현재 스케줄러는 이 작업을 건너뛰고 0을 반환합니다.
SchedulerJobRunner()._critical_section_enqueue_task_instances()
스케줄러의 대략적인 동작은 위에까지 보면 됩니다. 이제부터는 여러 스케줄러가 DAG를 어떻게 동시성 이슈 없이 관리하는지 Critical Section과 관련된 부분을 들여다 보겠습니다.
def _critical_section_enqueue_task_instances(self, session: Session) -> int:
"""
Enqueues TaskInstances for execution.
There are three steps:
1. Pick TIs by priority with the constraint that they are in the expected states
and that we do not exceed max_active_runs or pool limits.
2. Change the state for the TIs above atomically.
3. Enqueue the TIs in the executor.
HA note: This function is a "critical section" meaning that only a single scheduler process can
execute this function at the same time. This is achieved by doing
``SELECT ... from pool FOR UPDATE``. For DBs that support NOWAIT, a "blocked" scheduler will skip
this and continue on with other tasks (creating new DAG runs, progressing TIs from None to SCHEDULED
etc.); DBs that don't support this (such as MariaDB or MySQL 5.x) the other schedulers will wait for
the lock before continuing.
:param session:
:return: Number of task instance with state changed.
"""
# The user can either request a certain number of tis to schedule per main scheduler loop (default
# is non-zero). If that value has been set to zero, that means use the value of core.parallelism (or
# however many free slots are left). core.parallelism represents the max number of running TIs per
# scheduler. Historically this value was stored in the executor, who's job it was to control/enforce
# it. However, with multiple executors, any of which can run up to core.parallelism TIs individually,
# we need to make sure in the scheduler now that we don't schedule more than core.parallelism totally
# across all executors.
스케줄러 프로세스는 Airflow의 핵심이다보니 HA로 구성을 하게 됩니다. HA로 구성하면 처리량 향상과 안정성 향상을 동시에 노릴 수 있어 유용합니다. 하지만, 이러한 구조에서는 별도의 조치가 되어 있지 않다면 동일한 DAG의 동일한 TASK를 각각의 스케줄러가 동시에 접근하여 중복된 수행 등 문제가 발생할 수 있습니다. Airflow에서는 이러한 동시성 이슈를 해결하기 위해 스케줄러가 각 TaskInstance에 접근하여 Enqueue할 때, 한 번에 하나의 스케줄러만 수행할 수 있도록 DB Lock을 이용합니다.
num_occupied_slots = sum([executor.slots_occupied for executor in self.job.executors])
parallelism = conf.getint("core", "parallelism")
if self.job.max_tis_per_query == 0:
max_tis = parallelism - num_occupied_slots
else:
max_tis = min(self.job.max_tis_per_query, parallelism - num_occupied_slots)
if max_tis <= 0:
self.log.debug("max_tis query size is less than or equal to zero. No query will be performed!")
return 0
먼저, 현재 TaskInstance를 띄울 수 있는 상태인지 계산합니다. 이는 한 번에 너무 많은 TaskInstance를 처리하지 않도록 제어하는 부분입니다.
queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
실행 가능한 TaskInstance를 QUEUED 상태로 변경하는 부분입니다. _executable_task_instances_to_queued() 함수에서는 SCHEDULED 상태인 TaskInstance 중에서 실행 가능한 TI를 선택하고, Pool 제한 등을 체크한 뒤 QUEUED 상태로 변경하고 DB 커밋을 합니다. 이 때, TaskInstance를 선택하는 과정에서 Lock을 활용하도록 내부 로직이 구성되어 있습니다.
# _executable_task_instances_to_queued 일부
lock_acquired = session.execute(
text("SELECT pg_try_advisory_xact_lock(:id)").bindparams(
id=DBLocks.SCHEDULER_CRITICAL_SECTION.value
)
).scalar()
코드를 뜯어보면 postgresql의 advisory lock을 사용하고 있음을 알 수 있습니다. 이는 postgresql에서 지원하는 사용자 정의 락이며, 그 중에서 pg_try_advisory_xact_lock은 트랜잭션 단위 락으로써, 커밋 등으로 트랜잭션이 끝나면 자동으로 해제되는 lock을 사용한 것을 알 수 있습니다. 이렇게 Lock을 획득한 상태에서 이후 TaskInstance 선택 쿼리 등을 진행하기 때문에 한 번에 하나의 스케줄러만 각 TaskInstance에 접근할 수 있는 것입니다.
# Sort queued TIs to their respective executor
executor_to_queued_tis = self._executor_to_tis(queued_tis)
for executor, queued_tis_per_executor in executor_to_queued_tis.items():
self.log.info(
"Trying to enqueue tasks: %s for executor: %s",
queued_tis_per_executor,
executor,
)
self._enqueue_task_instances_with_queued_state(queued_tis_per_executor, executor, session=session)
def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
"""Organize TIs into lists per their respective executor."""
_executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = defaultdict(list)
for ti in tis:
if executor_obj := self._try_to_load_executor(ti.executor):
_executor_to_tis[executor_obj].append(ti)
return _executor_to_tis
def _enqueue_task_instances_with_queued_state(
self, task_instances: list[TI], executor: BaseExecutor, session: Session
) -> None:
"""
Enqueue task_instances which should have been set to queued with the executor.
:param task_instances: TaskInstances to enqueue
:param executor: The executor to enqueue tasks for
:param session: The session object
"""
# actually enqueue them
for ti in task_instances:
if ti.dag_run.state in State.finished_dr_states:
ti.set_state(None, session=session)
continue
if executor.queue_workload.__func__ is not BaseExecutor.queue_workload: # type: ignore[attr-defined]
workload = workloads.ExecuteTask.make(ti, generator=executor.jwt_generator)
executor.queue_workload(workload, session=session)
continue
command = ti.command_as_list(
local=True,
)
priority = ti.priority_weight
queue = ti.queue
self.log.info(
"Sending %s to %s with priority %s and queue %s", ti.key, executor.name, priority, queue
)
executor.queue_command(
ti,
command,
priority=priority,
queue=queue,
)
그리고 QUEUED 상태로 변경된 TaskInstance를 Executor에 전송하여 Task가 수행될 수 있도록 합니다.
- _executor_to_tis() : 각 TaskInstance에 적절한 Executor를 매핑하여 딕셔너리로 관리합니다.
- _enqueue_task_instances_with_queued_state() : executor.queue_command() 함수를 호출하여 TaskInstance 실행 명령을 Executor에 전달합니다.
즉, DB Lock을 이용하여 HA 스케줄러는 TaskInstance에 대한 접근에 대한 동시성 이슈를 해결하고 있습니다. 특히 염두해야 하는 점은, Postgresql과 같이 NOWAIT를 지원하는 DB의 경우에는 한 스케줄러가 Lock을 획득한 경우에 다른 스케줄러는 해당 작업을 건너뛰고 다른 작업을 계속 수행할 수 있지만, MySQL과 같은 DB를 Airflow DB로 구축하게 되면 NOWAIT를 지원하지 않아 Lock이 해제될 때 까지 다른 스케줄러가 행 걸리게 되어 throughput 병목이 발생할 수 있습니다.
여기까지 보면 Scheduler의 기본 코드 흐름은 파악할 수 있겠다고 생각합니다. 세세하게는 더 많은 함수가 있지만, 하나의 글로 정리하기에는 이미 충분히 무거운 것 같네요... 다른 분들도 깃헙 코드를 헤엄치시다보면 더 많은 것을 보실 수 있으시리라 생각합니다.
Airflow는 단순한 워크플로우 오케스트레이터를 넘어, 고도로 유연하고 확장 가능한 분산 시스템으로 진화하고 있습니다. 그 중심에는 수많은 DAG와 Task를 안정적으로 스케줄링하고 실행 상태를 조율하는 Scheduler가 있습니다. 이번 글에서는 Scheduler가 어떤 방식으로 실행 환경을 구성하고, DAG를 탐색하며, TaskInstance를 스케줄링하고, Executor와의 상호작용을 통해 실질적인 작업 수행을 이끌어내는지를 코드 레벨에서 살펴보았습니다.
특히, 멀티 스케줄러 환경에서의 동시성 제어를 위한 Critical Section 구조, 그리고 데이터베이스 락(PostgreSQL의 advisory lock 등)을 이용한 충돌 방지 방식은 고가용성을 염두에 둔 설계임을 확인할 수 있었습니다. 이처럼 성숙한 오픈소스의 코드를 들여다보다 보면 해당 도구에 대한 이해 뿐만 아니라 좋은 설계에 대하여도 배우게 되는 것 같습니다.
Airflow를 깊이 이해한다는 것은 단지 DAG 파일을 작성하는 수준을 넘어서, 내부 동작 원리를 정확히 파악하고 문제 상황에 유연하게 대처할 수 있는 기반을 마련하는 것입니다. 이 글이 그런 관점에서 조금이나마 도움이 되었다면 기쁘겠습니다. 읽어주신 모든 분들께 감사하다는 말씀 드립니다.
'workspace > mlops' 카테고리의 다른 글
[MLOPS] Data Scaling (0) | 2025.03.20 |
---|---|
[MLOPS] 샘플링, 의미 있는 행동인가? (0) | 2025.03.12 |
[MLOPS] 편향된 데이터셋을 공정하게 처리하는 방법 (feat. AIF360) (0) | 2025.03.05 |