Airflow

Task schedule tool.

Posted by Mr. Liu on 2022-05-23
Estimated Reading Time 6 Minutes
Words 1.3k In Total
Viewed Times

Airflow

官方网站

一、简介

airflow是一款开源的,分布式任务调度框架,它将一个具有上下级依赖关系的工作流,组装成一个有向无环图

二、特性

  • 分布式任务调度:允许一个工作流的task在多台worker上同时执行
  • 可构建任务依赖:以有向无环图的方式构建任务依赖关系
  • task原子性:工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始任务
  • 自动发现循环

三、配置

1、安装airflow

1
2
3
4
#默认安装即可
pip install apache-airflow
#修改配置文件,主要是修改数据库为MYSQL(可选)(不用自己的数据库时,使用airflow自带的数据库)
vi home/{user_name}/airflow/airflow.cfg

2、安装MYSQL(可选)

linkA

linkB

linkC

1
2
3
mysql> grant all on *.* to 'root'@'%' identified by '123456a';
mysql> flush privileges;
mysql> create database airflow ;

PS:

配置SQL为默认数据库可能出现的问题:

登录问题:https://blog.csdn.net/jlu16/article/details/82809937

键的问题:https://www.bbsmax.com/A/Vx5M2wlmdN/

时间戳的问题:在my.cnf 里设置

编码问题(未解决):DatabaseError: (1071, ‘Specified key was too long; max key length is 767 bytes’)

3、安装Redis(可选)

https://www.runoob.com/redis/redis-conf.html

6379默认端口

https://www.zhihu.com/question/20084750

4、配置airflow(可选)

sql_alchemy_conn = mysql://airflow:123456@127.0.0.1:3306/airflow

1
2
3
4
5
6
7
8
#数据库初始化,默认的可以,如果配置成MYSQL有可能应为配置问题报错
airflow db init
#创建用户,输入密码123456
airflow users create --username admin --firstname admin --lastname admin --role Admin --email example@XX
#使用普通用户启动,提供网页端访问
airflow webserver -p 8080
#启动调度程序
airflow scheduler

四、使用全流程

(一)以配置方式书写DAG

1、设置DAG的默认参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}

2、声明一个DAG

1
2
3
4
5
6
7
8
9
10
11
with DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
) as dag:
#task1
#task2
#task3

3、声明一个TASK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)

t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5', #在shell上执行的指令
retries=3, #失败重启次数
)
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
)

4、设置任务之间的依赖关系

1
2
3
4
#t1为t2和t3的上游
t1 >> [t2, t3]
#or
t1.set_downstream([t2, t3])

(二)以装饰器和函数方式书写DAG

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import json
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
}
#1、声明这是一个DAG并配置DAG参数
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def tutorial_taskflow_api_etl():

#2、声明这是一个任务,并书写任务函数
@task()
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict


@task(multiple_outputs=True)
def transform(order_data_dict: dict):
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}


@task()
def load(total_order_value: float):
print(f"Total order value is: {total_order_value:.2f}")

#3、设置人物之间的顺序(依赖关系)
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])


tutorial_etl_dag = tutorial_taskflow_api_etl()

五、其他技术

1、注意

1)airflow的DAG存储在数据库中,设定新的DAG时,需要重新初始化airflow的默认数据库

2)apache-airflow本身没有提供删除dag的接口,只能通过数据库层面删除

2、定义任务中的Operator

airflow除了BashOperator之外还提供了以下的操作器模版

  • BashOperator - executes a bash command
  • PythonOperator - calls an arbitrary Python function
  • EmailOperator - sends an email

同时提供了额外安装其他操作器的方法(不过一般以上三个就足够了)

  • SimpleHttpOperator
  • MySqlOperator
  • PostgresOperator
  • MsSqlOperator
  • OracleOperator
  • JdbcOperator
  • DockerOperator
  • HiveOperator
  • S3FileTransformOperator
  • PrestoToMySqlOperator
  • SlackAPIOperator

3、给任务指定运行时间

1
2
3
4
# testing print_date  最后的日期为运行时间
airflow tasks test tutorial print_date 2015-06-01
# testing sleep
airflow tasks test tutorial sleep 2015-06-01

4、如何处理在不同worker和不同node的任务

使用@task来装饰,然后交给airflow处理

Tutorial on the Taskflow API

5、实现DAG周期执行

DAG Runs

6、更改执行器以适合集群运行

部署一个健壮的 apache-airflow 调度系统

配置celery的后端地址(全部为master(即调度程序)的ip)

airflow提供了多种执行器以适应多节点的DAG工作

单一机器的时候使用Local Executors,多机器的时候使用Remote Executors中的一种

  • Local Executors
    • Debug Executor
    • Local Executor
    • Sequential Executor
  • Remote Executors
    • Celery Executor
    • CeleryKubernetes Executor
    • Dask Executor
    • Kubernetes Executor

参考资料

airflow 实战总结


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !