From a390658907631e6dff1ee933ffd09d6df6e91e39 Mon Sep 17 00:00:00 2001
From: "ry.yamafuji"
Date: Thu, 13 Nov 2025 19:47:22 +0900
Subject: [PATCH] Set up an API that fetches news
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .gitignore                                    |   6 +-
 docker-compose.yaml                           |  36 ----
 docs/how_to_use.md                            |  49 +++++
 examples/example_csv.py                       |  14 ++
 {src/flows => examples/sample01}/etl_flow.py  |   0
 requirements.txt                              |   8 +-
 src/flows/api_etl_flow.py                     |  96 +++++++++
 src/lib/csv_collector/__init__.py             |  12 ++
 src/lib/csv_collector/csv_analyzer.py         | 118 +++++++++++
 src/lib/csv_collector/csv_editor.py           | 110 ++++++++++
 src/lib/csv_collector/csv_reader.py           |  39 ++++
 src/lib/csv_collector/csv_writer.py           | 178 ++++++++++++++++
 src/lib/custom_logger.py                      |  56 +++++
 src/lib/singleton.py                          |  20 ++
 src/models/csv_model_base.py                  |  42 ++++
 src/providers/api_g_news.py                   |  95 +++++++++
 src/providers/duck_db_provider.py             |  35 ++++
 .../google_cloud_storage_provider.py          | 191 ++++++++++++++++++
 src/utils/types.py                            |   6 +
 19 files changed, 1072 insertions(+), 39 deletions(-)
 delete mode 100644 docker-compose.yaml
 create mode 100644 docs/how_to_use.md
 create mode 100644 examples/example_csv.py
 rename {src/flows => examples/sample01}/etl_flow.py (100%)
 create mode 100644 src/flows/api_etl_flow.py
 create mode 100644 src/lib/csv_collector/__init__.py
 create mode 100644 src/lib/csv_collector/csv_analyzer.py
 create mode 100644 src/lib/csv_collector/csv_editor.py
 create mode 100644 src/lib/csv_collector/csv_reader.py
 create mode 100644 src/lib/csv_collector/csv_writer.py
 create mode 100644 src/lib/custom_logger.py
 create mode 100644 src/lib/singleton.py
 create mode 100644 src/models/csv_model_base.py
 create mode 100644 src/providers/api_g_news.py
 create mode 100644 src/providers/duck_db_provider.py
 create mode 100644 src/providers/google_cloud_storage_provider.py
 create mode 100644 src/utils/types.py

diff --git a/.gitignore b/.gitignore
index 0dbf2f2..bd633d9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,7 @@
+data
+*service_account.json
+*sa.json
+
 # ---> Python
 # Byte-compiled / optimized / DLL files
 __pycache__/
@@ -15,8 +19,6 @@ dist/
 downloads/
 eggs/
 .eggs/
-lib/
-lib64/
 parts/
 sdist/
 var/
diff --git a/docker-compose.yaml b/docker-compose.yaml
deleted file mode 100644
index b24cd97..0000000
--- a/docker-compose.yaml
+++ /dev/null
@@ -1,36 +0,0 @@
-# prefect-template/docker-compose.yml
-services:
-  server:
-    image: prefecthq/prefect:2-latest
-    container_name: prefect-server
-    command: ["prefect","server","start","--host","0.0.0.0"]
-    ports: ["4200:4200"]   # UI: http://localhost:4200
-    environment:
-      PREFECT_UI_URL: "http://localhost:4200"
-      PREFECT_API_URL: "http://server:4200/api"
-      TZ: "Asia/Tokyo"
-      # Set SLACK_WEBHOOK_URL in .env to enable Slack notifications
-      SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL:-}
-    volumes:
-      - ./src/flows:/opt/flows
-      - prefect-data:/root/.prefect
-
-  worker:
-    image: prefecthq/prefect:2-latest
-    container_name: prefect-worker
-    depends_on: [server]
-    environment:
-      PREFECT_API_URL: "http://server:4200/api"
-      TZ: "Asia/Tokyo"
-      SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL:-}
-    volumes:
-      - ./src/flows:/opt/flows
-    command: >
-      bash -lc "
-        pip install -r /opt/flows/requirements.txt >/dev/null 2>&1 || true &&
-        prefect work-pool create process-pool -t process || true &&
-        prefect worker start -p process-pool
-      "
-
-volumes:
-  prefect-data:
diff --git a/docs/how_to_use.md b/docs/how_to_use.md
new file mode 100644
index 0000000..fa6c598
--- /dev/null
+++ b/docs/how_to_use.md
@@ -0,0 +1,49 @@
+# How to use Prefect
+
+Inside a flow function, you call functions decorated with Prefect's @task, composing the individual units of work (tasks) into a single run.
+
+
+## Components
+
+### @flow
+
+The decorator that defines a Prefect "workflow"
+(the overall unit of processing).
+
+It turns a Python function into a flow function.
+
+```py
+@flow
+def etl_flow(d: str | None = None):
+    d = d or date.today().isoformat()
+    load(transform(extract(d)))
+```
+
+The nested task calls are hard to read; unrolled, they look like this:
+
+```py
+@flow
+def etl_flow(d=None):
+    d = d or date.today().isoformat()
+    # load(transform(extract(d)))
+    raw = extract(d)
+    clean = transform(raw)
+    load(clean)
+```
+
+### @task
+
+Defines an individual unit of work (a task) managed by Prefect.
+It adds retries, log management,
+dependency tracking, and more to an ordinary Python function.
+
+```py
+@task(retries=3, retry_delay_seconds=10)
+def extract(d):
+    return f"raw({d})"
+```
+
+* retries:
+    * retry up to 3 times
+* retry_delay_seconds:
+    * wait 10 seconds after a failure before retrying
\ No newline at end of file
diff --git a/examples/example_csv.py b/examples/example_csv.py
new file mode 100644
index 0000000..50ee76f
--- /dev/null
+++ b/examples/example_csv.py
@@ -0,0 +1,18 @@
+import sys, os
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
+
+from dotenv import load_dotenv
+load_dotenv("../.env")
+
+from lib.csv_collector import CSVWriter
+
+from lib.custom_logger import get_logger
+logger = get_logger()
+
+logger.info("Starting CSV example script")
+
+# Minimal usage sketch: write a header row plus one data row
+# under data/<domain>/<layer>/, relative to the working directory.
+records = [["title", "url"], ["hello", "https://example.com"]]
+path = CSVWriter.write(records, domain="example", layer="bronze", event="demo")
+logger.info(f"Wrote CSV to {path}")
diff --git a/src/flows/etl_flow.py b/examples/sample01/etl_flow.py
similarity index 100%
rename from src/flows/etl_flow.py
rename to examples/sample01/etl_flow.py
diff --git a/requirements.txt b/requirements.txt
index 4268304..f8c3718 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,8 @@
 requests
-prefect
\ No newline at end of file
+python-dotenv
+prefect
+
+pandas==2.3.2
+
+duckdb==1.3.2
+google-cloud-storage
\ No newline at end of file
diff --git a/src/flows/api_etl_flow.py b/src/flows/api_etl_flow.py
new file mode 100644
index 0000000..c3266fd
--- /dev/null
+++ b/src/flows/api_etl_flow.py
@@ -0,0 +1,94 @@
+import sys, os
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+
+from dotenv import load_dotenv
+load_dotenv("../.env")
+
+from prefect import flow, task, get_run_logger
+from typing import Optional
+from datetime import datetime
+
+from models.csv_model_base import CSVBaseModel
+from providers.api_g_news import ApiGNews
+from providers.google_cloud_storage_provider import GoogleCloudStorageProvider
+
+
+class NewsData(CSVBaseModel):
+    title: str
+    url: str = ""
+    description: Optional[str] = None
+    content: Optional[str] = None
+    image_url: Optional[str] = None
+    category: str = "politics"
+    source_name: str = ""
+    language: Optional[str] = "ja"   # e.g. "ja", "en"
+    country: Optional[str] = "jp"    # e.g. "jp"
+    pub_date: Optional[str] = None   # e.g. "2023-10-01 12:00:00"
+
+
+@task(retries=2, retry_delay_seconds=10)
+def call_api() -> list[dict]:
+    logger = get_run_logger()
+    logger.info("Starting API ETL Flow")
+    rets = ApiGNews.get_top_headlines(
+        category="nation",
+        lang="ja",
+        country="jp",
+        query="政治",  # Japanese for "politics"
+    )
+    logger.info(f"Fetched {len(rets)} articles from GNews API")
+    return rets
+
+@task()
+def format_to_model(ret: list[dict]) -> list[NewsData]:
+    """Convert the API response into NewsData models."""
+    logger = get_run_logger()
+    logger.info("Formatting API response to NewsData models")
+    models = []
+    for item in ret:
+        model = NewsData(
+            title=item['title'],
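+            # the remaining GNews fields are optional, so .get() supplies fallbacks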
+            url=item['url'],
+            description=item.get('description', None),
+            content=item.get('content', None),
+            image_url=item.get('image', None),
+            pub_date=item.get('publishedAt', None),
+            category="politics",
+            source_name=item.get('source', {}).get('name', ""),
+            language="ja",
+            country="jp",
+        )
+        models.append(model)
+    logger.info(f"Formatted {len(models)} NewsData models")
+    return models
+
+@task()
+def write_csv(models: list[NewsData]):
+    logger = get_run_logger()
+    logger.info("Writing NewsData models to CSV on GCS")
+    csv_data = NewsData.to_csv_from_items(models)
+    dt = datetime.now()
+    dt_str = dt.strftime("%Y-%m-%d")
+    file_name = f"news_{dt_str}_part-001.csv"
+    prefix = f"data_science/data/y={dt.strftime('%Y')}/news"
+    provider = GoogleCloudStorageProvider()
+    bucket_name = os.getenv("GCS_BUCKET_NAME")
+    provider.write_csv_item(
+        bucket=bucket_name,
+        object_name=f"{prefix}/{file_name}",
+        records=csv_data,
+    )
+
+
+@flow
+def api_etl_flow():
+    # Extract/Transform: call the API and convert the response to models
+    ret = call_api()
+    models = format_to_model(ret)
+    # Load: write the CSV out to GCS
+    write_csv(models)
+
+
+if __name__ == "__main__":
+    api_etl_flow()
\ No newline at end of file
diff --git a/src/lib/csv_collector/__init__.py b/src/lib/csv_collector/__init__.py
new file mode 100644
index 0000000..aca0881
--- /dev/null
+++ b/src/lib/csv_collector/__init__.py
@@ -0,0 +1,12 @@
+from .csv_writer import CSVWriter
+from .csv_reader import CSVReader
+from .csv_editor import CSVEditColumn, CSVEditMapper
+from .csv_analyzer import CSVAnalyzer
+
+__all__ = [
+    "CSVWriter",
+    "CSVReader",
+    "CSVEditColumn",
+    "CSVEditMapper",
+    "CSVAnalyzer",
+]
\ No newline at end of file
diff --git a/src/lib/csv_collector/csv_analyzer.py b/src/lib/csv_collector/csv_analyzer.py
new file mode 100644
index 0000000..8cd5695
--- /dev/null
+++ b/src/lib/csv_collector/csv_analyzer.py
@@ -0,0 +1,101 @@
+import os
+import pandas as pd
+from zoneinfo import ZoneInfo
+from typing import Union
+from utils.types import DataLayer
+
+from lib.custom_logger import get_logger
+logger = get_logger()
+
+from .csv_writer import CSVWriter
+from .csv_reader import CSVReader
+
+class CSVAnalyzer:
+
+    @classmethod
+    def _separate_month_to_df(
+        cls,
+        header: list,
+        data_rows: list,
+        date_key: str = "published_at",
+        tz: str | None = None) -> pd.DataFrame | None:
+
+        if not data_rows:
+            return None
+
+        df = pd.DataFrame(data_rows, columns=header)
+        # Normalize the date column (parse to datetime, convert the timezone)
+        df[date_key] = pd.to_datetime(df[date_key], errors="coerce", utc=True)
+        if tz:
+            df[date_key] = df[date_key].dt.tz_convert(ZoneInfo(tz))
+        # Add a year-month column for grouping
+        df["year_month"] = df[date_key].dt.to_period("M")
+        return df
+
+    @classmethod
+    def separate_month_to_dict(
+        cls,
+        header: list,
+        data_rows: list,
+        date_key: str = "published_at",
+        tz: str | None = None) -> dict[str, list[dict]] | None:
+        """
+        Split rows into year-month groups (list of lists -> dict of lists of dicts).
+        """
+        df = cls._separate_month_to_df(header, data_rows, date_key, tz)
+        if df is None:
+            return None
+
+        return {
+            str(ym): g.drop(columns=["year_month"]).to_dict(orient="records")
+            for ym, g in df.groupby("year_month", sort=True)
+        }
+
+    @classmethod
+    def write_separated_month(
+        cls,
+        records,
+        domain: str,
+        event: str,
+        layer: Union[str, DataLayer],
+        prefix: str | None = None,
+        date_format: str = "%Y-%m",
+        is_year: bool = True,
+        is_month: bool = True,
+        date_key: str = "published_at",
+        tz: str | None = None,
+    ):
+        """Split rows by year-month and write each group to its own CSV file."""
+        if not records or len(records) < 2:
+            logger.warning("No records to process.")
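+            # a valid payload needs a header row plus at least one data row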
+            return
+        header = records[0]
+        data_rows = records[1:]
+
+        df = cls._separate_month_to_df(header, data_rows, date_key, tz)
+        if df is None:
+            return
+
+        for ym, g in df.groupby("year_month", sort=True):
+            logger.info(f"Processing year-month: {ym}")
+            y, m = str(ym).split("-")
+            folder_path = CSVWriter.get_filepath(
+                domain=domain,
+                layer=layer)
+            if is_year:
+                folder_path = f"{folder_path}/y={y}"
+            if is_month:
+                folder_path = f"{folder_path}/m={m}"
+
+            filename = CSVWriter.get_filename(
+                event=event,
+                prefix=prefix,
+                date_format=date_format,
+                dt=str(ym) + "-01",
+                extension=".csv"
+            )
+            fpath = os.path.join(folder_path, filename)
+            os.makedirs(folder_path, exist_ok=True)
+            logger.info(f"Writing to file: {fpath}")
+            g.drop(columns=["year_month"]).to_csv(fpath, index=False, encoding="utf-8")
diff --git a/src/lib/csv_collector/csv_editor.py b/src/lib/csv_collector/csv_editor.py
new file mode 100644
index 0000000..d69fbd8
--- /dev/null
+++ b/src/lib/csv_collector/csv_editor.py
@@ -0,0 +1,96 @@
+from typing import Any, Callable, Optional, TypeVar
+from dataclasses import dataclass
+from .csv_reader import CSVReader
+
+from lib.custom_logger import get_logger
+logger = get_logger()
+
+T = TypeVar("T")
+ColCallback = Callable[[int, list, dict], T]
+
+
+@dataclass
+class CSVEditColumn():
+    """Column definition used when editing CSV data."""
+    name: str
+    value: Any = None
+    key_name: str = None
+    cb: Optional[ColCallback] = None
+
+    def execute(self, row_index: int, row: list, header_map: dict) -> Any:
+        """Resolve this column's value for the given row."""
+        try:
+            if self.cb:
+                return self.cb(row_index, row, header_map)
+            elif self.key_name and self.key_name in header_map:
+                index = header_map[self.key_name]
+                return row[index]
+            else:
+                return self.value
+        except Exception as e:
+            logger.error(f"Error in CSVEditColumn.execute: {e}")
+            logger.error(f"row_index: {row_index}, row: {row}, header_map: {header_map}")
+            logger.error(f"Column info - name: {self.name}, value: {self.value}, key_name: {self.key_name}, cb: {self.cb}")
+            raise e
+
+class CSVEditMapper:
+    """Mapper that builds edited CSV rows from column definitions."""
+    def __init__(self, header_map: dict = None):
+        self.columns: list[CSVEditColumn] = []
+        self.header_map: dict = header_map if header_map else {}
+
+    def add(self, column: CSVEditColumn):
+        self.columns.append(column)
+
+    def add_column(self, name: str, key_name: str = None):
+        if not key_name:
+            key_name = name
+        self.columns.append(CSVEditColumn(name, None, key_name))
+
+    def add_value(self, name: str, value: Any):
+        self.columns.append(CSVEditColumn(name, value))
+
+    def add_callback(self, name: str, cb: Callable):
+        self.columns.append(CSVEditColumn(name, cb=cb))
+
+    def auto_columns(self):
+        """Add columns automatically from the existing header map."""
+        if not self.header_map or len(self.header_map) == 0:
+            return
+
+        # Add every known column, ordered by its index in the source CSV
+        sorted_items = sorted(self.header_map.items(), key=lambda item: item[1])
+        for key, idx in sorted_items:
+            self.add_column(name=key, key_name=key)
+
+    def get_column_values(self, key_name: str, row, null_value: Any = None) -> Any:
+        # .get() so a missing column falls back to null_value instead of raising
+        idx = self.header_map.get(key_name)
+        if idx is None or idx < 0:
+            return null_value
+
+        return row[idx]
+
+    def edit(self, records: list[list]) -> list[list]:
+        """Edit CSV records (a header row plus data rows)."""
+        new_records = []
+        # Emit the header row first
+        header = []
+        for col in self.columns:
+            header.append(col.name)
+        new_records.append(header)
+        if not records or len(records) < 2:
+            return new_records
+
+        if self.header_map is None or len(self.header_map) == 0:
+            self.header_map = CSVReader.header_map(records[0])
+
+        # Apply the column definitions to every data row
+        for i, row in enumerate(records[1:]):
+            new_row = []
+            for col in self.columns:
+                _value = col.execute(i, row, self.header_map)
+                new_row.append(_value)
+            new_records.append(new_row)
+
+        return new_records
diff --git a/src/lib/csv_collector/csv_reader.py b/src/lib/csv_collector/csv_reader.py
new file mode 100644
index 0000000..777280b
--- /dev/null
+++ b/src/lib/csv_collector/csv_reader.py
@@ -0,0 +1,37 @@
+import os
+import csv
+from typing import List
+
+from lib.custom_logger import get_logger
+logger = get_logger()
+
+class CSVReader:
+    """Utility for reading CSV files."""
+    BASE_DIR = "data"
+
+    @classmethod
+    def read(cls, file_path: str) -> List[list]:
+        """Read a CSV file as a list of rows."""
+        if not os.path.exists(file_path):
+            logger.warning(f"File not found: {file_path}")
+            return []
+
+        with open(file_path, mode="r", newline="", encoding="utf-8") as f:
+            reader = csv.reader(f)
+            return list(reader)
+
+    @classmethod
+    def read_dict(cls, file_path: str) -> List[dict]:
+        """Read a CSV file as a list of dicts."""
+        if not os.path.exists(file_path):
+            logger.warning(f"File not found: {file_path}")
+            return []
+
+        with open(file_path, mode="r", newline="", encoding="utf-8") as f:
+            reader = csv.DictReader(f)
+            return list(reader)
+
+    @classmethod
+    def header_map(cls, headers: list) -> dict[str, int]:
+        """Build a column-name-to-index map from a CSV header row."""
+        return {h: i for i, h in enumerate(headers)}
diff --git a/src/lib/csv_collector/csv_writer.py b/src/lib/csv_collector/csv_writer.py
new file mode 100644
index 0000000..8dc5699
--- /dev/null
+++ b/src/lib/csv_collector/csv_writer.py
@@ -0,0 +1,181 @@
+import os
+import csv
+from typing import List, Union
+from datetime import datetime
+from io import StringIO
+
+from utils.types import DataLayer
+
+from lib.custom_logger import get_logger
+logger = get_logger()
+
+
+class CSVWriter:
+    """Utility for writing CSV files."""
+    BASE_DIR = "data"
+
+    @classmethod
+    def get_filepath(cls,
+        domain: str,
+        layer: Union[str, DataLayer, None],
+        is_year: bool = False,
+        is_month: bool = False,
+        is_day: bool = False,
+        is_hour: bool = False,
+        dt: Union[str, datetime] = None
+        ) -> str:
+        """Build the folder path, e.g. data/<domain>/<layer>/y=YYYY/m=MM."""
+        parts = [cls.BASE_DIR]
+        parts.append(domain)
+        if layer:
+            parts.append(layer)
+        if dt is None:
+            dt = datetime.now()
+        elif isinstance(dt, str):
+            dt = datetime.fromisoformat(dt)
+        if is_year:
+            parts.append(f"y={dt.strftime('%Y')}")
+        if is_month:
+            parts.append(f"m={dt.strftime('%m')}")
+        if is_day:
+            parts.append(f"d={dt.strftime('%d')}")
+        if is_hour:
+            parts.append(f"h={dt.strftime('%H')}")
+        folder_path = os.path.join(*parts)
+        logger.debug(f"Generated CSV folder path: {folder_path}")
+        return folder_path
+
+
+    @classmethod
+    def get_filename(
+        cls,
+        event: str,
+        prefix: str = None,
+        date_format: str = "%Y-%m-%d",
+        dt: Union[str, datetime] = None,
+        part: int = None,
+        extension: str = ".csv") -> str:
+        """
+        Generate the CSV file name
+
+        Args:
+            event (str): Event name embedded in the file name.
+            prefix (str, optional): File name prefix. Defaults to None.
+            date_format (str, optional): Date format, e.g. "%Y-%m-%d".
+            dt (datetime, optional): Date to embed. Defaults to None (= now).
+            part (int, optional): Partition number. Defaults to None.
+            extension (str, optional): File extension. Defaults to ".csv".
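+
+        Example (illustrative values):
+            get_filename(event="news", dt="2025-11-13", part=1)
+            -> "news_2025-11-13_part-001.csv"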
+        """
+        file_names_part = []
+        if prefix:
+            file_names_part.append(prefix)
+        file_names_part.append(event)
+
+        if date_format:
+            # Normalize dt to a datetime
+            if dt is None:
+                dt = datetime.now()
+            elif isinstance(dt, str):
+                dt = datetime.fromisoformat(dt)
+            date_str = dt.strftime(date_format)
+            file_names_part.append(date_str)
+
+        if part is not None:
+            file_names_part.append(f"part-{part:03d}")
+        file_name = "_".join(file_names_part) + extension
+        logger.debug(f"Generated CSV file name: {file_name}")
+        return file_name
+
+
+    @classmethod
+    def write(
+        cls,
+        records: List,
+        domain: str,
+        layer: Union[str, DataLayer],
+        event: str,
+        prefix: str = None,
+        date_format: str = "%Y-%m-%d",
+        dt: Union[str, datetime] = None,
+        part: int = None,
+        extension: str = ".csv",
+        is_year: bool = False,
+        is_month: bool = False,
+        is_day: bool = False,
+        is_hour: bool = False,
+        is_update: bool = False,
+    ) -> str:
+        """Write records to a CSV file and return the file path."""
+        if not records:
+            logger.warning("No records to write.")
+            return ""
+        folder_path = cls.get_filepath(
+            domain=domain,
+            layer=layer,
+            is_year=is_year,
+            is_month=is_month,
+            is_day=is_day,
+            is_hour=is_hour,
+            dt=dt
+        )
+
+        filename = cls.get_filename(
+            event=event,
+            prefix=prefix,
+            date_format=date_format,
+            dt=dt,
+            part=part,
+            extension=extension)
+
+        os.makedirs(folder_path, exist_ok=True)
+        full_filename = os.path.join(folder_path, filename)
+
+        if not is_update and os.path.exists(full_filename):
+            logger.info(f"File already exists and will not be overwritten: {full_filename}")
+            return full_filename
+
+        with open(full_filename, mode="w", newline="", encoding="utf-8") as f:
+            writer = csv.writer(f, quoting=csv.QUOTE_ALL)
+            writer.writerows(records)
+
+        return full_filename
+
+    @classmethod
+    def write_with_filename(
+        cls,
+        records: List,
+        filename: str,
+        is_update: bool = False,
+    ) -> str:
+        """Write CSV records to the given file path."""
+        if not records:
+            logger.warning("No records to write.")
+            return ""
+
+        os.makedirs(os.path.dirname(filename), exist_ok=True)
+
+        if not is_update and os.path.exists(filename):
+            logger.info(f"File already exists and will not be overwritten: {filename}")
+            return filename
+
+        with open(filename, mode="w", newline="", encoding="utf-8") as f:
+            writer = csv.writer(f, quoting=csv.QUOTE_ALL)
+            writer.writerows(records)
+
+        return filename
+
+    @classmethod
+    def csv_bytes(
+        cls,
+        records: List,
+    ) -> bytes:
+        """Render CSV records as UTF-8 bytes."""
+        buf = StringIO(newline="")
+        writer = csv.writer(buf, quoting=csv.QUOTE_ALL)
+        writer.writerows(records)
+        return buf.getvalue().encode('utf-8')
+
diff --git a/src/lib/custom_logger.py b/src/lib/custom_logger.py
new file mode 100644
index 0000000..9137b2f
--- /dev/null
+++ b/src/lib/custom_logger.py
@@ -0,0 +1,56 @@
+import logging
+import functools
+from .singleton import Singleton
+
+class CustomLogger(Singleton):
+    """
+    Singleton logger class that initializes a logger with a specified name and log file.
+    It provides a decorator for logging entry to and exit from functions.
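+
+    Usage (sketch):
+        from lib.custom_logger import get_logger
+        logger = get_logger()  # repeated calls return the same underlying logger
+        logger.info("hello")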
+    """
+
+    def __init__(self, name='main', log_file=None, level=logging.INFO):
+        if hasattr(self, '_initialized') and self._initialized:
+            return  # already initialized; do nothing
+
+        self.logger = logging.getLogger(name)
+        self.logger.setLevel(level)
+        self.logger.propagate = False
+
+        formatter = logging.Formatter(
+            '%(asctime)s %(levelname)s [%(filename)s:%(lineno)3d]: %(message)s'
+        )
+
+        # Console handler
+        ch = logging.StreamHandler()
+        ch.setFormatter(formatter)
+        self.logger.addHandler(ch)
+
+        # File handler
+        if log_file:
+            fh = logging.FileHandler(log_file, encoding='utf-8')
+            fh.setFormatter(formatter)
+            self.logger.addHandler(fh)
+
+        self._initialized = True
+
+    def get_logger(self):
+        return self.logger
+
+    def log_entry_exit(self, func):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            self.logger.info(f"Enter: {func.__qualname__}")
+            result = func(*args, **kwargs)
+            self.logger.info(f"Exit: {func.__qualname__}")
+            return result
+        return wrapper
+
+
+def get_logger(name='main', log_file=None, level=logging.INFO):
+    custom_logger = CustomLogger(name, log_file, level)
+    return custom_logger.get_logger()
diff --git a/src/lib/singleton.py b/src/lib/singleton.py
new file mode 100644
index 0000000..cc8cb16
--- /dev/null
+++ b/src/lib/singleton.py
@@ -0,0 +1,20 @@
+"""Singleton pattern implementation in Python.
+This implementation is thread-safe and ensures that only one instance of the class is created.
+
+What Singleton provides is "a mechanism that always returns the same instance":
+* __init__() still runs on every call (which is rarely what people expect)
+* guarding against re-initialization (e.g. an _initialized flag) is the subclass's responsibility
+"""
+
+import threading
+
+class Singleton(object):
+    _instances = {}
+    _lock = threading.Lock()
+
+    def __new__(cls, *args, **kwargs):
+        if cls not in cls._instances:
+            with cls._lock:
+                if cls not in cls._instances:  # double-checked locking
+                    cls._instances[cls] = super(Singleton, cls).__new__(cls)
+        return cls._instances[cls]
diff --git a/src/models/csv_model_base.py b/src/models/csv_model_base.py
new file mode 100644
index 0000000..d316a1e
--- /dev/null
+++ b/src/models/csv_model_base.py
@@ -0,0 +1,42 @@
+from datetime import datetime
+import json
+from typing import ClassVar, Optional, List
+from pydantic import BaseModel
+
+class CSVBaseModel(BaseModel):
+    """Base class that adds shared CSV helpers on top of pydantic's BaseModel."""
+    # Each subclass can carry its own exclusion list
+    csv_excludes: ClassVar[List[str]] = []
+
+    @classmethod
+    def to_headers(cls, excepts: Optional[List[str]] = None) -> List[str]:
+        """Generate the CSV header from the model fields."""
+        fields = list(cls.model_fields.keys())  # keeps declaration order
+        if excepts:
+            fields = [f for f in fields if f not in excepts]
+        return fields
+
+    def to_row(self, excepts: Optional[List[str]] = None) -> List[str]:
+        """Convert this instance into a CSV row."""
+        header = self.to_headers(excepts=excepts)
+        row = []
+        for f in header:
+            val = getattr(self, f)
+            if isinstance(val, (dict, list)):
+                row.append(json.dumps(val, ensure_ascii=False))  # dict/list become JSON strings
+            elif isinstance(val, datetime):
+                row.append(val.isoformat())  # datetime becomes an ISO 8601 string
+            elif val is None:
+                row.append("")
+            else:
+                row.append(str(val))
+        return row
+
+    @staticmethod
+    def to_csv_from_items(items: List['CSVBaseModel']) -> List:
+        """Build CSV records (header + rows) from a list of models."""
+        if not items:
+            return []
+        headers = items[0].to_headers()
+        rows = [item.to_row() for item in items]
+        return [headers] + rows
diff --git a/src/providers/api_g_news.py b/src/providers/api_g_news.py
new file mode 100644
index 0000000..fe77ac3
--- /dev/null
+++ b/src/providers/api_g_news.py
@@ -0,0 +1,97 @@
+import requests
+import os
+
+from lib.custom_logger import get_logger
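+# NOTE: GNEWS_API_KEY is read from the environment at import time,
+# so load your .env before importing this provider.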
+logger = get_logger()
+
+class ApiGNews:
+    """
+    Client for the GNews API
+
+    Notes:
+        - Fetches news articles via the GNews API
+        - The API key is read from the GNEWS_API_KEY environment variable
+        - See https://gnews.io/docs/ for the full API documentation
+    """
+
+    GNEWS_API_KEY = os.getenv("GNEWS_API_KEY")
+
+    @classmethod
+    def get_news(
+        cls,
+        query: str = None,
+        lang: str = "ja",      # e.g. "en"
+        country: str = "jp",   # e.g. "us"
+        max: int = 10,
+        from_at: str = None,   # ISO 8601 datetime string (e.g. "2023-10-01T00:00:00Z")
+        to_at: str = None,
+    ):
+        """
+        Fetch news articles from GNews
+        Args:
+            query (str): Search query
+            lang (str): Article language code (e.g. "ja" for Japanese)
+            country (str): Country code (e.g. "jp" for Japan)
+            max (int): Maximum number of articles to return (up to 100)
+            from_at (str): Start datetime (ISO 8601)
+            to_at (str): End datetime (ISO 8601)
+        """
+        url = "https://gnews.io/api/v4/search"
+        params = {
+            "apikey": cls.GNEWS_API_KEY,
+            "q": query,
+            "lang": lang,
+            "country": country,
+            "max": max,
+            "from": from_at,
+            "to": to_at,
+        }
+        # Drop None values so they are not sent
+        params = {k: v for k, v in params.items() if v is not None}
+        response = requests.get(url, params=params)
+        response.raise_for_status()
+        json_data: dict = response.json()
+        logger.debug(f"GNews API Response: {json_data}")
+        return json_data.get("articles", [])
+
+    @classmethod
+    def get_top_headlines(
+        cls,
+        category: str = None,  # e.g. general, world, nation, business, technology, entertainment, sports, science, health
+        lang: str = "ja",      # e.g. "en"
+        country: str = "jp",   # e.g. "us"
+        max: int = 10,
+        from_at: str = None,   # ISO 8601 datetime string (e.g. "2023-10-01T00:00:00Z")
+        to_at: str = None,
+        query: str = None,
+    ):
+        """Fetch top headlines from GNews
+        Args:
+            category (str): Category (general, world, nation, business, technology, entertainment, sports, science, health)
+            lang (str): Article language code (e.g. "ja" for Japanese)
+            country (str): Country code (e.g. "jp" for Japan)
+            max (int): Maximum number of articles to return (up to 100)
+            from_at (str): Start datetime (ISO 8601)
+            to_at (str): End datetime (ISO 8601)
+            query (str): Search query
+        """
+        url = "https://gnews.io/api/v4/top-headlines"
+        params = {
+            "apikey": cls.GNEWS_API_KEY,
+            "category": category,
+            "lang": lang,
+            "country": country,
+            "max": max,
+            "from": from_at,
+            "to": to_at,
+            "q": query,
+        }
+        # Drop None values so they are not sent
+        params = {k: v for k, v in params.items() if v is not None}
+        response = requests.get(url, params=params)
+        response.raise_for_status()
+        json_data: dict = response.json()
+        logger.debug(f"GNews API Response: {json_data}")
+        return json_data.get("articles", [])
diff --git a/src/providers/duck_db_provider.py b/src/providers/duck_db_provider.py
new file mode 100644
index 0000000..4cb08df
--- /dev/null
+++ b/src/providers/duck_db_provider.py
@@ -0,0 +1,39 @@
+from typing import Any
+
+import duckdb
+
+class DuckDBProvider:
+    def __init__(self, db_path: str = ":memory:", read_only: bool = False):
+        self.con = self.connect(db_path, read_only)
+
+    def connect(self, db_path: str = ":memory:", read_only: bool = False):
+        return duckdb.connect(database=db_path, read_only=read_only)
+
+    def close(self):
+        """Close the connection."""
+        if self.con:
+            self.con.close()
+
+    def query_df(self, sql: str):
+        """Run a SQL query and return the result as a pandas DataFrame."""
+        return self.con.execute(sql).df()
+
+    def max_value(
+        self,
+        file_path: str,
+        column: str,
+        hive_partitioning: bool = True,
+        union_by_name: bool = True,
+    ) -> Any:
+        """Return the maximum value of the given column in the CSV file(s)."""
+        # NOTE: file_path and column are interpolated into the SQL string directly,
+        # so only pass trusted values here.
+        query = f"""
+            SELECT MAX({column}) AS max_{column}
+            FROM read_csv_auto('{file_path}',
+                hive_partitioning={1 if hive_partitioning else 0},
+                union_by_name={1 if union_by_name else 0}
+            )
+        """
+        result = self.con.execute(query).fetchone()[0]
+        return result
\ No newline at end of file
diff --git a/src/providers/google_cloud_storage_provider.py b/src/providers/google_cloud_storage_provider.py
new file mode 100644
index 0000000..4ab002c
--- /dev/null
+++ b/src/providers/google_cloud_storage_provider.py
@@ -0,0 +1,192 @@
+import os
+import io
+import csv
+import json
+import mimetypes
+import zipfile
+from pathlib import Path
+from typing import Optional, List, Dict, Any, Union, BinaryIO
+from datetime import timedelta
+
+from google.cloud import storage
+from google.oauth2 import service_account
+
+from lib.custom_logger import get_logger
+logger = get_logger()
+
+class GoogleCloudStorageProvider:
+
+    def __init__(self, cred_path: Optional[str] = None, project: Optional[str] = None):
+        try:
+            if cred_path:
+                creds = service_account.Credentials.from_service_account_file(cred_path)
+                # If no project is given, fall back to the one in the credentials
+                effective_project = project or creds.project_id
+                self._client = storage.Client(
+                    project=effective_project, credentials=creds
+                )
+                logger.info(f"GCS client initialized with service account file. project={effective_project}")
+            elif os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"):
+                cred_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
+                # from_service_account_info() expects a dict, so parse the JSON string first
+                creds = service_account.Credentials.from_service_account_info(json.loads(cred_json))
+                effective_project = project or creds.project_id
+                self._client = storage.Client(
+                    project=effective_project, credentials=creds
+                )
+                logger.info("GCS client initialized with credentials from environment variable.")
+            else:
+                self._client = storage.Client(project=project)
+                logger.info("GCS client initialized with default credentials (ADC).")
+        except Exception as e:
+            logger.error(f"GCS initialization failed: {e}")
+            raise
+
+    # Private helpers for bucket and blob references
+    def _bucket(self, bucket: str) -> storage.Bucket:
+        return self._client.bucket(bucket)
+
+    def _blob(self, bucket: str, object_name: str) -> storage.Blob:
+        return self._bucket(bucket).blob(object_name)
+
+
+    # Bucket operations
+    def get_buckets(self) -> List[str]:
+        buckets: List[storage.Bucket] = self._client.list_buckets()
+        return [b.name for b in buckets]
+
+    def create_bucket(self, bucket_name: str, location: str = "ASIA-NORTHEAST1", storage_class: str = "STANDARD"):
+        b = storage.Bucket(self._client, name=bucket_name)
+        b.storage_class = storage_class
+        return self._client.create_bucket(b, location=location)
+
+    def is_exists_bucket(self, bucket_name: str) -> bool:
+        try:
+            self._client.get_bucket(bucket_name)
+            return True
+        except Exception:
+            return False
+
+    # Object operations
+    def get_items(self, bucket: str, prefix: str | None = None, match_glob: str | None = None) -> List[Dict[str, Any]]:
+        items: List[storage.Blob] = self._client.list_blobs(bucket, prefix=prefix, match_glob=match_glob)
+        return [{"name": bl.name, "size": bl.size, "updated": bl.updated, "content_type": bl.content_type}
+                for bl in items]
+
+    def is_exists_item(self, bucket: str, object_name: str) -> bool:
+        return self._blob(bucket, object_name).exists()
+
+
+    def write_item(self, bucket: str, object_name: str, data: Union[bytes, BinaryIO, str],
+                   content_type: str | None = None) -> Dict[str, Any]:
+        """
+        Write an object
+
+        Args:
+            bucket (str): Bucket name
+            object_name (str): Object name
+            data (Union[bytes, BinaryIO, str]): Data to write (bytes, a file-like object, or a local file path)
+            content_type (Optional[str]): Content type (MIME type)
+        Returns:
+            Dict[str, Any]: Metadata of the written object
+        """
+        blob = self._blob(bucket, object_name)
+        if content_type is None:
+            content_type = mimetypes.guess_type(object_name)[0] or "application/octet-stream"
+        blob.content_type = content_type
+
+        if isinstance(data, (bytes, bytearray)):
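+            # bytes/bytearray: wrap in BytesIO so the client can stream the upload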
+            blob.upload_from_file(io.BytesIO(data), content_type=content_type, rewind=True)
+        elif hasattr(data, "read"):
+            blob.upload_from_file(data, content_type=content_type, rewind=True)
+        elif isinstance(data, str) and os.path.exists(data):
+            blob.upload_from_filename(data, content_type=content_type)
+        else:
+            raise ValueError("data must be bytes, file-like, or an existing file path")
+        return {"name": blob.name, "size": blob.size, "content_type": blob.content_type}
+
+    def read_item(self, bucket: str, object_name: str, as_text: bool = False, encoding: str = "utf-8"):
+        data = self._blob(bucket, object_name).download_as_bytes()
+        return data.decode(encoding) if as_text else data
+
+    def delete_item(self, bucket: str, object_name: str):
+        """Delete an object."""
+        self._blob(bucket, object_name).delete()
+
+    def generate_signed_url(self, bucket: str, object_name: str, method: str = "GET",
+                            expires: timedelta = timedelta(hours=1)) -> str:
+        return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method)
+
+    def zip_items(
+        self,
+        bucket: str,
+        object_names: List[str],
+    ) -> bytes:
+        """
+        Bundle several GCS objects into a single ZIP and return the ZIP binary (bytes)
+
+        Args:
+            bucket (str): Bucket name
+            object_names (List[str]): Target object names
+        Returns:
+            bytes: The ZIP file binary
+        """
+        out = io.BytesIO()
+        with zipfile.ZipFile(out, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
+            for obj in object_names:
+                blob = self._blob(bucket, obj)
+                if not blob.exists():
+                    raise FileNotFoundError(f"Object not found: gs://{bucket}/{obj}")
+
+                buf = io.BytesIO()
+                blob.download_to_file(buf)
+                buf.seek(0)
+                arcname = Path(obj).name
+                zf.writestr(arcname, buf.read())
+
+            zf.comment = f"bucket={bucket}, files={len(object_names)}".encode()
+
+        return out.getvalue()
+
+    def upload_folder(self, bucket: str, folder_path: str, gcs_prefix: str = ""):
+        """
+        Recursively upload a local folder to GCS
+
+        Args:
+            bucket (str): Bucket name
+            folder_path (str): Local folder path
+            gcs_prefix (str): Prefix (folder path) on GCS
+        """
+        _bucket = self._bucket(bucket)
+
+        for root, _, files in os.walk(folder_path):
+            for file in files:
+                local_file_path = os.path.join(root, file)
+                # Build a relative path so the folder structure is preserved
+                relative_path = os.path.relpath(local_file_path, folder_path)
+                gcs_object_name = os.path.join(gcs_prefix, relative_path).replace("\\", "/")
+
+                blob = _bucket.blob(gcs_object_name)
+                blob.upload_from_filename(local_file_path)
+                logger.info(f"Uploaded {local_file_path} to gs://{bucket}/{gcs_object_name}")
+
+    def write_csv_item(
+        self,
+        bucket: str,
+        object_name: str,
+        records: List):
+        """Upload CSV data to GCS
+
+        Args:
+            bucket (str): Bucket name
+            object_name (str): Object name
+            records (List): CSV rows (header + data)
+
+        """
+        blob = self._blob(bucket, object_name)
+        with blob.open("w", content_type="text/csv", newline="", encoding="utf-8") as f:
+            writer = csv.writer(f)
+            writer.writerows(records)
+        logger.info(f"Uploaded CSV to gs://{bucket}/{object_name}")
+        return {"name": blob.name, "size": blob.size, "content_type": blob.content_type}
diff --git a/src/utils/types.py b/src/utils/types.py
new file mode 100644
index 0000000..938dfe9
--- /dev/null
+++ b/src/utils/types.py
@@ -0,0 +1,6 @@
+from enum import Enum
+
+class DataLayer(str, Enum):
+    BRONZE = "bronze"
+    SILVER = "silver"
+    GOLD = "gold"
\ No newline at end of file