1. 新建 airflow 用户

  • 新建用户 airflow,并设置密码
useradd airflow
passwd --stdin airflow
  • 给 airflow 增加 sudo 权限
# 编辑 sudo 权限文件 
vi /etc/sudoers

airflow    ALL=(ALL)       ALL

# 最后通过 wq! 强制保存
  • 登录 airflow 用户,后续 airflow 安装开发全部使用该用户

2.安装 airflow

2.1 环境配置

# 创建 airflow 目录
mkdir airflow
# 创建 python3 虚拟环境
virtualenv -p /usr/bin/python3 airflow/venv

# 配置环境变量
vi .bash_profile

# 增加下面两行
AIRFLOW_HOME=~/airflow
alias work="source ~/airflow/venv/bin/activate"

# 保存、生效并启动虚拟环境
source .bash_profile && work

2.2 安装

# 直接通过 pip 安装
pip install apache-airflow[all]
# 运行 airflow 命令,会在 AIRFLOW_HOME 自动生成相关配置文件
airflow version
  • 安装过程可能会报错,根据报错内容,安装相关的东西即可,参考下方
报错 处理
致命错误:sasl.h:没有那个文件或目录 sudo yum install cyrus-sasl-devel.x86_64 -y
sudo yum install libgsasl-devel.x86_64 -y
sudo yum install saslwrapper-devel.x86_64 -y
致命错误:sql.h:没有那个文件或目录 sudo yum install unixODBC-devel
致命错误:lber.h:没有那个文件或目录 sudo yum install openldap-devel -y
sqlite 版本过低 error: cannot use sqlite version < 3.15.0 安装高版本sqlite https://www.jianshu.com/p/4586aa8e5524

2.3 修改配置

  • 修改配置文件 cd $AIRFLOW_HOME && vi airflow.cfg
# 配置数据库,这里使用了 mysql
executor = LocalExecutor
sql_alchemy_conn =  mysql://root:root@172.16.122.25:3306/airflow?charset=utf8

# 设置时区
default_timezone = Asia/Shanghai

# web ui 界面使用的时区
default_ui_timezone=Asia/Shanghai

# 是否加载案例demo
load_examples = False
  • 重置数据库,可能会遇到 ImportError: cannot import name '_ColumnEntity' ,版本问题,SQLAlchemy 降级即可,pip install SQLAlchemy==1.3.23

需要先创建数据库 create database airflow

airflow db init        
  • 创建管理员用户
airflow users create \
   --username admin \
   --firstname admin \
   --lastname admin \
   --role Admin \
   --email xx@xx.com
   --password admin
  • 启动 airflow , 访问 http://172.16.122.13:8080/ ,admin/admin
# -h 可以查看更多参数
# 后台启动调度
airflow scheduler -D --stderr /dev/null --stdout /dev/null
# 后台启动 web ui
airflow webserver -D --stderr /dev/null --stdout /dev/null

image-20210412154543073

3.开发 DAG

3.1 创建文件夹 dags

# airflow 会扫描 dags 目录下的文件,所有开发的作业都放这边
mkdir dags && cd dags

3.2 编写 dag

编写文件 test.py, 上传到 dags 文件夹,运行 python test.py

# -*- coding: utf-8 -*-
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import timedelta, datetime

SSH_GETWAY = "ssh_20"
BIZDATE = """{{ params.get('bizdate', next_ds_nodash) }}"""

# -------------------------------------------------------------------------------
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# dag
JOB_DAG = DAG(
    dag_id="test",
    description="这是一个测试 DAG",
    start_date=datetime(2021, 4, 10),
    schedule_interval='0 */12 * * *',
    default_args=default_args,
    catchup=False
)

############################################## task ################################################################
get_hostname = SSHOperator(
    ssh_conn_id=SSH_GETWAY,
    task_id='get_hostname',
    command='hostname',
    dag=JOB_DAG
)

get_bizdate = SSHOperator(
    ssh_conn_id=SSH_GETWAY,
    task_id='get_bizdate',
    command=f'echo {BIZDATE}',
    dag=JOB_DAG
)

get_pwd = SSHOperator(
    ssh_conn_id=SSH_GETWAY,
    task_id='get_pwd',
    command='source .bash_profile; pwd',
    dag=JOB_DAG
)

############################################## dependence  ###########################################################

get_hostname >> [get_bizdate, get_pwd]

3.3 web ui 新增连接

image-20210412162536891

image-20210412162617868

image-20210412162653494

3.4 启用 dag 运行

image-20210412162847892

3.5 命令传参触发调度

airflow dags trigger test -c '{"bizdate":"19911106"}' 

image-20210412163502747

版权声明:如无特殊说明,文章均为本站原创,转载请注明出处

本文链接:http://blog.turboway.top/article/airflow/

许可协议:署名-非商业性使用 4.0 国际许可协议