遷移工具 Air2phin 宣佈開源,2 步遷移 Airflow 至 Dolphinscheduler

近日,排程系統遷移工具 Air2phin 宣佈開源。藉助 Air2phin,使用者可 2 步將排程系統從 Airflow 遷移至 Apache DolphinScheduler,為有排程系統遷移需要的使用者帶來極大便利。

Air2phin 是什麼?

Air2phin 是什麼?

Air2phin 是一個最近宣佈開源的排程系統遷移工具,旨在將 Apache Airflow DAGs 檔案轉換成 Apache DolphinScheduler Python SDK 定義檔案,從而實現使用者將排程系統(Workflow orchestration)從 Airflow 遷移到 DolphinScheduler 的目的。它是一個基於多規則的 AST 轉換器,使用 LibCST 來解析和轉換 Airflow 的 DAG 程式碼,其全部規則使用 Yaml 檔案定義,並提供了一定的自定義規則擴展能力。

近期,Air2phin 已經發布了0.0.12 版本,提供了豐富的功能,可以更好地幫助使用者完成 Airflow 到 Apache DolphinScheduler 的遷移。

AST 是 Abstract Syntax Tree(抽象語法樹)的縮寫,它是一種以樹狀結構表示程式碼語法結構的資料結構。在編譯器中,AST 是由詞法分析器和語法分析器生成的。詞法分析器將源程式碼轉換成標記流(token stream),語法分析器將標記流轉換成抽象語法樹。AST 是一種樹狀結構,它由一系列節點組成,每個節點表示程式碼中的一個語法結構(如表達式、語句、函數、類等),節點之間的關係表示語法結構之間的嵌套關係。

為什麼開源 Air2phin?

為什麼開源 Air2phin?

可能有人會問,為什麼我需要一個遷移工具?這是因為隨著業務的發展,企業或組織原來使用的工作流編排系統已經無法滿足當前的需求,需要將工作流編排系統遷移到新的平臺或者更新到新的版本。經過調研,很多使用者有了將排程系統從開源工作流編排系統 Airflow 遷移到 Apache DolphinScheduler 上來的需求。

在遷移過程中,由於資料處理任務可能涉及多個系統之間的依賴關係,遷移過程需要確保在不影響業務運行的前提下完成。此時,排程系統遷移工具就可以發揮重要作用,它能減少人工干預,儘量自動化地完成兩個排程系統間的遷移工作,並且能兼容多個系統間的多個版本,幾乎可以做到使用者無干預完成遷移。

為此,白鯨開源專門研發了開源遷移工具 Air2phin,可以讓使用者 2 步將排程系統從 Airflow 遷移至 Apache DolphinScheduler,為使用者帶來極大的便利。

為了讓大家更好地理解 Air2phin 的重要性,我們先從排程系統的相關背景知識開始,了解將排程系統從 Airflow 遷移至 Apache DolphinScheduler 的好處。

為什麼要從 Airflow 遷移至 DolphinScheduler?

什麼是工作流編排系統?

工作流編排系統,是以尊重編排規則和業務邏輯的方式管理資料流。工作流編排工具讓使用者可以將多個有關聯的任務轉換為可以安排、運行和觀測的工作流,幫助企業更好地管理和控制業務流程,從而提高業務效率。工作流編排是資料處理流程中不可或缺的元件之一,負責根據預先定義的規則和邏輯執行資料處理任務,確保資料處理流程按照預期順利執行,常見工作流編排系統包括 Apache DolphinScheduler、Apache Airflow、Apache Oozie, Azkaban 等。

Airflow是什麼?

其中,Apache Airflow 是一個開源的工作流編排系統,它可以幫助使用者創建、排程和監控複雜的工作流程。Airflow 最初由 Airbnb 開發,並於 2016 年開源,現在由 Apache 軟體基金會維護。Airflow 使用 Python 語言編寫,具有高度的可擴展性和靈活性,支持多種任務類型,如計算、資料處理、通知、互動等。Airflow 的工作流程是通過編寫 Python 腳本來定義的,可以使用 Airflow 提供的運運算元和鉤子,以及自定義運運算元和鉤子來擴展其功能。但其有著不可忽視的缺陷,比如需要需要深度二次開發,脫離社區版本,升級成本高;Python 技術棧維護迭代成本高;scheduler loop 掃描 Dag folder 延遲降低性能的問題;以及在生產環境中使用穩定性差等。

在新資料時代業務需求下誕生的 Apache DolphinScheduler 是一個開源的分散式工作流排程系統,彌補了以往排程系統的弱勢,旨在為企業使用者提供一種可靠、高效、易於使用的工作流排程平臺,支持多種任務類型,如計算、資料處理、ETL 等。

與 Airflow 相比,DolphinScheduler 採用了分散式架構,提供了多種任務類型,使用者可以定義任務之間的依賴關係,設置任務的優先級和排程策略等,其使用視覺化的界面來創建和管理工作流程的特性更是與 Airflow 形成鮮明對比,變得更加易於操作,對非程式設計人員來說更加友好。

經過調研對比,對於很多使用者來說,將排程系統遷移至 Apache DolphinScheduler 是一個降本增效的更優選擇。

Air2phin 如何安裝和使用

Air2phin 如何安裝和使用

Air2phin 是一個 python 的包,可以通過 Python 的包安裝工具 pip 完成安裝,詳見air2phin getting start。

python -m pip install --upgrade air2phin

一個簡單的例子

我們通過一個簡單的例子,來說明如何使用 Air2phin 的。我們擷取了 airflow tutorial.py 中的部分程式碼作為 Air2phin 轉化的例子,來說明 Air2phin 如何逐步完成轉化成 dolphinscheduler python sdk。

圖1:airflow tutorial.py 中的部分程式碼

圖2:Air2phin 如何逐步完成轉化成 dolphinscheduler python sdk

假設將 airflow tutorial.py 部分內容保存至檔案 tutorial_part.py,想要將其轉化成 dolphinscheduler python sdk 定義,只需要一行命令就能完成。結果如圖 2 所示,因為命令增加了 –inplace 參數,所以 Air2phin 會直接將原檔案覆蓋,如果不需要覆蓋原問題,可以不使用 –inplace 參數,Air2phin 會新增一個 tutorial_part-air2phin.py 檔案來保存轉化後的內容。

air2phin migrate --inplace tutorial_part.py

通過觀察,我們發現這次轉化分別觸發了多條轉化規則,包括

  • 將 airflow.DAG 轉換成

    pydolphinscheduler.core.process_definition.ProcessDefinition,這個規則在第三行(import語句)以及第六行 DAG context

  • 將 airflow.operators.bash.BashOperator 轉換成

    pydolphinscheduler.tasks.shell.Shell,這個規則在任務 t1,t2 中都被使用

  • 除了對應的類轉化之外,我們需要將類的屬性進行轉化,如將

    airflow.DAG.schedule_interval 轉換成了 ProcessDefinition.schedule,同時修改了部分值的內容,如將 timedelta(days=1) 轉成 ‘0 0 0 * * ? *’

