diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..b24cd97 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,36 @@ +# prefect-template/docker-compose.yml +services: + server: + image: prefecthq/prefect:2-latest + container_name: prefect-server + command: ["prefect","server","start","--host","0.0.0.0"] + ports: ["4200:4200"] # UI: http://localhost:4200 + environment: + PREFECT_UI_URL: "http://localhost:4200" + PREFECT_API_URL: "http://server:4200/api" + TZ: "Asia/Tokyo" + # Slack通知を使う場合、.env で SLACK_WEBHOOK_URL を設定 + SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL:-} + volumes: + - ./src/flows:/opt/flows + - prefect-data:/root/.prefect + + worker: + image: prefecthq/prefect:2-latest + container_name: prefect-worker + depends_on: [server] + environment: + PREFECT_API_URL: "http://server:4200/api" + TZ: "Asia/Tokyo" + SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL:-} + volumes: + - ./src/flows:/opt/flows + command: > + bash -lc " + pip install -r /opt/flows/requirements.txt >/dev/null 2>&1 || true && + prefect work-pool create process-pool -t process || true && + prefect worker start -p process-pool + " + +volumes: + prefect-data: diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4268304 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests +prefect \ No newline at end of file diff --git a/src/flows/etl_flow.py b/src/flows/etl_flow.py new file mode 100644 index 0000000..fed1c26 --- /dev/null +++ b/src/flows/etl_flow.py @@ -0,0 +1,19 @@ +from prefect import flow, task +from datetime import date + +@task(retries=3, retry_delay_seconds=10) +def extract(d): return f"raw({d})" + +@task +def transform(raw): return raw.replace("raw","clean") + +@task +def load(clean): print(f"loaded: {clean}") + +@flow +def etl_flow(d: str | None = None): + d = d or date.today().isoformat() + load(transform(extract(d))) + +if __name__ == "__main__": + etl_flow() \ No newline at end of file