Airflow
官方网站
一、简介
airflow是一款开源的,分布式任务调度框架,它将一个具有上下级依赖关系的工作流,组装成一个有向无环图
二、特性
分布式任务调度:允许一个工作流的task在多台worker上同时执行
可构建任务依赖:以有向无环图的方式构建任务依赖关系
task原子性:工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始任务
自动发现循环
三、配置
1、安装airflow
1 2 3 4 pip install apache-airflow vi home/{user_name}/airflow/airflow.cfg
2、安装MYSQL(可选)
linkA
linkB
linkC
1 2 3 mysql> grant all on * .* to 'root' @'%' identified by '123456a' ; mysql> flush privileges; mysql> create database airflow ;
PS:
配置SQL为默认数据库可能出现的问题:
登录问题:https://blog.csdn.net/jlu16/article/details/82809937
键的问题:https://www.bbsmax.com/A/Vx5M2wlmdN/
时间戳的问题:在my.cnf 里设置
编码问题(未解决):DatabaseError: (1071, ‘Specified key was too long; max key length is 767 bytes’)
3、安装Redis(可选)
https://www.runoob.com/redis/redis-conf.html
6379默认端口
https://www.zhihu.com/question/20084750
4、配置airflow(可选)
sql_alchemy_conn = mysql://airflow:123456@127.0.0.1:3306/airflow
1 2 3 4 5 6 7 8 airflow db init airflow users create --username admin --firstname admin --lastname admin --role Admin --email example@XX airflow webserver -p 8080 airflow scheduler
四、使用全流程
(一)以配置方式书写DAG
1、设置DAG的默认参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 default_args = { 'owner' : 'airflow' , 'depends_on_past' : False , 'email' : ['airflow@example.com' ], 'email_on_failure' : False , 'email_on_retry' : False , 'retries' : 1 , 'retry_delay' : timedelta(minutes=5 ), }
2、声明一个DAG
1 2 3 4 5 6 7 8 9 10 11 with DAG( 'tutorial' , default_args=default_args, description='A simple tutorial DAG' , schedule_interval=timedelta(days=1 ), start_date=days_ago(2 ), tags=['example' ], ) as dag:
3、声明一个TASK
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 t1 = BashOperator( task_id='print_date' , bash_command='date' , ) t2 = BashOperator( task_id='sleep' , depends_on_past=False , bash_command='sleep 5' , retries=3 , ) templated_command = dedent( """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ ) t3 = BashOperator( task_id='templated' , depends_on_past=False , bash_command=templated_command, params={'my_param' : 'Parameter I passed in' }, )
4、设置任务之间的依赖关系
1 2 3 4 t1 >> [t2, t3] t1.set_downstream([t2, t3])
(二)以装饰器和函数方式书写DAG
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import jsonfrom airflow.decorators import dag, taskfrom airflow.utils.dates import days_agodefault_args = { 'owner' : 'airflow' , } @dag(default_args=default_args, schedule_interval=None , start_date=days_ago(2 ), tags=['example' ] ) def tutorial_taskflow_api_etl (): @task() def extract (): data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict @task(multiple_outputs=True ) def transform (order_data_dict: dict ): total_order_value = 0 for value in order_data_dict.values(): total_order_value += value return {"total_order_value" : total_order_value} @task() def load (total_order_value: float ): print (f"Total order value is: {total_order_value:.2 f} " ) order_data = extract() order_summary = transform(order_data) load(order_summary["total_order_value" ]) tutorial_etl_dag = tutorial_taskflow_api_etl()
五、其他技术
1、注意
1)airflow的DAG存储在数据库中,设定新的DAG时,需要重新初始化airflow的默认数据库
2)apache-airflow本身没有提供删除dag的接口,只能通过数据库层面删除
2、定义任务中的Operator
airflow除了BashOperator之外还提供了以下的操作器模版
BashOperator - executes a bash command
PythonOperator - calls an arbitrary Python function
EmailOperator - sends an email
同时提供了额外安装其他操作器的方法(不过一般以上三个就足够了)
SimpleHttpOperator
MySqlOperator
PostgresOperator
MsSqlOperator
OracleOperator
JdbcOperator
DockerOperator
HiveOperator
S3FileTransformOperator
PrestoToMySqlOperator
SlackAPIOperator
3、给任务指定运行时间
1 2 3 4 airflow tasks test tutorial print_date 2015-06-01 airflow tasks test tutorial sleep 2015-06-01
4、如何处理在不同worker和不同node的任务
使用@task来装饰,然后交给airflow处理
Tutorial on the Taskflow API
5、实现DAG周期执行
DAG Runs
6、更改执行器以适合集群运行
部署一个健壮的 apache-airflow 调度系统
配置celery的后端地址(全部为master(即调度程序)的ip)
airflow提供了多种执行器以适应多节点的DAG工作
单一机器的时候使用Local Executors,多机器的时候使用Remote Executors中的一种
Local Executors
Debug Executor
Local Executor
Sequential Executor
Remote Executors
Celery Executor
CeleryKubernetes Executor
Dask Executor
Kubernetes Executor
参考资料
airflow 实战总结
如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !