# job-manage-by-prefect OSS perfect pipeline ## perfectとは? Pythonで@flow/@taskを書くだけ。失敗時リトライ、依存関係、スケジュール、UIで実行履歴も見える。 * 軽量でローカルでもクラウドでも動く。 * コードで定義+UIあり。 * 初期導入が簡単。 ```py from prefect import flow, task @task(retries=3) def extract(d): ... @task def transform(data): ... @task def load(df): ... @flow def etl_flow(date: str): d = extract.submit(date) t = transform.submit(d) load.submit(t) if __name__ == "__main__": etl_flow("2025-11-10") ``` ## 全体図 ```plantuml @startuml skinparam monochrome true skinparam componentStyle rectangle title Prefect 全体構成(Server・Worker・Work Pool・Flow・UI) actor User as U node "Developer Machine" { component "Flow code\n(例: etl_flow.py)" as CODE component "CLI\nprefect deployment build" as CLI } node "Prefect Server (Orion)" as SERVER { [API] as API [Web UI] as UI database "State DB\n(deployments, runs, logs)" as DB } queue "Work Pool\n(process-pool)" as POOL node "Worker" as WORKER { component "prefect worker start\n -p process-pool" as WK } U --> UI : 実行・監視 CLI --> API : Deployment登録\n(--cron, --timezone) API --> DB : 定義/スケジュール保存 API --> POOL : スケジュール時刻に\nフロー実行リクエスト WK --> API : プールを監視・取得 WK --> CODE : フローをインポートして実行 WK --> API : 状態/ログ送信 UI --> API : 実行履歴/ログ参照 @enduml ``` ## 使い方 ### 実行方法 **フローを直接実行する** ```sh cd src python flows/etl_flow.py ``` **デプロイ&スケジュール(例:毎朝7時)** ```sh cd src prefect deployment build flows/etl_flow.py:etl_flow \ -n daily-etl -p process-pool --cron "0 7 * * *" -a ``` ### 状況確認 ```sh prefect deployment ls # 登録済みデプロイ確認 prefect deployment run daily-etl # 手動実行 prefect work-pool ls # ワークプール確認 prefect server status # サーバー状態確認 ``` --- ## Develop ### venv **仮想環境を作成する場合** ```sh python -m venv venv pip install -r requirements.txt ``` **サーバーを起動する** ```sh prefect server start ``` * サーバの機能 * 管理・スケジュール・状態を記録 * REST API * Prefect Orion UI * ブラウザで http://127.0.0.1:4200 **ワーカーを起動する** ```sh prefect work-pool create process-pool -t process prefect worker start -p process-pool ``` * 実際にフローを動かす * Worker(実行エンジン) * (旧名)Prefect Agent * 実行環境: local / docker / k8sなど