Airflow Conpect

上一篇:etl bigdata
更多信息查看:https://blue-shadow.top

Airflow平台是一个用于描述、执行、监控工作流的工具。通过编写Python脚本,支持不同类型的任务。

主要概念&核心构架

对Airflow中最核心的概念和构架思想进行说明,并在最后结合对应API进行演示操作。

DAG-有向无环图

DAG-有向无环图,是要运行的所有Tasks的集合,组织方式反映了Task的关系和依赖。DAG通过Python脚本编写,以代码定义了DAG的结构,包括具体的任务和任务间的依赖关系。
DAG描述的是期望如何执行工作流程,但并不关心任务具体要做了什么,DAG的任务就是确保它们所做的任何事情在正确的时间、以正确的顺序、或以正确的方式处理各种类型的任务。
DAG通过Python文件定义,存在在定义的DAG_FOLDER文件夹下,执行每个文件中的代码来动态构建DAG对象。

DAG的配置参数,配置参数字典被传递到DAG,将把它们应用任何操作符上。这使得可以很容易地将一个通用参数应用于多个操作符,而不必多次键入它。
以下是常用的配置产生。

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

创建还是用DAG的基本代码。

# 1-导入必要的库
from airflow import DAG
from airflow.utils.dates import days_ago

# 2-设置DAG参数
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
} 

# 3-创建DAG
dag = DAG(
    'test',
    default_args=default_args,
    description='simple test DAG',
    schedule_interval=timedelta(days=1),
)

Operator

DAG描述如何运行一个工作流,实际完成何种操作由Operator来定义。Operator描述工作流中的单个任务,可以独立运行,不需要与其他Operator共享资源。
DAG将确保Operator以正确的顺序运行,除了这些依赖关系,Operator通常独立运行。如果两个Operator需要共享信息,比如文件名或少量数据,
可以考虑将它们合并到一个操作符中,或使用XComs。

在Airflow中,提供了必要的常用Operator,如下所示。

Operator 功能
BashOperator 执行Bash命令
PythonOperator 执行Python函数
EmailOperator 用于发送邮件
SimpleHttpOperator 用于发送HTTP请求
DockerOperator Docker操作相关
HiveOperator Hive操作
MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator等 用于执行SQL

Operator只有在被分配到DAG时才会被加载,所以Operator的创建方法和分配的代码如下。

# 导入必要的库
from airflow import DAG
from airflow.operators.bash_operator import *

dag = DAG('my_dag', start_date=datetime(2020, 1, 1))

# 立即分配到dag
explicit_op = DummyOperator(task_id='op1', dag=dag)

# 推迟分配到dag
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

Relationship

前面讲述的DAG、Operator相关的概念。并通过代码简单演示了创建方法、分配方式。对于Operator间的执行顺序、依赖关系在这小节说明。
通过使用set_upstream()和set_downstream()函数,或使用 >>、<<操作符。


from airflow import DAG
from airflow.operators.bash_operator import *

dag = DAG('my_dag', start_date=datetime(2020, 1, 1))
op1 = DummyOperator(task_id='op1', dag=dag)
op2 = DummyOperator(task_id='op2', dag=dag)

# 1-任务op1在任务op2之前执行,使用 >> 操作符和 set_downstream 函数 
op1 >> op2
op1.set_downstream(op2)

# 2-任务op2在任务op1之后执行,使用 << 操作符和 set_upstream 函数 
op2 << op1
op2.set_upstream(op1)

# 3-在关系链中直接使用DAG
dag >> op1 >> op2
#等价于
op1.dag=dag
op1.set_downstream(op2)

# 4-针对任务列表的使用
op3 = DummyOperator(task_id='op3', dag=dag)
op4 = DummyOperator(task_id='op4', dag=dag)
op1 >> [op3,op4] >> op2
#等价于
op1 >> op3 >> op2
op1 >> op4 >> op2

# 5-可以使用chain、cross_downstream 在特定情况下更容易设置操作符之间关系的方法
[op1, op2, op3] >> op4
[op1, op2, op3] >> op5
[op1, op2, op3] >> op6
# 等价于
cross_downstream([op1, op2, op3], [op4, op5, op6])


op1 >> op2 >> op3 >> op4 >> op5
#等价于
chain(op1, op2, op3, op4, op5)

Tasks

一旦Operator被实例化,它就被称为-任务。参数化任务成为DAG中的一个节点。任务实例表示任务的特定运行,并被描述为DAG、任务和时间点的组合。任务实例还具有指示性状态,可以是运行、成功、失败、跳过、重试等。

任务的完整生命周期如下所示:
No Status: 调度器创建了空任务实例
Scheduled: 调度程序确定的任务实例需要运行
Queued: 调度器将任务发送给executor以在队列上运行
Running: worker拾取任务并正在运行它
Success: 任务完成

Task生命周期
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352