In our daily work, we often need tasks that run periodically. One way is to combine Linux crond with command-line tools; another is to implement the scheduling directly in Python. The following is a survey of common ways to implement timed tasks in Python.

Use while True: + sleep() to implement a timed task

The sleep(secs) function in the time module suspends the current thread for secs seconds before resuming execution. By suspend, we mean that the thread enters a blocked state; when the time given to sleep() has elapsed, it moves from blocked to ready and waits for CPU scheduling.

Based on this behavior, we can implement a simple timed task with an infinite while loop plus sleep().

Code example.

import datetime
import time


def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)


def loop_monitor():
    while True:
        time_printer()
        time.sleep(5)  # pause for 5 seconds


if __name__ == "__main__":
    loop_monitor()

Main disadvantages:

  • It can only set an interval; it cannot target a specific time of day, such as 8:00 am every day (a workaround is sketched below).
  • sleep() is a blocking call, so the program can do nothing else during the sleep period.
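For the first limitation, you can compute the delay to the next target time yourself and sleep exactly that long. A minimal sketch (sleep_until is our own helper, not a standard function):

import datetime
import time


def sleep_until(hour, minute=0):
    # sleep until the next occurrence of hour:minute
    now = datetime.datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += datetime.timedelta(days=1)  # already past today; aim for tomorrow
    time.sleep((target - now).total_seconds())


while True:
    sleep_until(8)  # wait until 8:00 am
    print('daily job time:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))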

Running timed tasks with the Timeloop library

Timeloop is a simple third-party library for running multiple periodic tasks: functions marked with a decorator are run in their own threads.

Sample code

import time

from timeloop import Timeloop
from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
    print("2s job current time : {}".format(time.ctime()))


@tl.job(interval=timedelta(seconds=5))
def sample_job_every_5s():
    print("5s job current time : {}".format(time.ctime()))


@tl.job(interval=timedelta(seconds=10))
def sample_job_every_10s():
    print("10s job current time : {}".format(time.ctime()))


if __name__ == "__main__":
    tl.start(block=True)

Implementing timed tasks with threading.Timer

The Timer class in the threading module is non-blocking, which makes it a little better than sleep(). A Timer is the most basic kind of timer: we can start several Timer tasks at once, and they execute asynchronously, so there is no problem of one task waiting for another to finish.

Timer(interval, function, args=None, kwargs=None)

  • interval: the delay, in seconds, before function is executed
  • function: the function to execute
  • args/kwargs: arguments for the function

Code example.

import datetime
from threading import Timer


def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()


def loop_monitor():
    t = Timer(5, time_printer)
    t.start()


if __name__ == "__main__":
    loop_monitor()

Note: a Timer fires only once, so here the callback re-arms itself by calling loop_monitor(); without that, the function would run a single time.
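A Timer that has not fired yet can also be cancelled. A minimal sketch:

from threading import Timer


def hello():
    print('hello, world')


t = Timer(10, hello)
t.start()
# ...later, before the 10 seconds elapse:
t.cancel()  # hello() never runs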

Implementing timed tasks with the built-in module sched

The sched module implements a generic event scheduler. The scheduler class uses a delay function to wait a specific amount of time before executing a task. It also supports multi-threaded applications: the delay function is called after each event is executed, so that other threads get a chance to run.

class sched.scheduler(timefunc, delayfunc) defines a generic interface for scheduling events. It takes two arguments: timefunc, a function without arguments that returns the current time as a number (time.time from the time module is commonly used), and delayfunc, a one-argument function compatible with timefunc's output that delays for that many time units (time.sleep is commonly used).

Code example.

import datetime
import time
import sched


def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()


def loop_monitor():
    s = sched.scheduler(time.time, time.sleep)  # create the scheduler
    s.enter(5, 1, time_printer, ())
    s.run()


if __name__ == "__main__":
    loop_monitor()

Main methods of a scheduler object:

  • enter(delay, priority, action, argument): schedule an event to run after delay time units.
  • cancel(event): remove an event from the queue; raises ValueError if the event is not currently in the queue.
  • run(): run all scheduled events, waiting (using the delayfunc passed to the constructor) between events until none remain.

Personal comment: better than threading.Timer; run() works through the whole event queue, so no outer loop is needed.
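sched can also schedule an event at an absolute time with enterabs(), whose first argument is a timestamp compatible with timefunc rather than a delay. A small sketch:

import sched
import time


def task():
    print('run at', time.ctime())


s = sched.scheduler(time.time, time.sleep)
# absolute-time variant of enter(): run once, 10 seconds from now
s.enterabs(time.time() + 10, 1, task)
s.run()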

Implementing timed tasks with the scheduling module schedule

schedule is a lightweight third-party task scheduling module that can run jobs every few seconds, minutes, hours, or days, or at specific times. It lets users schedule Python functions (or any other callables) at regular intervals using a simple, human-friendly syntax.

Let’s look at the code first. Is it possible to understand what it means without reading the documentation?

import schedule
import time


def job():
    print("I'm working...")


schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

Decorator syntax: schedule a function by decorating it with @repeat().

import time
from schedule import every, repeat, run_pending


@repeat(every().second)
def job():
    print('working...')


while True:
    run_pending()
    time.sleep(1)

Passing arguments:

import time

import schedule


def greet(name):
    print('Hello', name)


schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

while True:
    schedule.run_pending()
    time.sleep(1)

Decorators can also pass parameters.

import time

from schedule import every, repeat, run_pending


@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
    print('Hello', planet)


while True:
    run_pending()
    time.sleep(1)

Cancelling a task:

import sys
import time

import schedule

i = 0


def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        sys.exit(0)


job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()
    time.sleep(1)

Running a task only once:

import time
import schedule


def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob


schedule.every().minute.at(':34').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

Retrieving tasks by tag:

# retrieve all jobs: schedule.get_jobs()
import schedule


def greet(name):
    print('Hello {}'.format(name))


schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

friends = schedule.get_jobs('friend')
print(friends)

Cancelling tasks by tag:

# cancel all jobs: schedule.clear()
import time

import schedule


def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')


schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')

while True:
    schedule.run_pending()
    time.sleep(1)

Running a task until a certain time:

import time

import schedule
from datetime import datetime, timedelta, time as dtime


def job():
    print('working...')


schedule.every().second.until('23:59').do(job)  # stop at 23:59 today
schedule.every().second.until('2030-01-01 18:30').do(job)  # stop at 18:30 on 2030-01-01

schedule.every().second.until(timedelta(hours=8)).do(job)  # stop 8 hours from now
schedule.every().second.until(dtime(23, 59, 59)).do(job)  # stop at 23:59:59 today
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # stop at 18:30 on 2030-01-01

while True:
    schedule.run_pending()
    time.sleep(1)

Run all tasks immediately (mainly for testing).

import schedule


def job():
    print('working...')


def job1():
    print('Hello...')


schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)

schedule.run_all()
schedule.run_all(delay_seconds=3)  # 3-second delay between jobs

Running jobs in parallel: schedule runs jobs serially by default, but you can hand each job off to its own thread.

import threading
import time

import schedule


def job1():
    print("I'm running on thread %s" % threading.current_thread())


def job2():
    print("I'm running on thread %s" % threading.current_thread())


def job3():
    print("I'm running on thread %s" % threading.current_thread())


def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)

Implementing timed tasks using the task framework APScheduler

APScheduler (Advanced Python Scheduler) is a Python timed-task framework inspired by Quartz that implements all of Quartz's features and is very easy to use. It provides date-based, fixed-interval, and crontab-style tasks, and it can persist jobs. With these features, we can easily build a Python scheduled-task system.

It supports three kinds of scheduling:

  • Linux cron-like scheduling (with optional start/end times)
  • Interval-based execution (periodic scheduling, with optional start/end times)
  • One-off tasks (run a task once at a set date/time)

APScheduler has four components.

  • Triggers contain the scheduling logic; each job has its own trigger that decides when the job should run next. Apart from their initial configuration, triggers are completely stateless.
  • Job stores hold the scheduled jobs. The default job store simply keeps jobs in memory, while others store them in a database. A job's data is serialized when saved to a persistent job store and deserialized when loaded back. Job stores must never be shared between schedulers.
  • Executors handle the running of jobs, usually by submitting the job's callable to a thread or process pool; when the job completes, the executor notifies the scheduler.
  • The scheduler is the remaining component; an application usually has only one. Application developers normally do not deal with job stores, executors, or triggers directly: the scheduler provides the interface for adding, modifying, and removing jobs and for configuring the other components. By configuring the executor, job store, and trigger, with a thread pool (ThreadPoolExecutor, default size 20) or process pool (ProcessPoolExecutor, default size 5), and with at most 3 simultaneously running instances of a job by default (max_instances), you get full add/delete/modify/query control over scheduling (see the sketch after this list).
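To make the defaults above concrete, here is a minimal configuration sketch (the pool sizes and job_defaults values simply spell out the documented defaults):

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

executors = {
    'default': ThreadPoolExecutor(20),      # thread pool, default size 20
    'processpool': ProcessPoolExecutor(5),  # process pool, default size 5
}
job_defaults = {
    'coalesce': False,    # do not merge missed runs
    'max_instances': 3,   # at most 3 concurrent instances per job
}
scheduler = BackgroundScheduler(executors=executors, job_defaults=job_defaults)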

Example code.

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime


# print the current time
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


# BlockingScheduler
sched = BlockingScheduler()
sched.add_job(job, 'interval', seconds=5, id='my_job_id')
sched.start()

Important concepts in APScheduler

Job

Job is the smallest execution unit of APScheduler. When creating a Job, specify the function to be executed, the parameters required in the function, and some information about the settings when the Job is executed.

Parameter description (most of these can be passed to add_job(), as in the sketch after this list):

  • id: the unique ID of the job
  • name: the name of the job
  • trigger: a trigger defined by APScheduler that determines when the Job runs; the next run time is computed from the trigger's rules, and the job executes when it is reached
  • executor: an executor defined by APScheduler; the executor's alias is set when the job is created, and the scheduler looks up the executor by that alias to run the job's function
  • max_instances: the maximum number of concurrently running instances of this job; when executing a job, the executor counts the running instances by job id and checks this limit to decide whether the job may run
  • next_run_time: the next run time of the Job; a datetime may be given when creating the Job, otherwise it defaults to the time computed from the trigger
  • misfire_grace_time: how late (in seconds) the Job may still run; for example, if the Job is scheduled for 21:00:00 but runs at 21:00:31 because of a service restart or other reason, it still executes if this value is 40 or more, otherwise the run is discarded
  • coalesce: a bool controlling whether missed runs are merged; for example, if the scheduler is down for 20s after a restart and the job's trigger fires every 5s, four runs are missed: with coalesce=True they are merged into one run, otherwise each is executed in turn
  • func: the function the Job executes
  • args: positional arguments for the Job's function
  • kwargs: keyword arguments for the Job's function
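Most of these settings map directly to keyword arguments of add_job(). A sketch (the function and the values are only examples):

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()


def report():
    print('generating report...')


sched.add_job(
    report,                  # func: the function the Job executes
    'interval',              # trigger type
    seconds=30,
    id='report_job',         # unique ID of the job
    name='generate report',
    max_instances=1,         # at most one running instance
    misfire_grace_time=40,   # still run if at most 40s late
    coalesce=True,           # merge missed runs into one
)
sched.start()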

Trigger

A Trigger is bound to a Job. When the scheduler filters jobs to run, it uses the trigger's rules to compute the Job's next fire time and compares it with the current time to decide whether the Job should execute. In short, the trigger's rules determine the next run time.

Currently, APScheduler supports the following triggers.

  • DateTrigger: fires once at a specified time
  • IntervalTrigger: fires at a specified interval
  • CronTrigger: fires on a crontab-like schedule, as in Linux

Trigger parameters: date

Date triggering: the job is executed only once, at the given date/time.

  • run_date (datetime|str) – the date/time to run the job at
  • timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
Trigger parameters: interval

Interval scheduling: the job runs at fixed time intervals.

  • weeks (int) - number of weeks between runs
  • days (int) - number of days between runs
  • hours (int) - number of hours between runs
  • minutes (int) - number of minutes between runs
  • seconds (int) - number of seconds between runs
  • start_date (datetime|str) - start date of the interval calculation
  • end_date (datetime|str) - end date
  • timezone (datetime.tzinfo|str) - time zone
sched.add_job(job_function, 'interval', hours=2)
Trigger parameters: cron

Cron-style scheduling:

  • (int|str) means the parameter can be either int or str
  • (datetime | str) means that the argument can be either of type datetime or str
  • year (int|str) - 4-digit year - (indicates a four-digit year, such as 2008)
  • month (int|str) - month (1-12) - (indicates that the range of values is 1-12 months)
  • day (int|str) - day of the month (1-31)
  • week (int|str) - ISO week (1-53) - (December 31, 2006 on the Gregorian calendar can be written as 2006-W52-7 (extended form) or 2006W527 (compact form))
  • day_of_week (int|str) - number or name of the day of the week (0-6 or mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) - hour (0-23) - (indicates a range of values from 0-23 hours)
  • minute (int|str) - minute (0-59) - (indicates a range of 0-59 minutes)
  • second (int|str) - second (0-59) - (means the range is 0-59 seconds)
  • start_date (datetime|str) - earliest possible date/time to trigger on (inclusive) - (indicates start time)
  • end_date (datetime|str) - latest possible date/time to trigger on (inclusive) - (indicates the end time)
  • timezone (datetime.tzinfo|str) - time zone to use for the date/time calculations (defaults to scheduler timezone) - (indicates time zone fetch)

Expressions available for CronTrigger.

Expression    Field    Description
*             any      Wildcard; fire on every value. Example: minute='*' fires every minute
*/a           any      Fire every a values. Example: minute='*/3' fires every 3 minutes
a-b           any      Fire on any value within the a-b range. Example: minute='2-5' fires once per minute from minute 2 through 5
a-b/c         any      Fire every c values within the a-b range
xth y         day      Fire on the x-th occurrence of weekday y within the month
last x        day      Fire on the last occurrence of weekday x within the month
last          day      Fire on the last day of the month
x,y,z         any      Combination; can mix any of the above expressions
# run at 00:00, 01:00, 02:00 and 03:00 on the third Friday of June, July, August, November and December
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# run Monday to Friday at 5:30 until 2024-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30')

Executor

The Executor is initialized in the scheduler, and executors can also be added dynamically through the scheduler's add_executor. Each executor is bound to an alias, which serves as the unique identifier a Job is bound to; at execution time, the job is run by the executor bound to it. The executor type should be chosen to match the scheduling library in use: with asyncio choose AsyncIOExecutor, with tornado choose TornadoExecutor, and for thread- or process-based execution choose ThreadPoolExecutor or ProcessPoolExecutor. The executors currently supported by APScheduler are:

  • executors.asyncio: runs jobs in the asyncio event loop
  • executors.gevent: io multiplexing, non-blocking
  • executors.pool: ThreadPoolExecutor and ProcessPoolExecutor
  • executors.twisted: event-driven, based on Twisted

Jobstore

The Jobstore is initialized in the scheduler, and job stores can also be added dynamically through the scheduler's add_jobstore. Each jobstore is bound to an alias; when a Job is added, the scheduler looks up the jobstore by the specified alias and saves the job into it. The jobstore determines how jobs are persisted: by default they are kept in memory (MemoryJobStore) and are lost after a restart. APScheduler currently supports the following job stores:

  • jobstores.memory: memory
  • jobstores.mongodb: stored in mongodb
  • jobstores.redis: stored in redis
  • jobstores.rethinkdb: stored in rethinkdb
  • jobstores.sqlalchemy: support sqlalchemy databases such as mysql, sqlite, etc.
  • jobstores.zookeeper: zookeeper

The different job stores are configured in the scheduler configuration (see Scheduler).
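For example, a minimal sketch that persists jobs to SQLite via SQLAlchemy (the database URL is only an example):

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler

jobstores = {
    # jobs stored here survive a process restart
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
}
scheduler = BackgroundScheduler(jobstores=jobstores)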

Event

An Event is fired by APScheduler when certain operations occur. Users can register functions that listen for these events and perform specific actions when they are triggered. Commonly monitored ones are EVENT_JOB_ERROR and EVENT_JOB_MISSED.

Current EVENT defined by APScheduler.

  • EVENT_SCHEDULER_STARTED
  • EVENT_SCHEDULER_START
  • EVENT_SCHEDULER_SHUTDOWN
  • EVENT_SCHEDULER_PAUSED
  • EVENT_SCHEDULER_RESUMED
  • EVENT_EXECUTOR_ADDED
  • EVENT_EXECUTOR_REMOVED
  • EVENT_JOBSTORE_ADDED
  • EVENT_JOBSTORE_REMOVED
  • EVENT_ALL_JOBS_REMOVED
  • EVENT_JOB_ADDED
  • EVENT_JOB_REMOVED
  • EVENT_JOB_MODIFIED
  • EVENT_JOB_EXECUTED
  • EVENT_JOB_ERROR
  • EVENT_JOB_MISSED
  • EVENT_JOB_SUBMITTED
  • EVENT_JOB_MAX_INSTANCES

A listener is a user-defined callback for certain Events; for example, when a Job triggers EVENT_JOB_MISSED, the listener can perform whatever handling is needed, as in the sketch below.
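A minimal listener sketch (assuming a scheduler object has already been created as in the earlier examples):

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED


def my_listener(event):
    if event.exception:
        print('job %s raised an exception' % event.job_id)
    else:
        print('job %s missed its scheduled run time' % event.job_id)


# invoked whenever a job errors out or misses its run time
scheduler.add_listener(my_listener, EVENT_JOB_ERROR | EVENT_JOB_MISSED)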

Scheduler

The Scheduler is the core of APScheduler; all the other components are configured through it. Once started, the scheduler runs tasks according to their configuration: it wakes up at the upcoming run times computed by the triggers of all defined Jobs, and scheduling is also triggered when Job information changes.

APScheduler supports the following scheduler classes; the most common are BlockingScheduler and BackgroundScheduler (compared in the sketch after this list):

  • BlockingScheduler: for when the scheduler is the only thing running in the process; calling start() blocks the current thread and does not return.
  • BackgroundScheduler: for schedulers running in the background of an application; the main thread is not blocked after calling start().
  • AsyncIOScheduler: for applications that use the asyncio module.
  • GeventScheduler: for applications that use the gevent module.
  • TwistedScheduler: for applications built on Twisted.
  • QtScheduler: for Qt applications.
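A sketch of the practical difference: BlockingScheduler.start() never returns, while with BackgroundScheduler the main thread keeps running.

import time

from apscheduler.schedulers.background import BackgroundScheduler


def tick():
    print('tick')


scheduler = BackgroundScheduler()
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()  # returns immediately; jobs run on background threads

try:
    while True:
        time.sleep(1)  # the main thread is free to do other work
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()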

Workflow of Scheduler

Scheduler add job process.

Scheduler scheduling process.

Implementing Timed Tasks with Celery, a Distributed Messaging System

Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages; it provides operators with the tools needed to maintain such a system, and it can also be used for task scheduling. Celery is not a good choice if you only need a lightweight scheduling tool.

Celery is a powerful distributed task queue that lets tasks run completely outside the main application and even on other hosts. We usually use it to implement asynchronous tasks (async tasks) and timed tasks (crontab). Asynchronous tasks are time-consuming operations such as sending emails, uploading files, or image processing; timed tasks are tasks that need to run at a specific time.

Note that Celery itself has no storage for tasks; tasks must be stored somewhere when they are scheduled, so Celery must be combined with a tool that provides storage and access, such as a message queue, Redis, or a database. The official recommendation is the RabbitMQ message queue, and Redis is sometimes also a good choice.

Its architecture is shown in the following diagram.

The Celery architecture follows a typical producer-consumer model and consists of the following main components:

  • Celery Beat: the task scheduler. The beat process reads the configuration and periodically sends tasks that are due to the task queue.
  • Producer: whatever puts tasks on the queue, whether users, triggers, or other operations; anything that calls the API, functions, or decorators provided by Celery to create a task and hand it to the task queue is a task producer.
  • Broker: the message middleware, here the task queue itself; Celery plays the roles of producer and consumer, and the broker is where producers and consumers store and fetch tasks (the queue).
  • Celery Worker: the consumer that takes tasks from the queue and executes them. Usually multiple workers run on several servers to improve throughput.
  • As brokers/backends, Celery supports Redis, RabbitMQ, MongoDB, Django ORM, SQLAlchemy, etc. out of the box.

In practice, when a user initiates a request from the web front end, we simply drop the task onto the broker's task queue; an idle worker processes it, and the result is stored in the backend database. We can start multiple worker processes on one or more machines at the same time to process tasks in a distributed, parallel way.
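As an illustration, a minimal periodic task with Celery Beat might look like the sketch below, assuming a local Redis broker (the module name tasks.py and the broker URL are examples):

# tasks.py
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task
def send_report():
    print('sending report...')


app.conf.beat_schedule = {
    'send-report-every-morning': {
        'task': 'tasks.send_report',
        'schedule': crontab(hour=8, minute=0),  # every day at 8:00
    },
}

This would be started with something like celery -A tasks worker -B --loglevel=info, where -B runs a beat process alongside the worker.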

Implementing Timed Tasks with the Data Flow Tool Apache Airflow

Apache Airflow is a data-workflow tool open-sourced by Airbnb and currently an Apache incubator project. It supports ETL data flows very flexibly, along with a large number of plugins for functions such as HDFS monitoring and email notification. Airflow supports standalone and distributed deployment, a Master-Slave mode, and resource schedulers such as Mesos, and it scales very well. It is adopted by a large number of companies.

Airflow is developed in Python and uses DAGs (Directed Acyclic Graphs) to express the tasks in a workflow together with their relationships and dependencies. For example, in the workflow below, task T1 must finish before T2 and T3 can start, and T4 can start only after both T2 and T3 have finished.
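That T1 -> (T2, T3) -> T4 workflow might look like the following sketch in Airflow 2.x (the dag_id, dates, and commands are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='example_diamond',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    t1 = BashOperator(task_id='t1', bash_command='echo T1')
    t2 = BashOperator(task_id='t2', bash_command='echo T2')
    t3 = BashOperator(task_id='t3', bash_command='echo T3')
    t4 = BashOperator(task_id='t4', bash_command='echo T4')

    # T1 runs first; T2 and T3 run in parallel; T4 waits for both
    t1 >> [t2, t3] >> t4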

Airflow provides many Operator implementations for performing all kinds of tasks:

  • BashOperator - Executes bash commands or scripts.
  • SSHOperator - Executes remote bash commands or scripts (same principle as paramiko module).
  • PythonOperator - Executes Python functions.
  • EmailOperator - Sends an email.
  • HTTPOperator - Sends an HTTP request.
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc., to perform SQL tasks.
  • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

In addition to the Operators above, it is easy to write custom Operators to meet individual needs.

In some cases, we need to execute different tasks depending on the result of an earlier one, so the workflow branches. For example:

This requirement can be implemented with BranchPythonOperator, as sketched below.
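A sketch of such a branch (meant to live inside a DAG definition like the one above; the condition and task ids are made up):

from datetime import datetime

from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator


def choose_branch():
    # return the task_id (or list of task_ids) to follow; the others are skipped
    return 'morning_task' if datetime.now().hour < 12 else 'evening_task'


branch = BranchPythonOperator(task_id='branch', python_callable=choose_branch)
morning_task = BashOperator(task_id='morning_task', bash_command='echo morning')
evening_task = BashOperator(task_id='evening_task', bash_command='echo evening')

branch >> [morning_task, evening_task]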

Background of the creation of Airflow

Often, in a large system such as an operations and maintenance system, a data analysis system, or a testing system, we have a variety of dependency requirements, including but not limited to:

  • Time dependency: The task needs to wait for a certain point in time to trigger.
  • External System Dependencies: Tasks depend on external systems that require an interface call to access them.
  • Inter-task dependencies: Task A needs to start after Task B completes; the two tasks affect each other.
  • Resource Environment Dependency: The task consumes a lot of resources or can only be executed on a specific machine.

Airflow's core concept is the DAG (Directed Acyclic Graph), used to represent workflows.

  • Airflow is a WMS (workflow management system): it treats tasks and their dependencies as code, executes tasks according to those plans, and distributes the tasks to be executed among actual worker processes.
  • Airflow provides an excellent UI for displaying the status of currently active tasks and past tasks, and allows users to manually manage the execution and status of tasks.
  • Workflows in Airflow are collections of tasks with directional dependencies.
  • Each node in the DAG is a task, and the edges in the DAG represent dependencies between tasks (forced to be directed and acyclic, so there are no circular dependencies, leading to infinite execution loops).

Airflow Core Concepts

  • DAGs: Directed Acyclic Graph, which organizes all tasks that need to be run according to their dependencies, and describes the order of execution of all tasks.
  • Operators: can be thought of as classes describing what a single task in a DAG does. Airflow has many built-in operators, such as BashOperator to run a bash command, PythonOperator to call an arbitrary Python function, EmailOperator to send email, HTTPOperator to send an HTTP request, and the various SQL operators to run SQL commands. Users can also define custom Operators, which is a great convenience.
  • Tasks: A Task is an instance of an Operator, which is a node in DAGs.
  • Task Instance: a concrete run of a task. In the web interface you can see each task instance's state, including “running”, “success”, “failed”, “skipped”, “up for retry”, etc.
  • Task Relationships: Tasks in a DAG can depend on one another; for example, Task1 >> Task2 means Task2 depends on Task1. By combining DAGs and Operators, users can create all kinds of complex workflows.

Airflow’s architecture

In a scalable production environment, Airflow contains the following components.

  • Metadata database: stores information about the status of tasks.
  • Scheduler: a process that uses DAG definitions together with task state from the metadata database to decide which tasks need to be executed and at what priority. The scheduler typically runs as a service.
  • Executor: a message-queue process bound to the scheduler that determines which worker processes actually execute each scheduled task. There are different types of executors, each using a class that specifies its worker processes. For example, LocalExecutor executes tasks with parallel processes running on the same machine as the scheduler, while executors like CeleryExecutor use worker processes on a separate cluster of worker machines.
  • Workers: the processes that actually execute the task logic, determined by the executor in use.

The concrete worker implementation is determined by the executor specified in the configuration file. Airflow supports multiple executors:

  • SequentialExecutor: Single-process sequential execution, usually only used for testing
  • LocalExecutor: local multi-process execution
  • CeleryExecutor: Distributed task scheduling using Celery
  • DaskExecutor: Distributed task scheduling using Dask
  • KubernetesExecutor: new in 1.10.0, creates temporary POD to execute each task

Production environments generally use CeleryExecutor and KubernetesExecutor.

The architecture of using CeleryExecutor is shown in the figure:

The architecture using KubernetesExecutor is shown in the figure: