From 5aa426f8f6b9bdf58aa7477e3f61cadeadfe7514 Mon Sep 17 00:00:00 2001 From: "ry.yamafuji" Date: Tue, 11 Nov 2025 02:57:00 +0900 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=9C=9F=E6=A7=8B=E7=AF=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker-compose.yaml | 36 ++++++++++++++++++++++++++++++++++++ requirements.txt | 2 ++ src/flows/etl_flow.py | 19 +++++++++++++++++++ 3 files changed, 57 insertions(+) create mode 100644 docker-compose.yaml create mode 100644 requirements.txt create mode 100644 src/flows/etl_flow.py diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..b24cd97 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,36 @@ +# prefect-template/docker-compose.yml +services: + server: + image: prefecthq/prefect:2-latest + container_name: prefect-server + command: ["prefect","server","start","--host","0.0.0.0"] + ports: ["4200:4200"] # UI: http://localhost:4200 + environment: + PREFECT_UI_URL: "http://localhost:4200" + PREFECT_API_URL: "http://server:4200/api" + TZ: "Asia/Tokyo" + # Slack通知を使う場合、.env で SLACK_WEBHOOK_URL を設定 + SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL:-} + volumes: + - ./src/flows:/opt/flows + - prefect-data:/root/.prefect + + worker: + image: prefecthq/prefect:2-latest + container_name: prefect-worker + depends_on: [server] + environment: + PREFECT_API_URL: "http://server:4200/api" + TZ: "Asia/Tokyo" + SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL:-} + volumes: + - ./src/flows:/opt/flows + command: > + bash -lc " + pip install -r /opt/flows/requirements.txt >/dev/null 2>&1 || true && + prefect work-pool create process-pool -t process || true && + prefect worker start -p process-pool + " + +volumes: + prefect-data: diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4268304 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests +prefect \ No newline at end of file diff --git a/src/flows/etl_flow.py b/src/flows/etl_flow.py new file mode 100644 index 0000000..fed1c26 --- /dev/null +++ b/src/flows/etl_flow.py @@ -0,0 +1,19 @@ +from prefect import flow, task +from datetime import date + +@task(retries=3, retry_delay_seconds=10) +def extract(d): return f"raw({d})" + +@task +def transform(raw): return raw.replace("raw","clean") + +@task +def load(clean): print(f"loaded: {clean}") + +@flow +def etl_flow(d: str | None = None): + d = d or date.today().isoformat() + load(transform(extract(d))) + +if __name__ == "__main__": + etl_flow() \ No newline at end of file