2025-11-11 02:52:20 +09:00

138 lines
2.7 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# job-manage-by-prefect
OSS perfect pipeline
## perfectとは
Pythonで@flow/@taskを書くだけ。失敗時リトライ、依存関係、スケジュール、UIで実行履歴も見える。
* 軽量でローカルでもクラウドでも動く。
* コードで定義UIあり。
* 初期導入が簡単。
```py
from prefect import flow, task
@task(retries=3)
def extract(d): ...
@task
def transform(data): ...
@task
def load(df): ...
@flow
def etl_flow(date: str):
d = extract.submit(date)
t = transform.submit(d)
load.submit(t)
if __name__ == "__main__":
etl_flow("2025-11-10")
```
## 全体図
```plantuml
@startuml
skinparam monochrome true
skinparam componentStyle rectangle
title Prefect 全体構成Server・Worker・Work Pool・Flow・UI
actor User as U
node "Developer Machine" {
component "Flow code\n(例: etl_flow.py)" as CODE
component "CLI\nprefect deployment build" as CLI
}
node "Prefect Server (Orion)" as SERVER {
[API] as API
[Web UI] as UI
database "State DB\n(deployments, runs, logs)" as DB
}
queue "Work Pool\n(process-pool)" as POOL
node "Worker" as WORKER {
component "prefect worker start\n -p process-pool" as WK
}
U --> UI : 実行・監視
CLI --> API : Deployment登録\n(--cron, --timezone)
API --> DB : 定義/スケジュール保存
API --> POOL : スケジュール時刻に\nフロー実行リクエスト
WK --> API : プールを監視・取得
WK --> CODE : フローをインポートして実行
WK --> API : 状態/ログ送信
UI --> API : 実行履歴/ログ参照
@enduml
```
## 使い方
### 実行方法
**フローを直接実行する**
```sh
cd src
python flows/etl_flow.py
```
**デプロイ&スケジュール(例毎朝7時)**
```sh
cd src
prefect deployment build flows/etl_flow.py:etl_flow \
-n daily-etl -p process-pool --cron "0 7 * * *" -a
```
### 状況確認
```sh
prefect deployment ls # 登録済みデプロイ確認
prefect deployment run daily-etl # 手動実行
prefect work-pool ls # ワークプール確認
prefect server status # サーバー状態確認
```
---
## Develop
### venv
**仮想環境を作成する場合**
```sh
python -m venv venv
pip install -r requirements.txt
```
**サーバーを起動する**
```sh
prefect server start
```
* サーバの機能
* 管理・スケジュール・状態を記録
* REST API
* Prefect Orion UI
* ブラウザで http://127.0.0.1:4200
**ワーカーを起動する**
```sh
prefect work-pool create process-pool -t process
prefect worker start -p process-pool
```
* 実際にフローを動かす
* Worker(実行エンジン)
* (旧名)Prefect Agent
* 実行環境: local / docker / k8sなど