From f74b1c7a1097f7d9141445617c0484ddb3ce861f Mon Sep 17 00:00:00 2001 From: "ry.yamafuji" Date: Sun, 26 Oct 2025 17:10:27 +0900 Subject: [PATCH] =?UTF-8?q?=E6=9C=80=E6=96=B0=E3=82=B3=E3=83=BC=E3=83=89?= =?UTF-8?q?=E3=82=92=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 15 +- docments/big_query.md | 12 + docments/windows_disk.md | 66 ++ example/example_bigquery.py | 110 +++ example/example_bigquery_model.py | 74 ++ example/example_duck_db.py | 25 + example/example_duck_gcs.py | 87 +++ example/example_duck_model.py | 30 + example/example_rss.py | 47 ++ requirements.txt | 15 +- src/lib/custom_logger.py | 1 - src/lib/singleton.py | 2 +- src/lib/time_schedule.py | 2 +- src/models/bigquery_base_model.py | 510 ++++++++++++++ src/models/duck_base_model.py | 184 +++++ src/models/firestore_base_model.py | 8 +- src/providers/duck_db_provider.py | 73 ++ src/providers/firestore_provider.py | 8 +- .../google_cloud_bigquery_provider.py | 662 ++++++++++++++++++ src/providers/google_cloud_pubsub_provider.py | 0 .../google_cloud_storage_provider.py | 69 +- src/providers/one_drive_provider.py | 14 +- src/providers/rss_reader_client.py | 41 ++ 23 files changed, 2034 insertions(+), 21 deletions(-) create mode 100644 docments/big_query.md create mode 100644 docments/windows_disk.md create mode 100644 example/example_bigquery.py create mode 100644 example/example_bigquery_model.py create mode 100644 example/example_duck_db.py create mode 100644 example/example_duck_gcs.py create mode 100644 example/example_duck_model.py create mode 100644 example/example_rss.py create mode 100644 src/models/bigquery_base_model.py create mode 100644 src/models/duck_base_model.py create mode 100644 src/providers/duck_db_provider.py create mode 100644 src/providers/google_cloud_bigquery_provider.py create mode 100644 src/providers/google_cloud_pubsub_provider.py create mode 100644 src/providers/rss_reader_client.py diff --git a/README.md b/README.md index 5df4857..7f62bf3 100644 --- a/README.md +++ b/README.md @@ -3,4 +3,17 @@ ```sh python -m venv venv .\venv\Scripts\activate -``` \ No newline at end of file +``` + + +curl https://api.openai.com/v1/audio/transcriptions \ + -H "Authorization: Bearer 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" \ + -F "file=@part_001.m4a" \ + -F "model=gpt-4o-transcribe" \ + -F "response_format=text" \ + -F "language=ja" \ + -o part_001.txt + +ffmpeg -i voice.m4a -f segment -segment_time 600 -c copy part_%03d.m4a + +# フォーマットを揃える diff --git a/docments/big_query.md b/docments/big_query.md new file mode 100644 index 0000000..706e5c8 --- /dev/null +++ b/docments/big_query.md @@ -0,0 +1,12 @@ +## Big Queryは「データウェアハウス型」 + +Big QueryはOLAP(分析)用途の列指向データベースなので、 +RDB(MySQL,PostgreSQLなど)とは違い、行の一意性や整合性制約を持たない設計です。 + +## BigQueryに存在しない制約 + +* PRIMARY KEY +* FOREIGN KEY +* UNIQUE +* CHECK +* AUTO_INCREMENT diff --git a/docments/windows_disk.md b/docments/windows_disk.md new file mode 100644 index 0000000..17d482a --- /dev/null +++ b/docments/windows_disk.md @@ -0,0 +1,66 @@ + + +```powershell +# 一時ファイルの削除 +Remove-Item -Path "$env:TEMP\*" -Recurse -Force -ErrorAction SilentlyContinue +``` + + +移動できるデータを他ドライブへ + + +```powershell +Get-ChildItem C:\ -Recurse -ErrorAction SilentlyContinue | + Where-Object { -not $_.PSIsContainer } | + Sort-Object Length -Descending | + Select-Object FullName, 
@{Name="Size(GB)";Expression={"{0:N2}" -f ($_.Length / 1GB)}} -First 20 +``` + + +サードパーティツールで整理 + +WinDirStat +→ ディスクの使用状況を可視化して、大きいファイルを一目で把握。 + +``` +winget install WinDirStat +windirstat +``` + +CCleaner +→ 一時ファイルや不要レジストリの掃除に便利。 + + +``` +Remove-Item "C:\adobeTemp\*" -Recurse -Force -ErrorAction SilentlyContinue + + +ren "C:\adobeTemp" "adobeTemp_old" +``` + +attrib -h -s -r "C:\adobeTemp" + +ren C:\adobeTemp C:\adobeTemp_old +ren "C:\adobeTemp" "adobeTemp_old" + +mkdir "F:\adobeTemp" + +mklink /J C:\adobeTemp F:\adobeTemp + + +ren C:\adobeTemp C:\adobeTemp_old +リンク先のフォルダを作成(Dドライブに新しい保存場所を用意) + +powershell +Copy code +mkdir D:\adobeTemp +ジャンクションを作成 +管理者権限のコマンドプロンプトで: + +cmd +Copy code +mklink /J C:\adobeTemp D:\adobeTemp + + +cd "C:\Users\r_yam\AppData\Local\Docker\wsl\data" +optimize-vhd -Path .\ext4.vhdx -Mode Full \ No newline at end of file diff --git a/example/example_bigquery.py b/example/example_bigquery.py new file mode 100644 index 0000000..94a779a --- /dev/null +++ b/example/example_bigquery.py @@ -0,0 +1,110 @@ +import sys +import os +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src"))) + +from lib.custom_logger import get_logger +logger = get_logger(level=10) + +from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider + + +def example_bigquery(): + try: + # GoogleCloudBigQueryProviderのインスタンスを作成 + provider = GoogleCloudBigQueryProvider( + cred_path="keys/google_service_accout.json", + ) + + dss = provider.get_datasets() + for ds in dss: + logger.info(f"Dataset ID: {ds.dataset_id}") + + # データセットを作成する + dataset_id = "example_dataset" + if provider.is_exists_dataset(dataset_id): + logger.info(f"Dataset {dataset_id} already exists.") + else: + dataset = provider.create_dataset(dataset_id) + logger.info(f"Dataset {dataset_id} created at {dataset.created}.") + + table_id = provider.full_table_id(dataset_id, "example_table") + # テーブルを作成する + if provider.is_exists_table(table_id): + logger.info(f"Table {table_id} already exists.") + else: + logger.info(f"Creating table {table_id}...") + schema = [ + provider.addSchemaField("device_code", "string", "REQUIRED", description="Device code"), + provider.addSchemaField("time_stamp", "timestamp", "REQUIRED", description="Timestamp"), + ] + table = provider.create_table( + table_id=table_id, + schema=schema + ) + + # tables = provider.get_tables(dataset_id) + # for table in tables: + # logger.info(f"Table ID: {table.table_id}") + + # テーブル情報を挿入する + # provider.insert_rows( + # table_id=table_id, + # rows=[ + # {"device_code": "device_001", "time_stamp": "2025-01-01 12:00:00"}, + # {"device_code": "device_002", "time_stamp": "2025-01-01 12:05:00"}, + # {"device_code": "device_003", "time_stamp": "2025-01-01 12:10:00"}, + # ] + # ) + # テーブル情報を確認する(JOBあり・SQL発行) + # job_qyuery = provider.excute_query( + # query=f"SELECT * FROM `{table_id}` where device_code='device_001'", + # ) + # results = job_qyuery.result() + # for row in results: + # logger.info(f"Row: {row}") + + + # テーブル情報を確認する(JOBあり) + # rows = provider.list_rows( + # table_id=table_id, + # ) + # for row in rows: + # logger.info(f"Row: {row}") + + # バッファ状況の確認 + # buffer_status = provider.get_streaming_buffer_info(table_id=table_id) + # logger.info(f"Streaming Buffer Info: {buffer_status}") + # テーブルを更新する + # query_job = provider.update_query( + # table_id=table_id, + # values={ + # "device_code": "'device_999'" + # }, + # were_clause="device_code='device_002'" + # ) + # query_job.result() # ジョブの完了を待機 + + + # # テーブル情報を確認する(JOBあり) + # job 
= provider.select_query( + # table_id=table_id, + # ) + # results = job.result() + # for row in results: + # logger.info(f"Row: {row}") + + # # テーブルのレコードを削除する + # provider.delete_query( + # table_id=table_id, + # were_clause="device_code='device_012'" + # ) + # テーブルを削除する + # provider.delete_table(table_id=table_id) + + + except Exception as e: + logger.error(f"Error in example_bigquery: {e}") + + + +example_bigquery() \ No newline at end of file diff --git a/example/example_bigquery_model.py b/example/example_bigquery_model.py new file mode 100644 index 0000000..eb184e6 --- /dev/null +++ b/example/example_bigquery_model.py @@ -0,0 +1,74 @@ +import sys +import os +from datetime import datetime + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src"))) +from typing import ClassVar, Optional + +from lib.custom_logger import get_logger +logger = get_logger(level=10) +from dataclasses import dataclass + +from models.bigquery_base_model import BigQueryBaseModel +from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider + +@dataclass +class ExamplModel(BigQueryBaseModel): + device_code: str + time_stamp: datetime + + table_id: ClassVar[str] = "example_dataset.example_table" + key_field: ClassVar[Optional[str]] = "device_code" + +def example_model(): + logger.info("Starting example_bigquery_model function.") + provider = GoogleCloudBigQueryProvider( + cred_path="keys/google_service_accout.json", + ) + ExamplModel.set_provider(provider) + + # テーブル内の全レコードを取得する + # records = ExamplModel.fetch_all() + # logger.info(f"Total records: {len(records)}") + # for record in records: + # logger.info(f"Record: {record}") + + # レコードを生成する + # new_record = ExamplModel( + # device_code="device_010", + # time_stamp=datetime(2025, 1, 1, 15, 0, 0) + # ) + # new_record.create() + + # レコードを大量に生成する + # ExamplModel.insert( + # rows=[ + # ExamplModel(device_code="device_011",time_stamp=datetime(2025, 1, 1, 15, 0, 0)), + # ExamplModel(device_code="device_012",time_stamp=datetime(2025, 1, 1, 15, 0, 0)), + # ]) + + # テーブルのストリームバッファ情報を取得する + if ExamplModel.is_streaming_buffer(): + logger.info("Table is currently receiving streaming data.") + logger.info("not updated or delete in the streaming buffer.") + else: + logger.info("Table is not receiving streaming data.") + # 特定の条件でレコードを削除する + # ExamplModel.delete(where=[("device_code", "=", "device_010")]) + # レコードを更新する + ExamplModel.update( + values={"device_code": "device_011_updated"}, + where=[("device_code", "=", "device_011")] + ) + + + + # records = ExamplModel.list(where=[("device_code", "=", "device_001")]) + # logger.info(f"Records with device_code='device_001': {records if records else 'No records found'}") + + + # record = ExamplModel.first() + # logger.info(f"First record: {record if record else 'No record found'}") + + +example_model() \ No newline at end of file diff --git a/example/example_duck_db.py b/example/example_duck_db.py new file mode 100644 index 0000000..aa7f339 --- /dev/null +++ b/example/example_duck_db.py @@ -0,0 +1,25 @@ +import sys +import os +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src"))) + +from lib.custom_logger import get_logger +logger = get_logger(level=10) + +from providers.duck_db_provider import DuckDBProvider + +def example_duckdb(): + logger.info("Starting example_duckdb function.") + file_path = "./data_science/data/y=*/news/news_*.csv" + provider = DuckDBProvider() + sql = f""" + SELECT * + FROM read_csv_auto('{file_path}' , HEADER=TRUE, 
IGNORE_ERRORS=TRUE) + """ + result = provider.query_df(sql) + print("latest published_parsed:", result) + + # forで1件ずつ表示 + for idx, row in result.iterrows(): + logger.info(f"title:{row['title']}") + +example_duckdb() \ No newline at end of file diff --git a/example/example_duck_gcs.py b/example/example_duck_gcs.py new file mode 100644 index 0000000..c63f208 --- /dev/null +++ b/example/example_duck_gcs.py @@ -0,0 +1,87 @@ +""" +HMACキーがひつようになる +* Google Cloud Consoleで発行する +* https://console.cloud.google.com/storage/settings +* 「相互運用性(Interoperability)」タブを開く +* 「HMACキーを作成」ボタンを押す +* 使いたいサービスアカウントを選択 +""" + +import sys +import os +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src"))) + +from dotenv import load_dotenv +load_dotenv() + + +from lib.custom_logger import get_logger +logger = get_logger(level=10) + +import duckdb +from providers.google_cloud_storage_provider import GoogleCloudStorageProvider +from providers.duck_db_provider import DuckDBProvider + + +def example_init_google_cloud_storage(): + + logger.info("Starting example_google_cloud_storage function.") + gcs_provider = GoogleCloudStorageProvider( + cred_path="./keys/google_service_accout.json", + ) + csv_data = """id,name,age,city,score,created_at +1,Alice,25,Tokyo,88,2025-10-01T09:00:00Z +2,Bob,30,Osaka,75,2025-10-02T09:30:00Z +3,Charlie,28,Nagoya,92,2025-10-03T10:00:00Z +4,David,35,Fukuoka,64,2025-10-04T11:15:00Z +5,Eva,22,Sapporo,80,2025-10-05T12:45:00Z +""" + gcs_provider.write_item("datasource-example-251018", + "example/y=2025/m=10/example.csv", + csv_data.encode("utf-8"),"text/csv") + + buckets = gcs_provider.get_buckets() + logger.info(f"Buckets: {buckets}") + + +def example_duckdb_cloud_raw(): + logger.info("Starting example_duckdb_cloud_raw function.") + + # DuckDB接続 + con = duckdb.connect() + con.sql(f""" + CREATE OR REPLACE SECRET gcs_creds ( + TYPE gcs, + KEY_ID {os.getenv('GCP_STORAGE_HMAC_ACCESS_KEY')}, + SECRET {os.getenv('GCP_STORAGE_HMAC_SECRET_KEY')} + ); + """) + + query = f""" + SELECT * FROM read_csv_auto('gs://datasource-example-251018/example/y=2025/m=10/example.csv'); + """ + result = con.execute(query).df() + logger.info(f"Read {len(result)} rows from GCS file.") + + +def example_duckdb_cloud_class(): + logger.info("Starting example_duckdb_cloud_class function.") + # DuckDB接続 + provider = DuckDBProvider() + provider.setup_gcs( + access_key=os.getenv('GCP_STORAGE_HMAC_ACCESS_KEY'), + secret_key=os.getenv('GCP_STORAGE_HMAC_SECRET_KEY'), + ) + bucket_name = "datasource-example-251018" + object_name = "example/y=2025/m=*/example.csv" + + query = f""" + SELECT * FROM {provider.get_gs_csv_name(bucket_name, object_name)}; + """ + result = provider.query_df(query) + logger.info(f"Read {len(result)} rows from GCS file using DuckDBProvider.") + + +# example_init_google_cloud_storage() +# example_duckdb_cloud_raw() +example_duckdb_cloud_class() \ No newline at end of file diff --git a/example/example_duck_model.py b/example/example_duck_model.py new file mode 100644 index 0000000..5a86c80 --- /dev/null +++ b/example/example_duck_model.py @@ -0,0 +1,30 @@ +import sys +import os +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src"))) + +from typing import ClassVar, Optional + +from lib.custom_logger import get_logger +logger = get_logger(level=10) +from dataclasses import dataclass + +from models.duck_base_model import DuckBaseModel +from providers.duck_db_provider import DuckDBProvider + +@dataclass +class NewsModel(DuckBaseModel): + title: str + 
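+    # ClassVar settings consumed by DuckBaseModel:
+    #   file_glob        : glob of CSV files loaded via DuckDB's read_csv_auto
+    #   default_order_by : ORDER BY applied when no explicit order_by is passed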
file_glob: ClassVar[str] = "./data_science/data/y=2025/news/news_*.csv" + default_order_by: ClassVar[Optional[str]] = "pub_date DESC" + + +def example_duckdb_model(): + logger.info("Starting example_duckdb_model function.") + provider = DuckDBProvider() + provider.connect() + NewsModel.provider = provider + + news = NewsModel.first() + logger.info(f"Total news: {news if news else 'No news found'}") + +# example_duckdb_model() \ No newline at end of file diff --git a/example/example_rss.py b/example/example_rss.py new file mode 100644 index 0000000..06a7c6f --- /dev/null +++ b/example/example_rss.py @@ -0,0 +1,47 @@ +# openai_rss_reader_stdlib.py +# pip install feedparser + +import feedparser +from email.utils import parsedate_to_datetime + +FEEDS = [ + "https://platform.openai.com/docs/release-notes.rss", # リリースノート + "https://openai.com/blog/rss.xml", # ブログ +] + +def to_ts(entry: feedparser.FeedParserDict): + # published_parsed / updated_parsed があれば直接datetime化 + if entry.get("published_parsed"): + return parsedate_to_datetime(entry.published) + if entry.get("updated_parsed"): + return parsedate_to_datetime(entry.updated) + return None + +def uniq_key(entry: feedparser.FeedParserDict): + return entry.get("id") or entry.get("guid") or entry.get("link") + +def fetch_all(feeds): + items = [] + seen = set() + for url in feeds: + d: feedparser.FeedParserDict = feedparser.parse(url) + for e in d.entries: + k = uniq_key(e) + if not k or k in seen: + continue + seen.add(k) + ts = to_ts(e) + items.append({ + "title": e.get("title", "(no title)"), + "link": e.get("link"), + "published": e.get("published") or e.get("updated"), + "ts": ts, + "source": url, + }) + # ts が None のものは末尾に回す + items.sort(key=lambda x: (x["ts"] is not None, x["ts"]), reverse=True) + return items + +if __name__ == "__main__": + for i, it in enumerate(fetch_all(FEEDS), 1): + print(f"{i:02d}. {it['title']}\n {it['link']}\n {it['published']} [{it['source']}]\n") diff --git a/requirements.txt b/requirements.txt index 7009d5e..b65db1e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,13 +2,26 @@ matplotlib requests pyttsx3 - +# pylib +pandas # firebase_provider firebase-admin>=7.1.0 # google cloud storage google-cloud-storage +# google cloud big query +google-cloud-bigquery +# google cloud big query option +google-cloud-bigquery-storage +# bigquery to dataframe +db-dtypes # onedrive msal # common python-dotenv +# RSS +feedparser +# model +pydantic +# duckdb +duckdb==1.3.2 \ No newline at end of file diff --git a/src/lib/custom_logger.py b/src/lib/custom_logger.py index bde8c74..d80b104 100644 --- a/src/lib/custom_logger.py +++ b/src/lib/custom_logger.py @@ -28,7 +28,6 @@ class CustomLogger(Singleton): def __init__(self, name='main', log_file=None, level=logging.INFO): if hasattr(self, '_initialized') and self._initialized: - self.logger.setLevel(level) return # すでに初期化済みなら何もしない self.logger = logging.getLogger(name) diff --git a/src/lib/singleton.py b/src/lib/singleton.py index cf99cb7..c41efb6 100644 --- a/src/lib/singleton.py +++ b/src/lib/singleton.py @@ -2,7 +2,7 @@ This implementation is thread-safe and ensures that only one instance of the class is created. 
Singleton が提供するのは「同じインスタンスを返す仕組み」 -* __init__() は毎回呼ばれる(多くの人が意図しない動作) +* __init__() は毎回呼ばれる(多くの人が意図しない動作) * __init__の2回目は_initialized というフラグは 使う側で管理する必要がある。 """ diff --git a/src/lib/time_schedule.py b/src/lib/time_schedule.py index 749a621..25134b9 100644 --- a/src/lib/time_schedule.py +++ b/src/lib/time_schedule.py @@ -14,7 +14,7 @@ class TaskScheduler: # インスタンスを作成し、スレッドを起動 scheduler = TaskScheduler() -# インスタンスを破棄(しかしスレッドは続行) +# インスタンスを破棄(しかしスレッドは続行) # del scheduler # メインスレッドは他の作業を続ける diff --git a/src/models/bigquery_base_model.py b/src/models/bigquery_base_model.py new file mode 100644 index 0000000..8d027e4 --- /dev/null +++ b/src/models/bigquery_base_model.py @@ -0,0 +1,510 @@ +import os +import base64 +from typing import Any, Dict, Optional,Type,TypeVar,Tuple,Sequence,Union,List,Iterable,ClassVar +from enum import Enum +from decimal import Decimal + + +from datetime import datetime, date, time, timezone +from dataclasses import dataclass, fields, is_dataclass + +import pandas as pd +from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider + +from lib.custom_logger import get_logger +logger = get_logger() + +T = TypeVar("T", bound="BigQueryBaseModel") + +Filter = Tuple[str, str, Any] + +def _build_where_clause_bq_without_prams( + where: Optional[Union[str, Dict[str, Any], Sequence[Tuple[str, str, Any]]]] +)->str: + """ + BigQuery 向け WHERE 句の生成(パラメータなし) + Returns: + where_sql + """ + if where is None: + return "" + + clauses = [] + + if isinstance(where, str): + # 文字列のまま(完全SQL指定) + return f"WHERE {where}" + + if isinstance(where, dict): + for col, val in where.items(): + if isinstance(val, (list, tuple, set)): + placeholders = ", ".join([f"'{str(v).replace('\'', '\\\'')}'" for v in val]) + clauses.append(f"{col} IN ({placeholders})") + elif val is None: + clauses.append(f"{col} IS NULL") + else: + clauses.append(f"{col} = '{str(val).replace('\'', '\\\'')}'") + else: + # sequence of (col, op, val) + for col, op, val in where: + if (op.upper() == "IN") and isinstance(val, (list, tuple, set)): + placeholders = ", ".join([f"'{str(v).replace('\'', '\\\'')}'" for v in val]) + clauses.append(f"{col} IN ({placeholders})") + elif val is None and op in ("=", "=="): + clauses.append(f"{col} IS NULL") + else: + clauses.append(f"{col} {op} '{str(val).replace('\'', '\\\'')}'") + + where_sql = "WHERE " + " AND ".join(clauses) + return where_sql + +def _build_where_clause_bq( + where: Optional[Union[str, Dict[str, Any], Sequence[Tuple[str, str, Any]]]] +) -> Tuple[str, List[Dict[str, Any]]]: + """ + BigQuery 向け WHERE 句とパラメータの生成 + Returns: + (where_sql, query_params) + """ + if where is None: + return "", [] + + params = [] + clauses = [] + + if isinstance(where, str): + # 文字列のまま(完全SQL指定) + return f"WHERE {where}", params + + if isinstance(where, dict): + for i, (col, val) in enumerate(where.items()): + param_name = f"param{i}" + if isinstance(val, (list, tuple, set)): + placeholders = [] + for j, v in enumerate(val): + pname = f"{param_name}_{j}" + placeholders.append(f"@{pname}") + params.append({"name": pname, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(v)}}) + clauses.append(f"{col} IN ({', '.join(placeholders)})") + elif val is None: + clauses.append(f"{col} IS NULL") + else: + clauses.append(f"{col} = @{param_name}") + params.append({"name": param_name, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(val)}}) + else: + # sequence of (col, op, val) + for i, (col, op, val) in enumerate(where): + param_name = f"param{i}" + if 
(op.upper() == "IN") and isinstance(val, (list, tuple, set)): + placeholders = [] + for j, v in enumerate(val): + pname = f"{param_name}_{j}" + placeholders.append(f"@{pname}") + params.append({"name": pname, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(v)}}) + clauses.append(f"{col} IN ({', '.join(placeholders)})") + elif val is None and op in ("=", "=="): + clauses.append(f"{col} IS NULL") + else: + clauses.append(f"{col} {op} @{param_name}") + params.append({"name": param_name, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(val)}}) + + where_sql = "WHERE " + " AND ".join(clauses) + return where_sql, params + + +def to_bq_json(v): + if isinstance(v, datetime): + # TIMESTAMP 用(UTCに正規化して Z を付ける) + if v.tzinfo is None: + v = v.replace(tzinfo=timezone.utc) + return v.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") + if isinstance(v, date) and not isinstance(v, datetime): + # DATE 用 + return v.isoformat() + if isinstance(v, time): + # TIME 用 + # v.isoformat() は microseconds を含むことあり → そのままでOK + return v.isoformat() + if isinstance(v, Decimal): + # NUMERIC/BIGNUMERIC に文字列で渡すのが安全 + return str(v) + if isinstance(v, bytes): + # BYTES は base64 文字列 + return base64.b64encode(v).decode("ascii") + if isinstance(v, Enum): + return v.value + if isinstance(v, list): + return [to_bq_json(x) for x in v] + if isinstance(v, dict): + return {k: to_bq_json(val) for k, val in v.items()} + return v + +@dataclass +class BigQueryBaseModel: + + table_id: ClassVar[str] # 例: "datasetid.table_name" + provider: ClassVar[GoogleCloudBigQueryProvider | None] = None + key_field: ClassVar[Optional[str]] = None # 例: "id" 主キーに相当するフィールド名 + columns_select: ClassVar[str] = "*" # 例: "col1, col2, col3" + default_order_by: ClassVar[Optional[str]] = None + + @classmethod + def set_provider(cls, provider: GoogleCloudBigQueryProvider): + cls.provider = provider + + @classmethod + def fetch_all(cls: Type[T]) -> List[T]: + """テーブル内の全レコードを取得する""" + if cls.provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().") + + rows = cls.provider.list_rows(table_id=cls.table_id) + result: List[T] = [] + for row in rows: + data = dict(row) + obj: T = cls(**data) # type: ignore[arg-type] + result.append(obj) + return result + + def create(self): + """レコードをテーブルに挿入する""" + if self.provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().") + + data = {f.name: getattr(self, f.name) for f in fields(self)} + data = to_bq_json(data) + + logger.debug(f"Inserting data: {data}") + self.provider.insert_rows( + table_id=self.table_id, + rows=[data] + ) + + @classmethod + def insert(cls, rows: List[T]): + """複数レコードをテーブルに挿入する""" + if cls.provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().") + + bq_rows = [] + for row in rows: + data = {f.name: getattr(row, f.name) for f in fields(row)} + bq_rows.append(to_bq_json(data)) + + logger.info(f"Inserting data: {len(bq_rows)}") + logger.debug(f"Inserting data details: {bq_rows[:10]}...") + cls.provider.insert_rows( + table_id=cls.table_id, + rows=bq_rows + ) + + @classmethod + def is_streaming_buffer(cls) -> bool: + """テーブルのストリームバッファ情報を取得する""" + if cls.provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. 
Call .set_provider().") + + buf_info = cls.provider.get_streaming_buffer_info(table_id=cls.table_id) + logger.debug(f"Streaming buffer info: {buf_info}") + + return buf_info is not None + + def save(self): + """レコードをテーブルに更新する""" + if self.provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().") + + if self.is_streaming_buffer(): + raise RuntimeError("Cannot update records in streaming buffer.") + + if self.key_field is None: + raise RuntimeError("key_field is not set.") + + key_value = getattr(self, self.key_field) + if key_value is None: + raise ValueError(f"Key field '{self.key_field}' value is None.") + + data = {f.name: getattr(self, f.name) for f in fields(self)} + data = to_bq_json(data) + + set_clauses = [] + for k, v in data.items(): + if k == self.key_field: + continue + if isinstance(v, str): + v_str = f"'{v.replace('\'', '\\\'')}'" + else: + v_str = str(v) + set_clauses.append(f"{k}={v_str}") + set_clause = ", ".join(set_clauses) + + where_clause = f"{self.key_field}='{str(key_value).replace('\'', '\\\'')}'" + + logger.debug(f"Updating data: SET {set_clause} WHERE {where_clause}") + self.provider.update_query( + table_id=self.table_id, + values=set_clause, + were_clause=where_clause + ) + + def destroy(self): + """レコードをテーブルから削除する""" + if self.provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().") + + if self.is_streaming_buffer(): + raise RuntimeError("Cannot delete records in streaming buffer.") + + if self.key_field is None: + raise RuntimeError("key_field is not set.") + + key_value = getattr(self, self.key_field) + if key_value is None: + raise ValueError(f"Key field '{self.key_field}' value is None.") + + where_clause = f"{self.key_field}='{str(key_value).replace('\'', '\\\'')}'" + + logger.debug(f"Deleting data: WHERE {where_clause}") + self.provider.delete_query( + table_id=self.table_id, + were_clause=where_clause + ) + + # ------------------------------------- + # select + @classmethod + def select( + cls: Type[T], + provider: GoogleCloudBigQueryProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + order_by: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + ): + params_dict = [] + where_sql, params_dict = _build_where_clause_bq(where) + # where_sql = _build_where_clause_bq_without_prams(where) + order = order_by or cls.default_order_by + order_sql = f"ORDER BY {order}" if order else "" + limit_sql = f"LIMIT {int(limit)}" if limit is not None else "" + offset_sql = f"OFFSET {int(offset)}" if offset is not None else "" + sql = f""" + SELECT {cls.columns_select} + FROM `{cls.table_id}` + {where_sql} + {order_sql} + {limit_sql} + {offset_sql} + """ + # logger.debug(f"Select Query: {sql} with params_dict: {params_dict}") + if provider is None: + provider = cls.provider + if provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. 
Call .set_provider().") + + params = [] + for p in params_dict: + param = provider.addQueryParameter( + name=p["name"], + value=p["parameterValue"]["value"], + param_type=p["parameterType"]["type"] + ) + params.append(param) + # パラメータバインド:GoogleCloudBigQueryProvider の生 client を使用 + # query_job = provider.excute_query(sql, params=params) + query_job = provider.excute_query(sql, params=params) + return query_job + + + @classmethod + def select_df( + cls: Type[T], + provider: GoogleCloudBigQueryProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + order_by: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + ) -> pd.DataFrame: + """DataFrame で取得 + + Notes: + Google Cloud BigQuery から直接 DataFrame を取得するには db-dtypes パッケージが必要 + """ + query_job = cls.select( + provider=provider, + where=where, + order_by=order_by, + limit=limit, + offset=offset, + ) + return query_job.to_dataframe() + + @classmethod + def list( + cls: Type[T], + provider: GoogleCloudBigQueryProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + order_by: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + ) -> List[T]: + job = cls.select(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset) + results = job.result() + instances: List[T] = [] + for row in results: + data = dict(row) + instance = cls(**data) # type: ignore + instances.append(instance) + + # df = cls.select_df(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset) + # if not is_dataclass(cls): + # raise TypeError(f"{cls.__name__} is not a dataclass.") + # instances: List[T] = [] + # for _, row in df.iterrows(): + # data = {f.name: row[f.name] for f in fields(cls) if f.name in row} + # instance = cls(**data) # type: ignore + # instances.append(instance) + return instances + + @classmethod + def first( + cls: Type[T], + provider: GoogleCloudBigQueryProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + order_by: Optional[str] = None, + ) -> Optional[T]: + results = cls.list(provider=provider, where=where, order_by=order_by, limit=1) + return results[0] if results else None + + @classmethod + def count( + cls: Type[T], + provider: GoogleCloudBigQueryProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + ) -> int: + where_sql, params_dict = _build_where_clause_bq(where) + sql = f""" + SELECT COUNT(*) as cnt + FROM `{cls.table_id}` + {where_sql} + """ + # パラメータバインド:GoogleCloudBigQueryProvider の生 client を使用 + if provider is None: + provider = cls.provider + if provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. 
Call .set_provider().") + + params = [] + for p in params_dict: + param = provider.addQueryParameter( + name=p["name"], + value=p["parameterValue"]["value"], + param_type=p["parameterType"]["type"] + ) + params.append(param) + + provider.excute_query( + sql, + params=params + ) + + query_job = provider.excute_query(sql, params=params) + results = query_job.result() + for row in results: + return row["cnt"] + return 0 + + @classmethod + def exists( + cls: Type[T], + provider: GoogleCloudBigQueryProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + ) -> bool: + if provider is None: + provider = cls.provider + return cls.count(provider, where=where) > 0 + + @classmethod + def delete( + cls: Type[T], + provider: GoogleCloudBigQueryProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + ): + where_sql, params_dict = _build_where_clause_bq(where) + sql = f""" + DELETE FROM `{cls.table_id}` + {where_sql} + """ + if provider is None: + provider = cls.provider + if provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().") + + params = [] + for p in params_dict: + param = provider.addQueryParameter( + name=p["name"], + value=p["parameterValue"]["value"], + param_type=p["parameterType"]["type"] + ) + params.append(param) + + job = provider.excute_query( + sql, + params=params + ) + job.result() # ジョブの完了を待機 + + @classmethod + def update( + cls: Type[T], + values: dict[str, Any], + *, + where: Union[str, Dict[str, Any], Sequence[Filter]] , + provider: GoogleCloudBigQueryProvider = None, + ): + params_dicts = [] + where_sql, params_dicts = _build_where_clause_bq(where) + set_clauses = [] + for k, v in values.items(): + param_name = f"set_{k}" + set_clauses.append(f"{k}=@{param_name}") + params_dicts.append({ + "name": param_name, + "parameterType": {"type": "STRING"}, + "parameterValue": {"value": str(v)} + }) + + sql = f""" + UPDATE `{cls.table_id}` + SET {', '.join(set_clauses)} + {where_sql} + """ + logger.debug(f"update Query: {sql} with params_dict: {params_dicts}") + + if provider is None: + provider = cls.provider + if provider is None: + raise RuntimeError("GoogleCloudBigQueryProvider not set. 
Call .set_provider().") + + params = [] + for p in params_dicts: + param = provider.addQueryParameter( + name=p["name"], + value=p["parameterValue"]["value"], + param_type=p["parameterType"]["type"] + ) + params.append(param) + + job = provider.excute_query( + sql, + params=params + ) + job.result() # ジョブの完了を待機 \ No newline at end of file diff --git a/src/models/duck_base_model.py b/src/models/duck_base_model.py new file mode 100644 index 0000000..960d864 --- /dev/null +++ b/src/models/duck_base_model.py @@ -0,0 +1,184 @@ +import os +from typing import Any, Dict, Optional,Type,TypeVar,Tuple,Sequence,Union,List,Iterable,ClassVar +from dataclasses import dataclass, fields, is_dataclass + +import pandas as pd +from providers.duck_db_provider import DuckDBProvider + +DATA_BASEPATH=os.getenv("DATA_BASEPATH","./data") +from lib.custom_logger import get_logger +logger = get_logger() + +T = TypeVar("T", bound="DuckBaseModel") + +Filter = Tuple[str, str, Any] + +def _build_where_clause(where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] + ) -> Tuple[str, List[Any]]: + if where is None: + return "", [] + params: List[Any] = [] + + if isinstance(where, str): + return f"WHERE {where}", params + + clauses: List[str] = [] + if isinstance(where, dict): + for col, val in where.items(): + if isinstance(val, (list, tuple, set)): + placeholders = ", ".join(["?"] * len(val)) + clauses.append(f"{col} IN ({placeholders})") + params.extend(list(val)) + elif val is None: + clauses.append(f"{col} IS NULL") + else: + clauses.append(f"{col} = ?") + params.append(val) + else: + # sequence of (col, op, value) + for col, op, val in where: + if (op.upper() == "IN") and isinstance(val, (list, tuple, set)): + placeholders = ", ".join(["?"] * len(val)) + clauses.append(f"{col} IN ({placeholders})") + params.extend(list(val)) + elif val is None and op in ("=", "=="): + clauses.append(f"{col} IS NULL") + else: + clauses.append(f"{col} {op} ?") + params.append(val) + + if not clauses: + return "", [] + return "WHERE " + " AND ".join(clauses), params + + +@dataclass +class DuckBaseModel: + file_glob: ClassVar[str] # 例: "./data_science/data/y=2025/news/news_*.csv" + provider: ClassVar[DuckDBProvider | None] = None + columns_select: ClassVar[str] = "*" # 例: "published_parsed, title, link" + default_order_by: ClassVar[Optional[str]] = None + + # read_csv_auto オプション(必要ならサブクラスで上書き) + hive_partitioning: ClassVar[bool] = True + union_by_name: ClassVar[bool] = True + header: ClassVar[bool] = True + ignore_errors: ClassVar[bool] = True + + + @classmethod + def set_provider(cls, provider: DuckDBProvider): + cls.provider = provider + + + + + # ---- FROM 句(CSV読込) ---- + @classmethod + def _from_clause(cls,file_glob=None) -> str: + path = file_glob or cls.file_glob + path = path.replace("'", "''") + path =f"{DATA_BASEPATH}{path}" + + # path = cls.file_glob.replace("'", "''") + hp = 1 if cls.hive_partitioning else 0 + un = 1 if cls.union_by_name else 0 + hd = "TRUE" if cls.header else "FALSE" + ig = "TRUE" if cls.ignore_errors else "FALSE" + return (f"read_csv_auto('{path}', HEADER={hd}, IGNORE_ERRORS={ig}, " + f"hive_partitioning={hp}, union_by_name={un})") + + @classmethod + def select_df( + cls: Type[T], + provider: DuckDBProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + order_by: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + file_glob=None, + ) -> List[T]: + where_sql, params = _build_where_clause(where) + order = order_by or 
cls.default_order_by + order_sql = f"ORDER BY {order}" if order else "" + limit_sql = f"LIMIT {int(limit)}" if limit is not None else "" + offset_sql = f"OFFSET {int(offset)}" if offset is not None else "" + sql = f""" + SELECT {cls.columns_select} + FROM {cls._from_clause(file_glob=file_glob)} + {where_sql} + {order_sql} + {limit_sql} + {offset_sql} + """ + # パラメータバインド:DuckDBProvider の生 connection を使用 + if provider is None: + provider = cls.provider + + cur = provider.con.execute(sql, params) + return cur.df() + + @classmethod + def list( + cls: Type[T], + provider: DuckDBProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + order_by: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + file_glob=None, + ) -> List[T]: + df = cls.select_df(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset,file_glob=file_glob) + if not is_dataclass(cls): + raise TypeError(f"{cls.__name__} is not a dataclass.") + instances: List[T] = [] + for _, row in df.iterrows(): + data = {f.name: row[f.name] for f in fields(cls) if f.name in row} + instance = cls(**data) # type: ignore + instances.append(instance) + return instances + + @classmethod + def first( + cls: Type[T], + provider: DuckDBProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + order_by: Optional[str] = None, + ) -> Optional[T]: + results = cls.list(provider=provider, where=where, order_by=order_by, limit=1) + return results[0] if results else None + + @classmethod + def count( + cls: Type[T], + provider: DuckDBProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + ) -> int: + where_sql, params = _build_where_clause(where) + sql = f""" + SELECT COUNT(*) AS cnt + FROM {cls._from_clause()} + {where_sql} + """ + # パラメータバインド:DuckDBProvider の生 connection を使用 + if provider is None: + provider = cls.provider + + cur = provider.con.execute(sql, params) + row = cur.fetchone() + return row["cnt"] if row else 0 + + @classmethod + def exists( + cls: Type[T], + provider: DuckDBProvider = None, + *, + where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None, + ) -> bool: + if provider is None: + provider = cls.provider + return cls.count(provider, where=where) > 0 \ No newline at end of file diff --git a/src/models/firestore_base_model.py b/src/models/firestore_base_model.py index 2415637..5782efc 100644 --- a/src/models/firestore_base_model.py +++ b/src/models/firestore_base_model.py @@ -45,7 +45,7 @@ class FirestoreBaseModel: return cls._col().document(doc_id) def _as_mutable_dict(self) -> Dict[str, Any]: - """dataclass → dict(id と None は必要に応じて処理)""" + """dataclass → dict(id と None は必要に応じて処理)""" data = asdict(self) data.pop("_doc_id", None) data.pop("collection_name", None) @@ -54,7 +54,7 @@ class FirestoreBaseModel: @classmethod def _from_dict(cls: Type[T], data: Dict[str, Any]) -> T: - """dict -> モデル(data内の'id'を拾う)""" + """dict -> モデル(data内の'id'を拾う)""" data = dict(data) _id = data.pop("id", None) obj: T = cls(**data) # type: ignore[arg-type] @@ -88,7 +88,7 @@ class FirestoreBaseModel: return self._doc_id def update_fields(self, changes: Dict[str, Any], auto_timestamp: bool = True) -> None: - """部分更新(set(merge=True)の糖衣)""" + """部分更新(set(merge=True)の糖衣)""" if not self._doc_id: raise ValueError("Cannot update without id.") payload = dict(changes) @@ -205,7 +205,7 @@ class FirestoreBaseModel: ) -> int: """ 条件でヒットしたドキュメントを一括削除。削除件数を返す。 - ※ list_documents 
に依存(“複合インデックスなし”ポリシーはそちらの制約に従う) + ※ list_documents に依存(“複合インデックスなし”ポリシーはそちらの制約に従う) """ from providers.firestore_provider import FireStoreProvider rows = FireStoreProvider.list_documents( diff --git a/src/providers/duck_db_provider.py b/src/providers/duck_db_provider.py new file mode 100644 index 0000000..755f2e1 --- /dev/null +++ b/src/providers/duck_db_provider.py @@ -0,0 +1,73 @@ +import duckdb + + +class DuckDBProvider: + + def __init__(self, db_path: str = ":memory:", read_only: bool = False): + self.con = self.connect(db_path, read_only) + + def connect(self, db_path: str = ":memory:", read_only: bool = False): + return duckdb.connect(database=db_path, read_only=read_only) + + def setup_gcs(self, access_key: str,secret_key: str): + """GCSのシークレットを設定する""" + if not self.con: + raise ValueError("DuckDB is not connected.") + self.con.sql(f""" + CREATE OR REPLACE SECRET gcs_creds ( + TYPE gcs, + KEY_ID '{access_key}', + SECRET '{secret_key}' + ); + """) + + + def close(self): + """接続を閉じる""" + if self.con: + self.con.close() + + def query_df(self, sql: str): + """SQLクエリを実行してDataFrameで返す""" + return self.con.execute(sql).df() + + def max_value( + self, + file_path: str, + column: str, + hive_partitioning: bool = True, + union_by_name: bool = True, + ) -> any: + """CSVファイルの指定列の最大値を取得する""" + query = f""" + SELECT MAX({column}) AS max_{column} + FROM read_csv_auto('{file_path}', + hive_partitioning={1 if hive_partitioning else 0}, + union_by_name={1 if union_by_name else 0} + ) + """ + result = self.con.execute(query).fetchone()[0] + return result + + def load_file(self, file_glob: str, table: str): + """CSVを読み込みテーブル化""" + sql = f""" + CREATE OR REPLACE TABLE {table} AS + SELECT * + FROM read_csv_auto('{file_glob}', HEADER=TRUE, IGNORE_ERRORS=TRUE) + """ + self.con.execute(sql) + + @staticmethod + def get_gs_csv_name( + backet: str, + object_name: str, + hive_partitioning: bool = True, + union_by_name: bool = True, + ) -> str: + + return f"""read_csv_auto('gs://{backet}/{object_name}', + hive_partitioning={1 if hive_partitioning else 0}, + union_by_name={1 if union_by_name else 0} + ) + """ \ No newline at end of file diff --git a/src/providers/firestore_provider.py b/src/providers/firestore_provider.py index b9e3883..35fcbbd 100644 --- a/src/providers/firestore_provider.py +++ b/src/providers/firestore_provider.py @@ -78,12 +78,12 @@ class FireStoreProvider(): start_after: Optional[Union[google.cloud.firestore.DocumentSnapshot, Dict[str, Any]]] = None, ) -> List[Dict[str, Any]]: """ - コレクションのドキュメントを取得(フィルタ/並び替え/制限/ページング対応) + コレクションのドキュメントを取得(フィルタ/並び替え/制限/ページング対応) Args: collection_name: コレクション名 filters: [(field, op, value), ...] 
- order_by: 並び順のフィールド名 or その配列('-created_at' のように先頭'-'で降順) + order_by: 並び順のフィールド名 or その配列('-created_at' のように先頭'-'で降順) limit: 取得件数の上限 start_after: ドキュメントスナップショット、または order_by で並べた最後の値の辞書 @@ -189,13 +189,13 @@ class FireStoreProvider(): merge: bool = True, ) -> None: """ - ドキュメントを更新。merge=True なら部分更新(推奨)。 + ドキュメントを更新。merge=True なら部分更新(推奨)。 """ ref = cls.get_doc_ref(collection_name, doc_id) if merge: ref.set(data, merge=True) else: - # 全置換(存在しないと作成される) + # 全置換(存在しないと作成される) ref.set(data, merge=False) logger.info(f"Updated document: {collection_name}/{doc_id} (merge={merge})") diff --git a/src/providers/google_cloud_bigquery_provider.py b/src/providers/google_cloud_bigquery_provider.py new file mode 100644 index 0000000..0357cfe --- /dev/null +++ b/src/providers/google_cloud_bigquery_provider.py @@ -0,0 +1,662 @@ +import os +import time +from datetime import datetime +from typing import Optional, Dict, Union,Sequence +from enum import Enum + +from google.cloud import bigquery +from google.cloud.bigquery.table import RowIterator, TableListItem +from google.cloud.bigquery.dataset import DatasetListItem +from google.cloud.bigquery.table import StreamingBuffer + +from google.oauth2 import service_account + +from lib.custom_logger import get_logger +logger = get_logger() + +class BigQueryFieldType(str,Enum): + STRING = "STRING" + BYTES = "BYTES" + INTEGER = "INTEGER" + FLOAT = "FLOAT" + BOOLEAN = "BOOLEAN" + TIMESTAMP = "TIMESTAMP" + DATE = "DATE" + TIME = "TIME" + DATETIME = "DATETIME" + RECORD = "RECORD" + NUMERIC = "NUMERIC" + BIGNUMERIC = "BIGNUMERIC" + GEOGRAPHY = "GEOGRAPHY" + JSON = "JSON" + +class BogQueryFieldMode(str,Enum): + NULLABLE = "NULLABLE" # NULL許可 + REQUIRED = "REQUIRED" # 必須 + REPEATED = "REPEATED" # 配列 + + +class GoogleCloudBigQueryProvider: + """Cloud BigQuery操作プロバイダ + + Notes: + - このクラスはGoogle Cloud BigQueryの基本的な操作を提供します。 + - 認証にはサービスアカウントキーまたはApplication Default Credentials(ADC)を使用します。 + - BigQuery Storage Read APIを使用するなら別のライブラリを使用してください(高速読み込み) + - 必要な権限 + - データセット操作を行う場合はBigQuery データオーナーの権限が必要です。 + - bigquery.dataViewer / bigquery.dataEditor / bigquery.readSessionUserなど部分でも可能 + - JOB操作を行う場合はBigQuery ジョブユーザーの権限が必要です。 + - フル機能を使う場合はBigQuery データオーナー及びBigQuery ジョブユーザーの権限が必要です。 + """ + + def __init__(self, cred_path: Optional[str] = None, project: Optional[str] = None): + try: + if cred_path: + creds = service_account.Credentials.from_service_account_file(cred_path) + # プロジェクト未指定なら credentials から取得 + effective_project = project or creds.project_id + self._client = bigquery.Client( + project=effective_project, credentials=creds + ) + logger.info(f"Bigquery client initialized with service account file. 
project={effective_project}") + elif os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"): + cred_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON") + creds = service_account.Credentials.from_service_account_info(cred_json) + effective_project = project or creds.project_id + self._client = bigquery.Client( + project=effective_project, credentials=creds + ) + logger.info("Bigquery client initialized with credentials from environment variable.") + else: + self._client = bigquery.Client(project=project) + logger.info("Bigquery client initialized with default credentials (ADC).") + except Exception as e: + logger.error(f"Bigquery initialization failed: {e}") + raise + + def get_datasets(self)-> list[DatasetListItem]: + """データセット一覧を取得する""" + try: + datasets = list(self._client.list_datasets()) + return datasets + except Exception as e: + logger.error(f"Failed to list datasets: {e}") + raise + + def create_dataset( + self, dataset_id: str, + location: Optional[str] ="asia-northeast1") -> bigquery.Dataset: + """ + データセットを作成する + + Args: + dataset_id (str): 作成するデータセットID + location (Optional[str]): データセットのロケーション + Returns: + bigquery.Dataset: 作成されたデータセットオブジェクト + Notes: + - 既に存在するデータセットIDを指定した場合、例外が発生します。 + - 必要な権限 + - bigquery.datasets.create + - 命名規則 + - 英小文字 (a-z)、数字 (0-9)、およびアンダースコア (_) のみ使用可能 + """ + try: + dataset_ref = self._client.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + if location: + dataset.location = location + dataset = self._client.create_dataset(dataset) + logger.info(f"Dataset {dataset_id} created.") + return dataset + except Exception as e: + logger.error(f"Failed to create dataset {dataset_id}: {e}") + raise + + def is_exists_dataset(self, dataset_id: str) -> bool: + """データセットが存在するか確認する""" + try: + self._client.get_dataset(dataset_id) + return True + except Exception: + return False + + def delete_dataset(self, dataset_id: str, delete_contents: bool = False): + """ + データセットを削除する + + Args: + dataset_id (str): 削除するデータセットID + delete_contents (bool): データセット内のテーブルも削除する場合はTrue + Notes: + - 必要な権限 + - bigquery.datasets.delete + - delete_contents=Falseの場合、データセット内にテーブルが存在すると削除に失敗します。 + - その場合はdelete_contents=Trueを指定してください。 + - `google.api_core.exceptions.Conflict`が発生します。 + """ + try: + self._client.delete_dataset(dataset_id, delete_contents=delete_contents) + logger.info(f"Dataset {dataset_id} deleted.") + except Exception as e: + logger.error(f"Failed to delete dataset {dataset_id}: {e}") + raise + + + def full_table_id(self, dataset_id: str, table_name: str) -> str: + """テーブルIDを構築する""" + project = self._client.project + return f"{project}.{dataset_id}.{table_name}" + + def is_exists_table(self,table_id: str) -> bool: + """テーブルが存在するか確認する""" + try: + self._client.get_table(table_id) + return True + except Exception: + return False + + @staticmethod + def addSchemaField( + name:str, + field_type:Union[str, BigQueryFieldType], + field_mode:Union[str, BogQueryFieldMode]=BogQueryFieldMode.NULLABLE , + default_value_expression:Optional[str]=None, + description:Optional[str]=None + )-> bigquery.SchemaField: + if isinstance(field_type, BigQueryFieldType): + field_type = field_type.value + elif isinstance(field_type, str): + field_type = field_type.upper() + + if isinstance(field_mode, BogQueryFieldMode): + field_mode = field_mode.value + elif isinstance(field_mode, str): + field_mode = field_mode.upper() + + return bigquery.SchemaField( + name=name, + field_type=field_type, + mode=field_mode , + default_value_expression=default_value_expression, + description=description) + + 
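+    # Usage sketch combining addSchemaField with create_table below
+    # (dataset and table names here are placeholders):
+    #   schema = [
+    #       GoogleCloudBigQueryProvider.addSchemaField("id", "STRING", "REQUIRED"),
+    #       GoogleCloudBigQueryProvider.addSchemaField("created_at", "TIMESTAMP"),
+    #   ]
+    #   provider.create_table(provider.full_table_id("my_dataset", "my_table"), schema)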
def create_table( + self, + table_id: str, + schema: list[bigquery.SchemaField]) -> bigquery.Table: + """ + テーブルを作成する + + Args: + table_id (str): 作成するテーブルID(例: 'dataset.table') + schema (list[bigquery.SchemaField]): テーブルのスキーマ定義 + Returns: + bigquery.Table: 作成されたテーブルオブジェクト + Notes: + - 既に存在するテーブルIDを指定した場合、例外が発生します。 + """ + try: + table = bigquery.Table(table_id, schema=schema) + table = self._client.create_table(table) + logger.info(f"Table {table_id} created.") + return table + except Exception as e: + logger.error(f"Failed to create table {table_id}: {e}") + raise + + + def delete_table(self, table_id: str, not_found_ok: bool = False): + """ + テーブルを削除する + + Args: + table_id (str): 削除するテーブルID(例: 'dataset.table') + not_found_ok (bool): テーブルが存在しない場合でもエラーにしない場合はTrue + """ + try: + self._client.delete_table(table_id, not_found_ok=not_found_ok) + logger.info(f"Table {table_id} deleted.") + except Exception as e: + logger.error(f"Failed to delete table {table_id}: {e}") + raise + + def get_tables(self, dataset_id: str) -> list[TableListItem]: + """ + データセット内のテーブル一覧を取得する + + Args: + dataset_id (str): 対象のデータセットID + Returns: + list[bigquery.TableListItem]: テーブル一覧 + """ + try: + tables = list(self._client.list_tables(dataset_id)) + logger.info(f"Tables listed successfully from dataset {dataset_id}, count {len(tables)}.") + return tables + except Exception as e: + logger.error(f"Failed to list tables in dataset {dataset_id}: {e}") + raise + + def insert_rows( + self, + table_id: str, + rows: list[Dict[str, any]]) -> Sequence[dict]: + """ + テーブルに行を挿入する + + Args: + table_id (str): 挿入先のテーブルID(例: 'dataset.table') + rows (list[Dict[str, any]]): 挿入する行データのリスト + Returns: + Sequence[dict]: 挿入結果のレスポンスオブジェクト + Notes: + - rowsは辞書形式で指定します。キーがカラム名、値がカラムの値となります。 + - 例: [{"column1": "value1", "column2": 123}, {"column1": "value2", "column2": 456}] + """ + try: + errors = self._client.insert_rows_json(table_id, rows) + if errors: + logger.error(f"Errors occurred while inserting rows into {table_id}: {errors}") + else: + logger.info(f"Rows inserted successfully into {table_id}.") + return errors + except Exception as e: + logger.error(f"Failed to insert rows into table {table_id}: {e}") + raise + + def list_rows( + self, + table_id: str, + selected_fields:Optional[list[str]] = None, + max_results:Optional[int] = None, + start_index:Optional[int] = None + ) -> RowIterator: + """ + テーブルの行を一覧取得する + Args: + table_id (str): 対象のテーブルID(例: 'dataset.table') + selected_fields (Optional[list[str]]): 取得するカラム名のリスト。Noneの場合は全カラム取得 + max_results (Optional[int]): 取得する行数の上限。Noneの場合は制限なし + start_index (Optional[int]): 取得開始インデックス。Noneの場合は先頭から取得 + Returns: + RowIterator: 取得した行のイテレータ + """ + try: + table = self._client.get_table(table_id) + _selected_fields:Optional[list[bigquery.SchemaField]] = None + if selected_fields: + _selected_fields = [field for field in table.schema if field.name in selected_fields] + + rows = self._client.list_rows( + table, + selected_fields=_selected_fields, + max_results=max_results, + start_index=start_index + ) + logger.info(f"Rows listed successfully from table {table_id}.") + return rows + except Exception as e: + logger.error(f"Failed to list rows from table {table_id}: {e}") + raise + + def list_rows_to_dataframe( + self, + table_id: str, + selected_fields:Optional[list[str]] = None, + max_results:Optional[int] = None, + start_index:Optional[int] = None + ): + try: + table = self._client.get_table(table_id) + df = self._client.list_rows( + table, + selected_fields=selected_fields, + 
max_results=max_results, + start_index=start_index + ).to_dataframe() + logger.info(f"Rows listed to DataFrame successfully from table {table_id}.") + return df + except Exception as e: + logger.error(f"Failed to list rows to DataFrame from table {table_id}: {e}") + raise + + # -------------------------------------------------------- + def get_streaming_buffer_info(self,table_id: str): + """スリーミングバッファを取得する + + Notes: + - スリーミングバッファは、ストリーミング挿入によってテーブルに追加されたデータが永続的なストレージに書き込まれる前に一時的に保存される場所です。 + - スリーミングバッファの情報が存在しない場合、Noneを返します + - ストリーミングバッファ内のデータを直接削除・更新・クリアすることは不可能です。 + - GCP側で完全に自動制御されており、手動でクリア/フラッシュ/削除する手段は 存在しません。 + - 通常:数秒〜数分 高頻度ストリーミング:〜30分 極端な負荷/多パーティション:最大約90分(GCP上限目安) + """ + t = self._client.get_table(table_id) + # SDK属性 or 下位プロパティのどちらかに入っています + buf:StreamingBuffer = getattr(t, "streaming_buffer", None) or t._properties.get("streamingBuffer") + if not buf: + return None + # 例: {'estimatedRows': '123', 'estimatedBytes': '456789', 'oldestEntryTime': '1698234000000'} + logger.debug(f"Streaming Buffer Raw Info: {buf}") + return { + "estimated_rows": buf.estimated_rows, + "estimated_bytes": buf.estimated_bytes, + "oldest_entry_ms": buf.oldest_entry_time, + "raw": buf, + } + + def excute_query( + self, + query: str, + params:list = None) -> bigquery.QueryJob: + """ + クエリを実行する + + Args: + query (str): 実行するSQLクエリ + Returns: + bigquery.QueryJob: 実行結果のQueryJobオブジェクト + """ + try: + if params and len(params) >0: + query_job = self._client.query(query, job_config=bigquery.QueryJobConfig( + query_parameters=params + )) + else: + query_job = self._client.query(query) + logger.info("Query executed successfully.") + logger.debug(f"Query JOB: {query_job}") + return query_job + except Exception as e: + logger.error(f"Failed to execute query: {e}") + raise + + + + @staticmethod + def addQueryParameter( + name:str, + value: any, + param_type:Union[str, BigQueryFieldType]=BigQueryFieldType.STRING + ) -> bigquery.ScalarQueryParameter: + """クエリパラメータを作成する""" + if isinstance(param_type, BigQueryFieldType): + param_type = param_type.value + elif isinstance(param_type, str): + param_type = param_type.upper() + + return bigquery.ScalarQueryParameter( + name=name, + type_=param_type, + value=value + ) + + def select_query( + self, + table_id: str, + columns: list[str] = None, + were_clause: str = None, + params: list = None, + order_by: str = None, + limit: int = None + ): + """ + 指定したテーブルからデータを選択するクエリを実行する + + Args: + table_id (str): 対象のテーブルID(例: 'dataset.table') + columns (list[str]): 取得するカラム名のリスト。Noneの場合は全カラム取得 + were_clause (str): WHERE句の条件式。Noneの場合は条件なし + order_by (str): 並び替えのカラム名。Noneの場合は並び替えなし + limit (int): 取得する行数の上限。Noneの場合は制限なし + """ + try: + cols = ", ".join(columns) if columns else "*" + query = f"SELECT {cols} FROM `{table_id}`" + if were_clause: + query += f" WHERE {were_clause}" + if order_by: + query += f" ORDER BY {order_by}" + if limit: + query += f" LIMIT {limit}" + query_job = self.excute_query(query, params=params) + logger.info("Select query executed successfully.") + + return query_job + except Exception as e: + logger.error(f"Failed to execute select query on table {table_id}: {e}") + raise + + def select_query_to_dataframe( + self, + table_id: str, + columns: list[str] = None, + were_clause: str = None, + params: list = None, + order_by: str = None, + limit: int = None + ): + try: + query_job= self.select_query( + table_id=table_id, + columns=columns, + were_clause=were_clause, + params=params, + order_by=order_by, + limit=limit + ) + df = query_job.to_dataframe() + 
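+            # NOTE: QueryJob.to_dataframe() requires the db-dtypes package (listed in requirements.txt)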
logger.info("Select query to DataFrame executed successfully.") + return df + except Exception as e: + logger.error(f"Failed to convert query result to DataFrame for table {table_id}: {e}") + raise + + def select_query_to_dict( + self, + table_id: str, + columns: list[str] = None, + were_clause: str = None, + params: list = None, + order_by: str = None, + limit: int = None + ) -> list[Dict[str, any]]: + try: + query_job= self.select_query( + table_id=table_id, + columns=columns, + were_clause=were_clause, + params=params, + order_by=order_by, + limit=limit + ) + results = query_job.result() + dicts = [dict(row) for row in results] + logger.info("Select query to dicts executed successfully.") + return dicts + except Exception as e: + logger.error(f"Failed to convert query result to dicts for table {table_id}: {e}") + raise + + def isert_query( + self, + table_id: str, + values: dict[str, str], + params: list = None + ) -> bigquery.QueryJob: + """ + 挿入クエリ(INSERT)を実行する + """ + try: + query = f"INSERT INTO `{table_id}` ({', '.join(values.keys())})" + query += f" VALUES ({', '.join([str(v) for v in values.values()])})" + query_job = self.excute_query(query, params=params) + logger.info("Insert query executed successfully.") + return query_job + except Exception as e: + logger.error(f"Failed to execute insert query on table {table_id}: {e}") + raise + + def update_query( + self, + table_id: str, + values: dict[str, str], + were_clause: str, + params: list = None + ) -> bigquery.QueryJob: + """ + 更新クエリ(UPDATE)を実行する + + Args: + table_id (str): 対象のテーブルID(例: 'dataset.table') + values (dict[str, str]): 更新するカラムと値の辞書 + were_clause (str): WHERE句の条件式。全件の場合は"TRUE"を指定 + """ + try: + query = f"UPDATE `{table_id}`" + query += f" SET {', '.join([f'{k} = {v}' for k, v in values.items()])}" + if were_clause: + query += f" WHERE {were_clause}" + logger.debug(f"Update Query: {query}") + query_job = self.excute_query(query, params=params) + logger.info("Update query executed successfully.") + return query_job.result() + except Exception as e: + logger.error(f"Failed to execute update query on table {table_id}: {e}") + raise + + def delete_query( + self, + table_id: str, + were_clause:str, + params: list = None + ) -> bigquery.QueryJob: + """ + 削除クエリ(DELETE)を実行する + """ + try: + query = f"DELETE FROM `{table_id}`" + if were_clause: + query += f" WHERE {were_clause}" + query_job = self.excute_query(query, params=params) + logger.info("Delete query executed successfully.") + return query_job.result() + except Exception as e: + logger.error(f"Failed to execute delete query on table {table_id}: {e}") + raise + + def upsert_query( + self, + table_id: str, + insert_values: dict[str, str], + update_values: dict[str, str], + key_columns: list[str], + params: list = None + ) -> bigquery.QueryJob: + """ + UPSERTクエリ(MERGE)を実行する + """ + try: + query = f"MERGE INTO `{table_id}` T" + query += f" USING (SELECT {', '.join([str(v) for v in insert_values.values()])}) S" + query += f" ON {' AND '.join([f'T.{col} = S.{col}' for col in key_columns])}" + query += " WHEN MATCHED THEN" + query += f" UPDATE SET {', '.join([f'T.{k} = {v}' for k, v in update_values.items()])}" + query += " WHEN NOT MATCHED THEN" + query += f" INSERT ({', '.join(insert_values.keys())})" + query += f" VALUES ({', '.join([str(v) for v in insert_values.values()])})" + query_job = self.excute_query(query, params=params) + logger.info("Upsert query executed successfully.") + return query_job + except Exception as e: + logger.error(f"Failed to execute upsert query 
+            raise
+
+    def import_csv(
+        self,
+        table_id: str,
+        csv_file_path: str,
+        skip_leading_rows: int = 1,
+        write_disposition: str = "WRITE_APPEND",
+        field_delimiter: str = ",",
+        autodetect: bool = True,
+        schema: list[bigquery.SchemaField] = None
+    ) -> bigquery.LoadJob:
+        """
+        CSVファイルからテーブルにデータをロードする
+
+        Args:
+            table_id (str): 挿入先のテーブルID(例: 'dataset.table')
+            csv_file_path (str): 挿入元のCSVファイルパス。GCSのURI(gs://bucket/path/to/file.csv)も指定可能
+            skip_leading_rows (int): スキップするヘッダ行数
+            write_disposition (str): 書き込みモード(例: "WRITE_APPEND", "WRITE_TRUNCATE", "WRITE_EMPTY")
+            field_delimiter (str): フィールド区切り文字
+            autodetect (bool): スキーマ自動検出を有効にするかどうか
+            schema (list[bigquery.SchemaField]): スキーマ定義。autodetect=Falseの場合に必要
+        Returns:
+            bigquery.LoadJob: ロードジョブオブジェクト
+        """
+        try:
+            job_config = bigquery.LoadJobConfig(
+                skip_leading_rows=skip_leading_rows,
+                write_disposition=write_disposition,
+                field_delimiter=field_delimiter,
+                autodetect=autodetect,
+                schema=schema
+            )
+            if csv_file_path.startswith("gs://"):
+                # GCS上のCSVはローカルファイルとしては開けないため、URI指定でロードする
+                load_job = self._client.load_table_from_uri(
+                    csv_file_path,
+                    table_id,
+                    job_config=job_config
+                )
+            else:
+                with open(csv_file_path, "rb") as source_file:
+                    load_job = self._client.load_table_from_file(
+                        source_file,
+                        table_id,
+                        job_config=job_config
+                    )
+            load_job.result()  # ジョブの完了を待機
+            logger.info(f"Data loaded successfully from {csv_file_path} to table {table_id}.")
+            return load_job
+        except Exception as e:
+            logger.error(f"Failed to load data from {csv_file_path} to table {table_id}: {e}")
+            raise
+
+    def export_csv_to_gcs(
+        self,
+        table_id: str,
+        bucket: str,
+        prefix: str,
+        csv_file_name: str = None,
+        location: str = None
+    ) -> bigquery.ExtractJob:
+        """
+        テーブルのデータをGCS上のCSVファイルにエクスポートする
+
+        Args:
+            table_id (str): エクスポート元のテーブルID(例: 'dataset.table')
+            bucket (str): エクスポート先のGCSバケット名
+            prefix (str): エクスポート先のGCSプレフィックス(フォルダパス)
+            csv_file_name (str): エクスポート先のCSVファイル名。Noneの場合は自動生成
+            location (str): ジョブのロケーション
+        Returns:
+            bigquery.ExtractJob: エクストラクトジョブオブジェクト
+        """
+        if not csv_file_name:
+            _timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            csv_file_name = f"{table_id.replace('.','_')}_{_timestamp}.csv"
+        destination_uri = f"gs://{bucket}/{prefix}/{csv_file_name}"
+        try:
+            extract_job = self._client.extract_table(
+                table_id,
+                destination_uri,
+                location=location
+            )
+            extract_job.result()  # ジョブの完了を待機
+            logger.info(f"Data exported successfully from table {table_id} to {destination_uri}.")
+            return extract_job
+        except Exception as e:
+            logger.error(f"Failed to export data from table {table_id} to {destination_uri}: {e}")
+            raise
+
diff --git a/src/providers/google_cloud_pubsub_provider.py b/src/providers/google_cloud_pubsub_provider.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/providers/google_cloud_storage_provider.py b/src/providers/google_cloud_storage_provider.py
index 29fee49..07935a0 100644
--- a/src/providers/google_cloud_storage_provider.py
+++ b/src/providers/google_cloud_storage_provider.py
@@ -10,6 +10,9 @@ from google.oauth2 import service_account
 from lib.custom_logger import get_logger
 logger = get_logger()
 
+import zipfile
+from pathlib import Path
+
 
 class GoogleCloudStorageProvider:
 
@@ -75,6 +78,17 @@ class GoogleCloudStorageProvider:
 
     def write_item(self, bucket: str, object_name: str, data: Union[bytes, BinaryIO, str],
                    content_type: str | None = None) -> Dict[str, Any]:
+        """
+        オブジェクトを書き込む
+
+        Args:
+            bucket (str): バケット名
+            object_name (str): オブジェクト名
+            data (Union[bytes, BinaryIO, str]): 書き込むデータ
+            content_type (Optional[str]): コンテンツタイプ(MIMEタイプ)
+        Returns:
+            Dict[str, Any]: 書き込んだオブジェクトの情報
+        """
         blob = self._blob(bucket, object_name)
         if content_type is None:
content_type = mimetypes.guess_type(object_name)[0] or "application/octet-stream" @@ -100,4 +114,57 @@ class GoogleCloudStorageProvider: def generate_signed_url(self, bucket: str, object_name: str, method: str = "GET", expires: timedelta = timedelta(hours=1)) -> str: - return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method) \ No newline at end of file + return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method) + + def zip_items( + self, + bucket: str, + object_names: List[str], + ) -> bytes: + """ + 複数のGCSオブジェクトを1つのZIPにまとめ、ZIPバイナリ(bytes)を返す + + Args: + bucket (str): バケット名 + object_names (List[str]): 対象オブジェクトのリスト + Returns: + bytes: ZIPファイルのバイナリ + """ + out = io.BytesIO() + with zipfile.ZipFile(out, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: + for obj in object_names: + blob = self._blob(bucket, obj) + if not blob.exists(): + raise FileNotFoundError(f"Object not found: gs://{bucket}/{obj}") + + buf = io.BytesIO() + blob.download_to_file(buf) + buf.seek(0) + arcname = Path(obj).name + zf.writestr(arcname, buf.read()) + + zf.comment = f"bucket={bucket}, files={len(object_names)}".encode() + + return out.getvalue() + + def upload_folder(self, bucket: str, folder_path: str, gcs_prefix: str = ""): + """ + ローカルフォルダをGCSに再帰的にアップロードする + + Args: + bucket (str): バケット名 + folder_path (str): ローカルフォルダのパス + gcs_prefix (str): GCS上のプレフィックス(フォルダパス) + """ + _bucket = self._bucket(bucket) + + for root, _, files in os.walk(folder_path): + for file in files: + local_file_path = os.path.join(root, file) + # フォルダ構造を保つように相対パスを生成 + relative_path = os.path.relpath(local_file_path, folder_path) + gcs_object_name = os.path.join(gcs_prefix, relative_path).replace("\\", "/") + + blob = _bucket.blob(gcs_object_name) + blob.upload_from_filename(local_file_path) + logger.info(f"Uploaded {local_file_path} to gs://{bucket}/{gcs_object_name}") \ No newline at end of file diff --git a/src/providers/one_drive_provider.py b/src/providers/one_drive_provider.py index 7ca8bdc..61040f1 100644 --- a/src/providers/one_drive_provider.py +++ b/src/providers/one_drive_provider.py @@ -31,8 +31,8 @@ class OneDriveProvider: client_id: Azure Portal のアプリ(公開クライアント)の Client ID authority: 'https://login.microsoftonline.com/{tenant}' (未指定は 'common') scopes: 例 ["Files.ReadWrite", "User.Read", "offline_access"] - token_cache_path: MSAL のシリアライズ済みトークンキャッシュ保存先(任意) - access_token: 既に取得済みの Bearer トークンを直接使いたい場合(任意) + token_cache_path: MSAL のシリアライズ済みトークンキャッシュ保存先(任意) + access_token: 既に取得済みの Bearer トークンを直接使いたい場合(任意) """ self.client_id = client_id or os.getenv("MS_CLIENT_ID") or "" self.authority = authority or self.DEFAULT_AUTHORITY @@ -64,7 +64,7 @@ class OneDriveProvider: f.write(self._token_cache.serialize()) def ensure_token(self): - """有効な Access Token を確保(キャッシュ→デバイスコード)。""" + """有効な Access Token を確保(キャッシュ→デバイスコード)。""" if self._access_token: return self._access_token @@ -102,7 +102,7 @@ class OneDriveProvider: """ Graph のパス表記: /me/drive/root:/foo/bar:/children のように使用。 先頭に / を付けず、URL エンコードは requests 側に任せる前提で - 空白や日本語は安全のため quote することを推奨(今回は簡易化)。 + 空白や日本語は安全のため quote することを推奨(今回は簡易化)。 """ if not path or path.strip() in ["/", "."]: return "" @@ -156,7 +156,7 @@ class OneDriveProvider: def create_folder(self, folder_path: str) -> Dict[str, Any]: """ - 中間フォルダも順次作成(簡易実装)。 + 中間フォルダも順次作成(簡易実装)。 """ parts = [p for p in self._normalize_path(folder_path).split("/") if p] cur = "" @@ -300,7 +300,7 @@ class OneDriveProvider: return r.json() # ----------------------- - # 共有リンク(擬似 Signed 
URL)
+    # 共有リンク(擬似 Signed URL)
     # -----------------------
     def generate_share_link(
         self,
@@ -316,7 +316,7 @@
         url = f"{self._item_by_path_url(path)}:/createLink"
         body: Dict[str, Any] = {"type": link_type, "scope": scope}
         if password:
-            body["password"] = password　# パスワード保護(ポリシーにより可否)
+            body["password"] = password  # パスワード保護(ポリシーにより可否)
         r: requests.Response = self._session.post(url, headers=self._headers(), json=body)
         r.raise_for_status()
         data:dict = r.json()
diff --git a/src/providers/rss_reader_client.py b/src/providers/rss_reader_client.py
new file mode 100644
index 0000000..c1f948e
--- /dev/null
+++ b/src/providers/rss_reader_client.py
@@ -0,0 +1,50 @@
+import feedparser
+from datetime import datetime
+from feedparser import FeedParserDict
+from pydantic import BaseModel
+
+class Feed(BaseModel):
+    url: str
+    title: str | None = None
+    company: str | None = None
+    description: str | None = None
+    language: str | None = None
+    tags: list[str] | None = None
+
+
+class RSSItem(BaseModel):
+    title: str
+    link: str
+    guid: str | None = None
+    published: str | None = None
+    ts: str | None = None
+    source: str | None = None
+    description: str | None = None
+
+
+class RSSReaderClient:
+    """RSSリーダークライアント"""
+    @classmethod
+    def fetch(cls, feed: Feed, from_date: str, to_date: str) -> list[RSSItem]:
+        """指定されたフィードから記事を取得する(from_date〜to_dateはISO形式の日付。範囲外の記事は除外する)"""
+        _from = datetime.fromisoformat(from_date) if from_date else None
+        _to = datetime.fromisoformat(to_date) if to_date else None
+        items = []
+        d: FeedParserDict = feedparser.parse(feed.url)
+        for e in d.entries:
+            # published_parsed / updated_parsed は time.struct_time(UTC)
+            parsed = e.get("published_parsed") or e.get("updated_parsed")
+            if parsed:
+                entry_dt = datetime(*parsed[:6])
+                if (_from and entry_dt < _from) or (_to and entry_dt > _to):
+                    continue
+            items.append(RSSItem(
+                title=e.get("title", "(no title)"),
+                link=e.get("link") or "",
+                guid=e.get("id") or e.get("guid") or e.get("link"),
+                published=e.get("published") or e.get("updated"),
+                ts=e.get("published") or e.get("updated"),
+                source=feed.url,
+                description=e.get("summary") or e.get("description"),
+            ))
+        return items
\ No newline at end of file
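
Usage sketch (not part of the patch): the snippet below shows how the parameterized-query helpers (`addQueryParameter` / `select_query_to_dict`) and `RSSReaderClient.fetch` added above could be driven from an `example/`-style script. The key path, dataset/table name, device code, and feed URL are placeholders; it also assumes `BigQueryFieldType` can be imported from the provider module (where it is referenced) and that the provider is constructed with a service-account key the same way as the other example scripts.

```python
import os
import sys

# Same path bootstrap as the other example/ scripts (assumes this file lives in example/).
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from providers.google_cloud_bigquery_provider import (  # BigQueryFieldType assumed to be exposed here
    BigQueryFieldType,
    GoogleCloudBigQueryProvider,
)
from providers.rss_reader_client import Feed, RSSReaderClient


def example_parameterized_select():
    # Placeholder key path and table name: adjust to your environment.
    provider = GoogleCloudBigQueryProvider(cred_path="keys/service_account.json")
    table_id = "example_dataset.example_table"

    # Named parameter: the SQL references @device_code and the value is bound via
    # QueryJobConfig(query_parameters=...) inside excute_query(), not string-interpolated.
    params = [
        GoogleCloudBigQueryProvider.addQueryParameter(
            name="device_code",
            value="device_001",
            param_type=BigQueryFieldType.STRING,
        )
    ]
    rows = provider.select_query_to_dict(
        table_id=table_id,
        columns=["device_code", "time_stamp"],
        were_clause="device_code = @device_code",
        params=params,
        order_by="time_stamp DESC",
        limit=10,
    )
    for row in rows:
        print(row)


def example_rss_fetch():
    # Placeholder feed URL; from_date / to_date are ISO dates used to filter entries.
    feed = Feed(url="https://example.com/feed.xml", title="Example feed")
    items = RSSReaderClient.fetch(feed, from_date="2025-01-01", to_date="2025-12-31")
    for item in items[:5]:
        print(item.published, item.title, item.link)


if __name__ == "__main__":
    example_parameterized_select()
    example_rss_fetch()
```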