1. 新建 airflow 用户
- 新建用户 airflow,并设置密码
useradd airflow passwd --stdin airflow
- 给 airflow 增加 sudo 权限
# 编辑 sudo 权限文件 vi /etc/sudoers airflow ALL=(ALL) ALL # 最后通过 wq! 强制保存
- 登录 airflow 用户,后续 airflow 安装开发全部使用该用户
2.安装 airflow
2.1 环境配置
# 创建 airflow 目录 mkdir airflow # 创建 python3 虚拟环境 virtualenv -p /usr/bin/python3 airflow/venv # 配置环境变量 vi .bash_profile # 增加下面两行 AIRFLOW_HOME=~/airflow alias work="source ~/airflow/venv/bin/activate" # 保存、生效并启动虚拟环境 source .bash_profile && work
2.2 安装
# 直接通过 pip 安装 pip install apache-airflow[all] # 运行 airflow 命令,会在 AIRFLOW_HOME 自动生成相关配置文件 airflow version
- 安装过程可能会报错,根据报错内容,安装相关的东西即可,参考下方
报错 | 处理 |
---|---|
致命错误:sasl.h:没有那个文件或目录 | sudo yum install cyrus-sasl-devel.x86_64 -y sudo yum install libgsasl-devel.x86_64 -y sudo yum install saslwrapper-devel.x86_64 -y |
致命错误:sql.h:没有那个文件或目录 | sudo yum install unixODBC-devel |
致命错误:lber.h:没有那个文件或目录 | sudo yum install openldap-devel -y |
sqlite 版本过低 error: cannot use sqlite version < 3.15.0 |
安装高版本sqlite https://www.jianshu.com/p/4586aa8e5524 |
2.3 修改配置
- 修改配置文件
cd $AIRFLOW_HOME && vi airflow.cfg
# 配置数据库,这里使用了 mysql executor = LocalExecutor sql_alchemy_conn = mysql://root:root@172.16.122.25:3306/airflow?charset=utf8 # 设置时区 default_timezone = Asia/Shanghai # web ui 界面使用的时区 default_ui_timezone=Asia/Shanghai # 是否加载案例demo load_examples = False
- 重置数据库,可能会遇到 ImportError: cannot import name '_ColumnEntity' ,版本问题,SQLAlchemy 降级即可,
pip install SQLAlchemy==1.3.23
需要先创建数据库
create database airflow
airflow db init
- 创建管理员用户
airflow users create \ --username admin \ --firstname admin \ --lastname admin \ --role Admin \ --email xx@xx.com --password admin
- 启动 airflow , 访问 http://172.16.122.13:8080/ ,admin/admin
# -h 可以查看更多参数 # 后台启动调度 airflow scheduler -D --stderr /dev/null --stdout /dev/null # 后台启动 web ui airflow webserver -D --stderr /dev/null --stdout /dev/null
3.开发 DAG
3.1 创建文件夹 dags
# airflow 会扫描 dags 目录下的文件,所有开发的作业都放这边 mkdir dags && cd dags
3.2 编写 dag
编写文件 test.py, 上传到 dags 文件夹,运行 python test.py
# -*- coding: utf-8 -*- from airflow import DAG from airflow.contrib.operators.ssh_operator import SSHOperator from datetime import timedelta, datetime SSH_GETWAY = "ssh_20" BIZDATE = """{{ params.get('bizdate', next_ds_nodash) }}""" # ------------------------------------------------------------------------------- default_args = { 'owner': 'airflow', 'depends_on_past': False, 'retries': 1, 'retry_delay': timedelta(minutes=5) } # dag JOB_DAG = DAG( dag_id="test", description="这是一个测试 DAG", start_date=datetime(2021, 4, 10), schedule_interval='0 */12 * * *', default_args=default_args, catchup=False ) ############################################## task ################################################################ get_hostname = SSHOperator( ssh_conn_id=SSH_GETWAY, task_id='get_hostname', command='hostname', dag=JOB_DAG ) get_bizdate = SSHOperator( ssh_conn_id=SSH_GETWAY, task_id='get_bizdate', command=f'echo {BIZDATE}', dag=JOB_DAG ) get_pwd = SSHOperator( ssh_conn_id=SSH_GETWAY, task_id='get_pwd', command='source .bash_profile; pwd', dag=JOB_DAG ) ############################################## dependence ########################################################### get_hostname >> [get_bizdate, get_pwd]
3.3 web ui 新增连接
3.4 启用 dag 运行
3.5 命令传参触发调度
airflow dags trigger test -c '{"bizdate":"19911106"}'
版权声明:如无特殊说明,文章均为本站原创,转载请注明出处
本文链接:http://blog.turboway.top/article/airflow/
许可协议:署名-非商业性使用 4.0 国际许可协议