Set up an API for fetching news

ry.yamafuji 2025-11-13 19:47:22 +09:00
parent 5aa426f8f6
commit a390658907
19 changed files with 1072 additions and 39 deletions

6
.gitignore vendored

@@ -1,3 +1,7 @@
data
*service_account.json
*sa.json
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
@@ -15,8 +19,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/

docker-compose.yml

@@ -1,36 +0,0 @@
# prefect-template/docker-compose.yml
services:
server:
image: prefecthq/prefect:2-latest
container_name: prefect-server
command: ["prefect","server","start","--host","0.0.0.0"]
ports: ["4200:4200"] # UI: http://localhost:4200
environment:
PREFECT_UI_URL: "http://localhost:4200"
PREFECT_API_URL: "http://server:4200/api"
TZ: "Asia/Tokyo"
      # To use Slack notifications, set SLACK_WEBHOOK_URL in .env
SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL:-}
volumes:
- ./src/flows:/opt/flows
- prefect-data:/root/.prefect
worker:
image: prefecthq/prefect:2-latest
container_name: prefect-worker
depends_on: [server]
environment:
PREFECT_API_URL: "http://server:4200/api"
TZ: "Asia/Tokyo"
SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL:-}
volumes:
- ./src/flows:/opt/flows
command: >
bash -lc "
pip install -r /opt/flows/requirements.txt >/dev/null 2>&1 || true &&
prefect work-pool create process-pool -t process || true &&
prefect worker start -p process-pool
"
volumes:
prefect-data:

49
docs/how_to_use.md Normal file

@@ -0,0 +1,49 @@
# How to use Prefect
Inside a flow function, you call functions decorated with Prefect's @task, combining units of work (tasks) into a single run.
## Components
### @flow
The decorator that defines a Prefect "workflow" (the overall unit of processing).
It turns a Python function into a flow function.
```py
@flow
def etl_flow(d: str | None = None):
d = d or date.today().isoformat()
load(transform(extract(d)))
```
The nested task calls can be hard to read, so broken apart they look like this:
```py
@flow
def etl_flow(d=None):
d = d or date.today().isoformat()
# load(transform(extract(d)))
raw = extract(d)
clean = transform(raw)
load(clean)
```
### @task
Defines an individual unit of work (a task) managed by Prefect.
It adds retries, logging, dependency management,
and more to a plain Python function.
```py
@task(retries=3, retry_delay_seconds=10)
def extract(d):
return f"raw({d})"
```
* retries:
    * retry up to 3 times on failure
* retry_delay_seconds:
    * wait 10 seconds after a failure before retrying
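Combining these options, a minimal runnable sketch (the transform and load bodies are illustrative):
```py
from datetime import date
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def extract(d):
    return f"raw({d})"

@task
def transform(raw):
    return f"clean({raw})"

@task
def load(clean):
    print(f"loaded: {clean}")

@flow
def etl_flow(d: str | None = None):
    d = d or date.today().isoformat()
    load(transform(extract(d)))

if __name__ == "__main__":
    etl_flow()
```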

14
examples/example_csv.py Normal file

@@ -0,0 +1,14 @@
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from dotenv import load_dotenv
load_dotenv("../.env")
from lib.csv_collector import CSVWriter
from lib.custom_logger import get_logger
logger = get_logger()
logger.info("Starting CSV example script")
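The example stops after the imports; a minimal continuation using the CSVWriter API added in this commit might look like this (the domain/layer/event values are illustrative, not part of the diff):

records = [["title", "url"], ["hello", "https://example.com"]]
path = CSVWriter.write(
    records=records,
    domain="example",
    layer="bronze",
    event="demo",
)
logger.info(f"Wrote CSV to {path}")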

requirements.txt

@@ -1,2 +1,8 @@
requests
prefect
python-dotenv
pandas==2.3.2
duckdb==1.3.2
google-cloud-storage

96
src/flows/api_etl_flow.py Normal file

@@ -0,0 +1,96 @@
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from dotenv import load_dotenv
load_dotenv("../.env")
from prefect import flow, task, get_run_logger
from typing import Optional
from datetime import datetime
from models.csv_model_base import CSVBaseModel
from providers.api_g_news import ApiGNews
from providers.google_cloud_storage_provider import GoogleCloudStorageProvider
from lib.csv_collector import CSVWriter
class NewsData(CSVBaseModel):
    title: str
    url: str = ""
    description: Optional[str] = None
    content: Optional[str] = None
    image_url: Optional[str] = None
    category: str = "politics"
    source_name: str = ""
    language: Optional[str] = "ja"  # e.g. "ja", "en"
    country: Optional[str] = "jp"  # e.g. "jp"
    pub_date: Optional[str] = None  # e.g. "2023-10-01 12:00:00"
@task(retries=2, retry_delay_seconds=10)
def call_api() -> list[dict]:
logger = get_run_logger()
logger.info("Starting API ETL Flow")
rets = ApiGNews.get_top_headlines(
category="nation",
lang="ja",
country="jp",
query="政治",
)
logger.info(f"Fetched {len(rets)} articles from GNews API")
return rets
@task()
def format_to_model(ret: list[dict]) -> list[NewsData]:
    """Convert the API response into NewsData models"""
logger = get_run_logger()
logger.info("Formatting API response to NewsData models")
models = []
for item in ret:
model = NewsData(
title=item['title'],
url=item['url'],
description=item.get('description',None),
content=item.get('content',None),
image_url=item.get('image',None),
pub_date=item.get('publishedAt',None),
category="politics",
source_name=item.get('source',{}).get('name',""),
language="ja",
country="jp",
)
models.append(model)
logger.info(f"Formatted {len(models)} NewsData models")
return models
@task()
def write_csv(models: list[NewsData]):
    logger = get_run_logger()
    logger.info("Writing NewsData models to CSV in GCS")
csv_data = NewsData.to_csv_from_items(models)
dt = datetime.now()
dt_str = dt.strftime("%Y-%m-%d")
file_name = f"news_{dt_str}_part-001.csv"
prefix = f"data_science/data/y={dt.strftime('%Y')}/news"
provider = GoogleCloudStorageProvider()
bucket_name = os.getenv("GCS_BUCKET_NAME")
provider.write_csv_item(
bucket=bucket_name,
object_name=f"{prefix}/{file_name}",
records=csv_data,
)
@flow
def api_etl_flow():
    # Extract/Transform: call the API and convert the response to models
ret = call_api()
models = format_to_model(ret)
    # Load: write the CSV out to GCS
write_csv(models)
if __name__ == "__main__":
api_etl_flow()

src/lib/csv_collector/__init__.py Normal file

@@ -0,0 +1,12 @@
from .csv_writer import CSVWriter
from .csv_reader import CSVReader
from .csv_editor import CSVEditColumn, CSVEditMapper
from .csv_analyzer import CSVAnalyzer
__all__ = [
"CSVWriter",
"CSVReader",
"CSVEditColumn",
"CSVEditMapper",
"CSVAnalyzer",
]

src/lib/csv_collector/csv_analyzer.py Normal file

@@ -0,0 +1,118 @@
import os
import pandas as pd
from zoneinfo import ZoneInfo
from typing import Union
from utils.types import DataLayer
from lib.custom_logger import get_logger
logger = get_logger()
from .csv_writer import CSVWriter
from .csv_reader import CSVReader
class CSVAnalyzer:
@classmethod
def _separate_month_to_df(
cls,
header: list,
data_rows: list,
date_key: str = "published_at",
tz: str | None = None) -> pd.DataFrame | None:
if not data_rows:
return None
df = pd.DataFrame(data_rows, columns=header)
        # Normalize the date column (parse to datetime, convert time zone)
        df[date_key] = pd.to_datetime(df[date_key], errors="coerce", utc=True)
        if tz:
            df[date_key] = df[date_key].dt.tz_convert(ZoneInfo(tz))
        # Add a year-month column; callers group on it
        df["year_month"] = df[date_key].dt.to_period("M")
return df
@classmethod
def separate_month_to_dict(
cls,
header: list,
data_rows: list,
date_key: str = "published_at",
tz: str | None = None) -> dict[str, list[dict]] | None:
"""
年月ごとにデータを分割する(list of list形式-> dict of list of dict形式)
"""
df = cls._separate_month_to_df(header, data_rows, date_key, tz)
if df is None:
return None
return {
str(ym): g.drop(columns=["year_month"]).to_dict(orient="records")
for ym, g in df.groupby("year_month", sort=True)
}
@classmethod
    def write_separated_month(
        cls,
        records,
        domain: str,
        event: str,
        layer: Union[str, DataLayer],
        prefix: str = None,
        date_format: str = "%Y-%m",
        is_year: bool = True,
        is_month: bool = True,
        date_key: str = "published_at",
        tz: str | None = None,
    ):
        """Split the data by year-month and write each group to a CSV file"""
        if not records or len(records) < 2:
            logger.warning("No records to process.")
            return
        header = records[0]
        data_rows = records[1:]
        df = cls._separate_month_to_df(header, data_rows, date_key, tz)
if df is None:
return
for ym, g in df.groupby("year_month", sort=True):
logger.info(f"Processing year-month: {ym}")
y, m = str(ym).split("-")
folder_path = CSVWriter.get_filepath(
domain=domain,
layer=layer)
if is_year:
folder_path = f"{folder_path}/y={y}"
if is_month:
folder_path = f"{folder_path}/m={m}"
filename = CSVWriter.get_filename(
event=event,
prefix=prefix,
                date_format=date_format,
dt=str(ym) + "-01",
extension=".csv"
)
fpath = os.path.join(folder_path, filename)
os.makedirs(folder_path, exist_ok=True)
logger.info(f"Writing to file: {fpath}")
g.drop(columns=["year_month"]).to_csv(fpath, index=False, encoding="utf-8")
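For reference (not part of the diff), a minimal usage sketch of CSVAnalyzer with illustrative records:

records = [
    ["title", "published_at"],
    ["a", "2025-10-31T23:00:00Z"],
    ["b", "2025-11-01T01:00:00Z"],
]
by_month = CSVAnalyzer.separate_month_to_dict(
    header=records[0],
    data_rows=records[1:],
    date_key="published_at",
    tz="Asia/Tokyo",
)
# both rows fall into 2025-11 once converted to JST:
# {"2025-11": [{"title": "a", ...}, {"title": "b", ...}]}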

src/lib/csv_collector/csv_editor.py Normal file

@@ -0,0 +1,110 @@
from typing import Any, Optional, TypeVar, Callable
from dataclasses import dataclass
from .csv_reader import CSVReader
from lib.custom_logger import get_logger
logger = get_logger()
T = TypeVar("T")
ColCallback = Callable[[int, list, dict], T]
@dataclass
class CSVEditColumn():
    """Column definition for CSV editing"""
    name: str
    value: Any = None
    key_name: Optional[str] = None
    cb: Optional[ColCallback] = None
    def execute(self, row_index: int, row: list, header_map: dict) -> Any:
        """Resolve this column's value for a row"""
try:
if self.cb:
return self.cb(row_index, row, header_map)
elif self.key_name and self.key_name in header_map:
index = header_map[self.key_name]
return row[index]
else:
return self.value
except Exception as e:
logger.error(f"Error in CSVEditColumn.execute: {e}")
logger.error(f"row_index: {row_index}, row: {row}, header_map: {header_map}")
logger.error(f"Column info - name: {self.name}, value: {self.value}, key_name: {self.key_name}, cb: {self.cb}")
raise e
class CSVEditMapper:
"""CSV編集用のマッパー"""
def __init__(self, header_map: dict = None):
self.columns: list[CSVEditColumn] = []
self.header_map: dict = header_map if header_map else {}
def add(self, column: CSVEditColumn):
self.columns.append(column)
def add_column(self, name: str, key_name: str = None):
if not key_name:
key_name = name
self.columns.append(CSVEditColumn(name, None, key_name))
    def add_value(self, name: str, value: Any):
        self.columns.append(CSVEditColumn(name, value))
    def add_callback(self, name: str, cb: ColCallback):
        self.columns.append(CSVEditColumn(name, cb=cb))
def auto_columns(self):
"""既存のヘッダー情報から自動的に列を追加する"""
if not self.header_map or len(self.header_map) == 0:
return
        # Columns are added automatically, ordered by header index
sorted_items = sorted(self.header_map.items(), key=lambda item: item[1])
for key, idx in sorted_items:
self.add_column(name=key, key_name=key)
    def get_column_values(self, key_name: str, row, null_value: Any = None) -> Any:
        idx = self.header_map.get(key_name)
        if idx is None or idx < 0:
            return null_value
        return row[idx]
    def edit(self, records: list[list]) -> list[list]:
        """Edit CSV data by applying the configured columns"""
        new_records = []
        # Build the header row
header = []
for col in self.columns:
header.append(col.name)
new_records.append(header)
if not records or len(records) < 2:
return new_records
if self.header_map is None or len(self.header_map) == 0:
self.header_map = CSVReader.header_map(records[0])
        # Apply the transformations row by row
        for i, row in enumerate(records[1:]):
            new_row = []
            for col in self.columns:
                _value = col.execute(i, row, self.header_map)
new_row.append(_value)
new_records.append(new_row)
return new_records
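For reference (not part of the diff), a minimal CSVEditMapper sketch with illustrative data:

records = [
    ["title", "url"],
    ["hello", "https://example.com"],
]
mapper = CSVEditMapper()
mapper.add_column("title")
mapper.add_value("category", "politics")
mapper.add_callback("row_no", lambda i, row, hmap: i + 1)
mapper.edit(records)
# -> [["title", "category", "row_no"], ["hello", "politics", 1]]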

src/lib/csv_collector/csv_reader.py Normal file

@@ -0,0 +1,39 @@
import os
import csv
from typing import List
from lib.custom_logger import get_logger
logger = get_logger()
class CSVReader:
    """CSV file reading utility"""
    BASE_DIR = "data"
    @classmethod
    def read(cls, file_path: str) -> List[list]:
        """Read a CSV file into a list of rows"""
if not os.path.exists(file_path):
logger.warning(f"File not found: {file_path}")
return []
with open(file_path, mode="r", newline="", encoding="utf-8") as f:
reader = csv.reader(f)
return list(reader)
    @classmethod
    def read_dict(cls, file_path: str) -> List[dict]:
        """Read a CSV file into a list of dicts"""
if not os.path.exists(file_path):
logger.warning(f"File not found: {file_path}")
return []
with open(file_path, mode="r", newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
return list(reader)
@classmethod
def header_map(cls, headers: list) -> dict[str,int]:
"""CSV配列のヘッダー情報よりマッピング辞書を生成"""
return {h: i for i, h in enumerate(headers)}

src/lib/csv_collector/csv_writer.py Normal file

@@ -0,0 +1,178 @@
import os
import csv
from typing import List, Union
from datetime import datetime
from io import StringIO
from utils.types import DataLayer
from lib.custom_logger import get_logger
logger = get_logger()
class CSVWriter:
"""CSVファイル書き込みユーティリティ"""
BASE_DIR = "data"
@classmethod
    def get_filepath(cls,
        domain: str,
        layer: Union[str, DataLayer, None],
        is_year: bool = False,
        is_month: bool = False,
        is_day: bool = False,
        is_hour: bool = False,
        dt: Union[str, datetime] = None
    ) -> str:
        """Build the folder path for CSV output"""
parts = [cls.BASE_DIR]
parts.append(domain)
if layer:
parts.append(layer)
if dt is None:
dt = datetime.now()
elif isinstance(dt, str):
dt = datetime.fromisoformat(dt)
if is_year:
parts.append(f"y={dt.strftime('%Y')}")
if is_month:
parts.append(f"m={dt.strftime('%m')}")
if is_day:
parts.append(f"d={dt.strftime('%d')}")
if is_hour:
parts.append(f"h={dt.strftime('%H')}")
        folder_path = os.path.join(*parts)
        logger.debug(f"Generated CSV folder path: {folder_path}")
        return folder_path
@classmethod
def get_filename(
cls,
event: str,
prefix: str = None,
date_format: str = "%Y-%m-%d",
dt: Union[str,datetime] = None,
part: int = None,
extension: str = ".csv") -> str:
"""
CSVファイルのパスを生成
Args:
prefix (str, optional): ファイル名の接頭辞. Defaults to None.
date_format (str, optional): 日付フォーマット. Defaults to None. : "%Y-%m-%d"
dt (datetime, optional): 日付情報. Defaults to None.
part (int, optional): パーティション番号. Defaults to None.
extension (str, optional): ファイル拡張子. Defaults to ".csv".
"""
file_names_part = []
if prefix:
file_names_part.append(prefix)
file_names_part.append(event)
if date_format:
            # Normalize to a datetime
if dt is None:
dt = datetime.now()
elif isinstance(dt, str):
dt = datetime.fromisoformat(dt)
date_str = dt.strftime(date_format)
file_names_part.append(date_str)
if part is not None:
file_names_part.append(f"part-{part:03d}")
file_name = "_".join(file_names_part) + extension
logger.debug(f"Generated CSV file name: {file_name}")
return file_name
@classmethod
def write(
cls,
records:List,
domain:str,
layer:Union[str, DataLayer],
event: str,
prefix: str = None,
date_format: str = "%Y-%m-%d",
dt: Union[str,datetime] = None,
part: int = None,
extension: str = ".csv",
is_year: bool=False,
is_month: bool=False,
is_day: bool=False,
is_hour: bool=False,
is_update: bool=False,
) -> str:
"""CSVデータを文字列として生成"""
if not records:
logger.warning("No records to write.")
return ""
folder_path = cls.get_filepath(
domain=domain,
layer=layer,
is_year=is_year,
is_month=is_month,
is_day=is_day,
is_hour=is_hour,
dt=dt
)
filename = cls.get_filename(
event=event,
prefix=prefix,
date_format=date_format,
dt=dt,
part=part,
extension=extension)
os.makedirs(folder_path, exist_ok=True)
full_filename = os.path.join(folder_path, filename)
if not is_update and os.path.exists(full_filename):
logger.info(f"File already exists and will not be overwritten: {full_filename}")
return full_filename
with open(full_filename, mode="w", newline="", encoding="utf-8") as f:
writer = csv.writer(f, quoting=csv.QUOTE_ALL)
writer.writerows(records)
return full_filename
@classmethod
def write_with_filename(
cls,
records:List,
filename: str,
is_update: bool=False,
) -> str:
"""CSVデータを指定されたファイルパスに書き込む"""
if not records:
logger.warning("No records to write.")
return ""
os.makedirs(os.path.dirname(filename), exist_ok=True)
if not is_update and os.path.exists(filename):
logger.info(f"File already exists and will not be overwritten: {filename}")
return filename
with open(filename, mode="w", newline="", encoding="utf-8") as f:
writer = csv.writer(f, quoting=csv.QUOTE_ALL)
writer.writerows(records)
return filename
@classmethod
def csv_bytes(
cls,
records:List,
) -> bytes:
"""CSVデータをバイト列として生成"""
buf = StringIO(newline="")
writer = csv.writer(buf, quoting=csv.QUOTE_ALL)
writer.writerows(records)
return buf.getvalue().encode('utf-8')
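For reference (not part of the diff), the path layout these helpers produce, with illustrative values:

CSVWriter.get_filepath(domain="news", layer="bronze", is_year=True, is_month=True, dt="2025-11-13")
# -> data/news/bronze/y=2025/m=11
CSVWriter.get_filename(event="news", dt="2025-11-13", part=1)
# -> news_2025-11-13_part-001.csv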

56
src/lib/custom_logger.py Normal file

@@ -0,0 +1,56 @@
import logging
import functools
from .singleton import Singleton
class CustomLogger(Singleton):
"""
Singleton logger class that initializes a logger with a specified name and log file.
It provides a method to log entry and exit of functions.
"""
def __init__(self, name='main', log_file=None, level=logging.INFO):
if hasattr(self, '_initialized') and self._initialized:
            return  # already initialized; nothing to do
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
self.logger.propagate = False
formatter = logging.Formatter(
'%(asctime)s %(levelname)s [%(filename)s:%(lineno)3d]: %(message)s'
)
# Console handler
ch = logging.StreamHandler()
ch.setFormatter(formatter)
self.logger.addHandler(ch)
# File handler
if log_file:
fh = logging.FileHandler(log_file, encoding='utf-8')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
self._initialized = True
def get_logger(self):
return self.logger
def log_entry_exit(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
self.logger.info(f"Enter: {func.__qualname__}")
result = func(*args, **kwargs)
self.logger.info(f"Exit: {func.__qualname__}")
return result
return wrapper
def get_logger(name='main', log_file=None, level=logging.INFO):
custom_logger = CustomLogger(name, log_file, level)
return custom_logger.get_logger()
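For reference (not part of the diff), a minimal usage sketch; the log file path is illustrative:

logger = get_logger(name="main", log_file="app.log")
logger.info("hello")

custom = CustomLogger()  # same singleton instance as above

@custom.log_entry_exit
def work():
    return 42

work()  # logs "Enter: work" and "Exit: work"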

20
src/lib/singleton.py Normal file

@@ -0,0 +1,20 @@
"""Singleton pattern implementation in Python.
This implementation is thread-safe and ensures that only one instance of the class is created.
Singleton が提供するのは同じインスタンスを返す仕組み
* __init__() は毎回呼ばれる(多くの人が意図しない動作)
* __init__の2回目は_initialized というフラグは 使う側で管理する必要がある
"""
import threading
class Singleton(object):
_instances = {}
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if cls not in cls._instances:
with cls._lock:
            if cls not in cls._instances:  # double-checked locking
cls._instances[cls] = super(Singleton, cls).__new__(cls)
return cls._instances[cls]
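For reference (not part of the diff), a minimal sketch of the _initialized pattern the docstring describes:

class Config(Singleton):
    def __init__(self):
        if getattr(self, "_initialized", False):
            return  # skip re-initialization on later constructions
        self.values = {}
        self._initialized = True

assert Config() is Config()  # always the same instance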

src/models/csv_model_base.py Normal file

@@ -0,0 +1,42 @@
from datetime import datetime
import json
from typing import ClassVar, Optional, List
from pydantic import BaseModel
class CSVBaseModel(BaseModel):
    """Base class that adds common CSV helpers on top of pydantic's BaseModel"""
    # Per-class list of field names excluded from CSV output
    csv_excludes: ClassVar[List[str]] = []
    @classmethod
    def to_headers(cls, excepts: Optional[List[str]] = None) -> List[str]:
        """Generate the CSV header from the model fields"""
        fields = list(cls.model_fields.keys())  # preserves definition order
        if cls.csv_excludes:
            # apply the per-class exclude list
            fields = [f for f in fields if f not in cls.csv_excludes]
        if excepts:
            fields = [f for f in fields if f not in excepts]
return fields
    def to_row(self, excepts: Optional[List[str]] = None) -> List[str]:
        """Convert this instance into a CSV row"""
header = self.to_headers(excepts=excepts)
row = []
for f in header:
val = getattr(self, f)
if isinstance(val, (dict, list)):
                row.append(json.dumps(val, ensure_ascii=False))  # dicts/lists become JSON strings
elif isinstance(val, datetime):
                row.append(val.isoformat())  # datetimes become ISO 8601 strings
elif val is None:
row.append("")
else:
row.append(str(val))
return row
    @staticmethod
    def to_csv_from_items(items: List['CSVBaseModel']) -> List:
        """Build the header plus rows for a list of models"""
        if not items:
            return []
headers = items[0].to_headers()
rows = [item.to_row() for item in items]
return [headers] + rows
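For reference (not part of the diff), a minimal usage sketch with an illustrative model:

class Article(CSVBaseModel):
    title: str
    url: str = ""

CSVBaseModel.to_csv_from_items([Article(title="hello", url="https://example.com")])
# -> [["title", "url"], ["hello", "https://example.com"]]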

src/providers/api_g_news.py Normal file

@@ -0,0 +1,95 @@
import requests
import os
from lib.custom_logger import get_logger
logger = get_logger()
class ApiGNews:
    """
    Client for the GNews API
    Notes:
        - Uses the GNews API to fetch news articles
        - The API key is read from the GNEWS_API_KEY environment variable
        - See https://gnews.io/docs/ for the full API documentation
    """
GNEWS_API_KEY = os.getenv("GNEWS_API_KEY")
@classmethod
    def get_news(
        cls,
        query: str = None,
        lang: str = "ja",  # e.g. "en"
        country: str = "jp",  # e.g. "us"
        max: int = 10,
        from_at: str = None,  # ISO 8601 datetime string (e.g. "2023-10-01T00:00:00Z")
        to_at: str = None,
    ):
        """
        Fetch news articles from GNews
        Args:
            query (str): Search query
            lang (str): Article language code (e.g. "ja" = Japanese)
            country (str): Country code (e.g. "jp" = Japan)
            max (int): Maximum number of articles to fetch (up to 100)
            from_at (str): Start datetime (ISO 8601)
            to_at (str): End datetime (ISO 8601)
        """
url = "https://gnews.io/api/v4/search"
params = {
"apikey": cls.GNEWS_API_KEY,
"q": query,
"lang": lang,
"country": country,
"max": max,
"from": from_at,
"to": to_at,
}
        # Drop None values so they are not sent as query parameters
        params = {k: v for k, v in params.items() if v is not None}
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        json_data: dict = response.json()
logger.debug(f"GNews API Response: {json_data}")
return json_data.get("articles", [])
@classmethod
    def get_top_headlines(
        cls,
        category: str = None,  # general, world, nation, business, technology, entertainment, sports, science, health
        lang: str = "ja",  # e.g. "en"
        country: str = "jp",  # e.g. "us"
        max: int = 10,
        from_at: str = None,  # ISO 8601 datetime string (e.g. "2023-10-01T00:00:00Z")
        to_at: str = None,
        query: str = None,
    ):
        """Fetch top headlines from GNews
        Args:
            category (str): Category (general, world, nation, business, technology, entertainment, sports, science, health)
            lang (str): Article language code (e.g. "ja" = Japanese)
            country (str): Country code (e.g. "jp" = Japan)
            max (int): Maximum number of articles to fetch (up to 100)
            from_at (str): Start datetime (ISO 8601)
            to_at (str): End datetime (ISO 8601)
            query (str): Search query
        """
url = "https://gnews.io/api/v4/top-headlines"
params = {
"apikey": cls.GNEWS_API_KEY,
"category": category,
"lang": lang,
"country": country,
"max": max,
"from": from_at,
"to": to_at,
"q": query,
}
        # Drop None values so they are not sent as query parameters
        params = {k: v for k, v in params.items() if v is not None}
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        json_data: dict = response.json()
logger.debug(f"GNews API Response: {json_data}")
return json_data.get("articles", [])
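For reference (not part of the diff), a minimal usage sketch; it requires GNEWS_API_KEY to be set, and the query is illustrative:

articles = ApiGNews.get_news(query="政治", lang="ja", country="jp", max=5)
for a in articles:
    print(a["title"], a["url"])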

src/providers/duckdb_provider.py Normal file

@@ -0,0 +1,35 @@
import duckdb
from typing import Any
class DuckDBProvider:
def __init__(self, db_path: str = ":memory:", read_only: bool = False):
self.con = self.connect(db_path, read_only)
def connect(self, db_path: str = ":memory:", read_only: bool = False):
return duckdb.connect(database=db_path, read_only=read_only)
def close(self):
"""接続を閉じる"""
if self.con:
self.con.close()
def query_df(self, sql: str):
"""SQLクエリを実行してDataFrameで返す"""
return self.con.execute(sql).df()
    def max_value(
        self,
        file_path: str,
        column: str,
        hive_partitioning: bool = True,
        union_by_name: bool = True,
    ) -> Any:
        """Return the maximum value of a column across CSV files (file_path may be a glob)"""
query = f"""
SELECT MAX({column}) AS max_{column}
FROM read_csv_auto('{file_path}',
hive_partitioning={1 if hive_partitioning else 0},
union_by_name={1 if union_by_name else 0}
)
"""
result = self.con.execute(query).fetchone()[0]
return result
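For reference (not part of the diff), a minimal usage sketch; the glob path is illustrative and matches the hive-style layout CSVWriter produces:

db = DuckDBProvider()
latest = db.max_value("data/news/bronze/y=*/m=*/*.csv", column="pub_date")
db.close()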

src/providers/google_cloud_storage_provider.py Normal file

@@ -0,0 +1,191 @@
import os
import io
import csv
import json
import mimetypes
import zipfile
from pathlib import Path
from typing import Optional, List, Dict, Any, Union, BinaryIO
from datetime import timedelta
from google.cloud import storage
from google.oauth2 import service_account
from lib.custom_logger import get_logger
logger = get_logger()
class GoogleCloudStorageProvider:
def __init__(self, cred_path: Optional[str] = None, project: Optional[str] = None):
try:
if cred_path:
creds = service_account.Credentials.from_service_account_file(cred_path)
                # If no project was given, take it from the credentials
effective_project = project or creds.project_id
self._client = storage.Client(
project=effective_project, credentials=creds
)
logger.info(f"GCS client initialized with service account file. project={effective_project}")
            elif os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"):
                cred_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
                creds = service_account.Credentials.from_service_account_info(json.loads(cred_json))
effective_project = project or creds.project_id
self._client = storage.Client(
project=effective_project, credentials=creds
)
logger.info("GCS client initialized with credentials from environment variable.")
else:
self._client = storage.Client(project=project)
logger.info("GCS client initialized with default credentials (ADC).")
except Exception as e:
logger.error(f"GCS initialization failed: {e}")
raise
# Private methods to get bucket and blob references
def _bucket(self, bucket: str) -> storage.Bucket:
return self._client.bucket(bucket)
def _blob(self, bucket: str, object_name: str) -> storage.Blob:
return self._bucket(bucket).blob(object_name)
    # Bucket operations
def get_buckets(self) -> List[str]:
buckets: List[storage.Bucket] = self._client.list_buckets()
return [b.name for b in buckets]
def create_bucket(self, bucket_name: str, location: str = "ASIA-NORTHEAST1", storage_class: str = "STANDARD"):
b = storage.Bucket(self._client, name=bucket_name)
b.storage_class = storage_class
return self._client.create_bucket(b, location=location)
def is_exists_bucket(self, bucket_name: str) -> bool:
try:
self._client.get_bucket(bucket_name)
return True
except Exception:
return False
    # Object operations
def get_items(self, bucket: str, prefix: str | None = None, match_glob:str | None=None) -> List[Dict[str, Any]]:
items: List[storage.Blob] = self._client.list_blobs(bucket, prefix=prefix,match_glob=match_glob)
return [{"name": bl.name, "size": bl.size, "updated": bl.updated, "content_type": bl.content_type}
for bl in items]
def is_exists_item(self, bucket: str, object_name: str) -> bool:
return self._blob(bucket, object_name).exists()
def write_item(self, bucket: str, object_name: str, data: Union[bytes, BinaryIO, str],
content_type: str | None = None) -> Dict[str, Any]:
"""
オブジェクトを書き込む
Args:
bucket (str): バケット名
object_name (str): オブジェクト名
data (Union[bytes, BinaryIO, str]): 書き込むデータ
content_type (Optional[str]): コンテンツタイプ(MIMEタイプ
Returns:
Dict[str, Any]: 書き込んだオブジェクトの情報
"""
blob = self._blob(bucket, object_name)
if content_type is None:
content_type = mimetypes.guess_type(object_name)[0] or "application/octet-stream"
blob.content_type = content_type
if isinstance(data, (bytes, bytearray)):
blob.upload_from_file(io.BytesIO(data), content_type=content_type, rewind=True)
elif hasattr(data, "read"):
blob.upload_from_file(data, content_type=content_type, rewind=True)
elif isinstance(data, str) and os.path.exists(data):
blob.upload_from_filename(data, content_type=content_type)
else:
raise ValueError("data must be bytes, file-like, or existing filepath")
return {"name": blob.name, "size": blob.size, "content_type": blob.content_type}
def read_item(self, bucket: str, object_name: str, as_text: bool = False, encoding: str = "utf-8"):
data = self._blob(bucket, object_name).download_as_bytes()
return data.decode(encoding) if as_text else data
def delete_item(self, bucket: str, object_name: str):
"""オブジェクトを削除する"""
self._blob(bucket, object_name).delete()
def generate_signed_url(self, bucket: str, object_name: str, method: str = "GET",
expires: timedelta = timedelta(hours=1)) -> str:
return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method)
def zip_items(
self,
bucket: str,
object_names: List[str],
) -> bytes:
"""
複数のGCSオブジェクトを1つのZIPにまとめZIPバイナリ(bytes)を返す
Args:
bucket (str): バケット名
object_names (List[str]): 対象オブジェクトのリスト
Returns:
bytes: ZIPファイルのバイナリ
"""
out = io.BytesIO()
with zipfile.ZipFile(out, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
for obj in object_names:
blob = self._blob(bucket, obj)
if not blob.exists():
raise FileNotFoundError(f"Object not found: gs://{bucket}/{obj}")
buf = io.BytesIO()
blob.download_to_file(buf)
buf.seek(0)
arcname = Path(obj).name
zf.writestr(arcname, buf.read())
zf.comment = f"bucket={bucket}, files={len(object_names)}".encode()
return out.getvalue()
def upload_folder(self, bucket: str, folder_path: str, gcs_prefix: str = ""):
"""
ローカルフォルダをGCSに再帰的にアップロードする
Args:
bucket (str): バケット名
folder_path (str): ローカルフォルダのパス
gcs_prefix (str): GCS上のプレフィックス(フォルダパス)
"""
_bucket = self._bucket(bucket)
for root, _, files in os.walk(folder_path):
for file in files:
local_file_path = os.path.join(root, file)
                # Build a relative path so the folder structure is preserved
relative_path = os.path.relpath(local_file_path, folder_path)
gcs_object_name = os.path.join(gcs_prefix, relative_path).replace("\\", "/")
blob = _bucket.blob(gcs_object_name)
blob.upload_from_filename(local_file_path)
logger.info(f"Uploaded {local_file_path} to gs://{bucket}/{gcs_object_name}")
def write_csv_item(
self,
bucket: str,
object_name: str,
records: List):
"""CSVデータをGCSにアップロードする
Args:
bucket (str): バケット名
object_name (str): オブジェクト名
records (List): CSVデータのリスト
"""
blob = self._blob(bucket, object_name)
with blob.open("w", content_type="text/csv", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerows(records)
logger.info(f"Uploaded CSV to gs://{bucket}/{object_name}")
return {"name": blob.name, "size": blob.size, "content_type": blob.content_type}

6
src/utils/types.py Normal file

@@ -0,0 +1,6 @@
from enum import Enum
class DataLayer(str, Enum):
BRONZE = "bronze"
SILVER = "silver"
GOLD = "gold"
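For reference (not part of the diff): because DataLayer mixes in str, members compare equal to plain strings and can be passed anywhere CSVWriter expects a layer string:

assert DataLayer.BRONZE == "bronze"
CSVWriter.get_filepath(domain="news", layer=DataLayer.GOLD)
# -> data/news/gold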