最後,我們只需要安裝 pydolphinscheduler ,並且將轉化後的檔案通過 python 運行,就能完成工作流的遷移了,詳見 pydolphinscheduler 使用(https://dolphinscheduler.apache.org/python/main/start.html#installing-pydolphinscheduler)。

# 安裝 apache-dolphinschedulerpython -m pip install apache-dolphinscheduler# 將工作流提交到 dolphinschedulerpython tutorial_part.py

在運行 python tutorial_part.py 時,需要保證 dolphinscheduler API 和 python gateway 服務已經啟動,並且開放了對應的埠,詳見啟動 python gateway service。

至此,我們通過一個簡單的例子,說明了 Air2phin 是如何完成遷移的。

工作原理

工作原理

Airflow 和 dolphinscheduler python sdk 如何工作?

在了解 Air2phin 如果工作之前,先了解 Airflow 和 dolphinscheduler python sdk 如何工作是非常重要的前置條件,幫助我們更好地了解 Air2phin 的遷移步驟,當遇到問題的時候也能更加從容地應對。

  • Airflow 如何工作:A

    irflow 工作流相關的資訊都保存在 DAG 檔案中,之後將 DAG 檔案放置到 Airflow 的指定目錄,Airflow 的 Scheduler 會間隔一定時間去掃描和解析 Airflow 的 DAG 檔案,所以 DAG 檔案是被動被掃描和更新的。

  • dolphinscheduler python sdk:

    同 Airflow 類似,將全部工作流相關的資訊都通過 Python 檔案定義,但是 dolphinscheduler python sdk 是通過人為主動觸發的方式,將工作流資訊提交,運行命令 python 工作流檔案名 即可完成主動任務提交。

Air2phin 工作流程

Air2phin 工作流程

了解完兩者是如何使用,如何提交/發現工作流的,將更加利於我們對 Air2phin 的工作原理的理解。因為 Airflow 的 DAG 檔案以及 DolphinScheduler 的 Python sdk 定義檔案都是 Python 編寫的,所以 Air2phin 的大部分程式碼都是處理兩者間的差異,最後將 Airflow 的程式碼轉化成 dolphinscheduler python sdk 和定義。

Air2phin 使用了 LibCST(https://libcst.readthedocs.io/en/latest/) 來實現 airflow python DAG 程式碼的抽象語法樹解析,然後通過 LibCST 的 Transformer(https://libcst.readthedocs.io/en/latest/tutorial.html#Build-Visitor-or-Transformer)結合轉化規則最後轉化成 dolphinscheduler python sdk 的定義。

Air2phin 整體工作流程如下

Air2phin 整體工作流程如下:

  • 從標準輸入或者檔案中獲取原本的 Airflow DAG 內容

  • 從 Yaml 檔案載入所有轉換規則

  • 將 Airflow DAG 內容通過 LibCST 解析成 CST 樹

  • 通過 LibCST Transformer 轉換 dolphinscheduler python sdk 定義內容

Air2phin 最佳實踐

Air2phin 最佳實踐

  • 遷移整個檔案夾而不是單個檔案

當使用者想要遷移 Airflow 到 DolphinScheduler 的時候,都是想要整體做遷移而不是單個檔案遷移的,Air2phin 提供整體檔案夾遷移的能力,只需要將路徑從檔案路徑改成檔案夾即可。

# 遷移整個 ~/airflow/dags 檔案夾air2phin migrate --inplace ~/airflow/dags
  • 增加自定義的規則

部分使用 Airflow 的使用者自定義 Hook 或者 Operator,使用者自定義的 Operator 無法通過 Air2phin 內建的轉化規則完成轉化,需要使用者增加自定義的規則,並告訴 Air2phin 規則的位置。例如我們有一個叫 MyCustomOperator 的運算元是繼承 PostgresOperator 的大部分功能, 只是命名不一樣,其定義如下:

from airflow.providers.postgres.operators.postgres import PostgresOperatorclass MyCustomOperator(PostgresOperator):def __init__(self,*,sql: str | Iterable[str],my_custom_conn_id: str = 'postgres_default',autocommit: bool = False,parameters: Iterable | Mapping | None = None,database: str | None = None,runtime_parameters: Mapping | None = None,**kwargs,) -> None:super().__init__(sql=sql,postgres_conn_id=my_custom_conn_id,autocommit=autocommit,parameters=parameters,database=database,runtime_parameters=runtime_parameters,**kwargs,)

它在 Airflow 的多個 DAG 中被使用,使用的方式如下:

from custom.my_custom_operator import MyCustomOperatorwith DAG(dag_id='my_custom_dag',default_args=default_args,schedule_interval='@once',start_date=days_ago(2),tags=['example'],) as dag:t1 = MyCustomOperator(task_id='my_custom_task',sql='select * from table',my_custom_conn_id='my_custom_conn_id',)

現在需要對這個 Operator 進行轉化,我們可以自定義一個轉化規則,並將其命名為 MyCustomOperator.yaml,內容如下,最主要的內容是 migration.module 和 migration.parameter 的定義,其確定了轉化規則:

name: MyCustomOperatordescription: The configuration for migrating airflow custom operator MyCustomOperator to DolphinScheduler SQL task.migration:module:- action: replacesrc: custom.my_custom_operator.MyCustomOperatordest: pydolphinscheduler.tasks.sql.Sqlparameter:- action: replacesrc: task_iddest: name- action: replacesrc: my_custom_conn_iddest: datasource_name

再使用 –custom-rules 參數指定轉化自定義參數,就能應用自定義規則的轉化:

# 指定自定義規則路徑為 /path/to/MyCustomOperator.yamlair2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml ~/airflow/dags
  • 讓 Air2phin 運行地更快

Air2phin 默認是一個進程運行 DAG 檔案的轉化的,當你有許多 DAG 檔案時,Air2phin 轉化非常耗時,我們提供了一個啟動多進程運行 Air2phin 轉化的參數 –multiprocess,可以將其指定為使用者機器的 CPU 數量來縮短轉化時間:

# 指定 air2phin 啟動 12 個進程同時進行轉化air2phin migrate --inplace --custom-rules /path/to/MyCustomOperator.yaml --multiprocess 12 ~/airflow/dags
存在的問題

存在的問題

目前,作為一個轉化工具,Air2phin 的使用方式已經算比較完善了,能夠滿足使用者遷移排程系統的基本需求,但還有一些地方有待完善。

內建規則還不夠多

轉化規則還不夠多,目前只有五個,分別是:

  • airflow.DAG

  • airflow.operators.bash.BashOperator

  • airflow.operators.dummy_operator.DummyOperator

  • airflow.operators.python_operator.PythonOperator

  • airflow.operators.spark_sql_operator.SparkSqlOperator

如果有更多的規則,Air2phin 將成為一個更加好用的轉化工具,這裡歡迎各位隨時提交轉化規則的PR(https://github.com/WhaleOps/air2phin/pulls)。

部分Airflow的用法不能被遷移過來

部分概念僅僅在 Airflow 中有,在 DolphinScheduler 中還沒有,如任務的成功、失敗、重試、觸發 callback,任務的 owner,variable,工作流併發數,tag 等,這部分 Airflow DAG 可以被遷移,但兼容的屬性將會丟失,無法遷移到 DolphinScheduler。

Air2phin 常見問題解答

Air2phin 常見問題解答

Q:為什麼選擇解析 Airflow DAG 檔案而不是資料庫?

A:因為 Airflow DAG 檔案中才有完成的工作流資訊,Airflow 的資料庫中只有工作流基本資訊,沒有任務定義的資訊,也沒有任務的關係,我們選擇通過解析 Airflow 的 DAG 檔案而不是資料庫來完成轉化。

Q:為什麼要通過 dolphinscheduler python sdk 做中轉不自己提交到 DolphinScheduler?

A:因為 Airflow DAG 就是 Python 定義的,在 Airflow DAG 中有很多 Python 的特性,我們不想將這部分特性轉化成結構化的資料(轉化可能存在資訊丟失),恰好 DolphinScheduler 已經有了 Python 的 sdk,所以直接通過 LibCST 轉化是成本更加低的做法。

Q:為什麼使用 LibCST 而不是 python 內建的 AST?

A:因為 LibCST 更加符合我們,Python 內建的 AST 庫解析成 AST 的時候會丟失掉 comment 的資訊,但是我們呢希望保留著部分資訊。且 LibCST 提供更加多 visitor 保證我們更加方便的實現替換。

參考連結:air2phin(https://github.com/WhaleOps/air2phin)

相關文章

CNNVD通報Oracle多個安全漏洞

CNNVD通報Oracle多個安全漏洞

近日,CNNVD通報Oracle多個安全漏洞,其中Oracle產品本身漏洞60個,影響到Oracle產品的其他廠商漏洞247個。包括Orac...