Add the latest code

This commit is contained in:
ry.yamafuji 2025-10-26 17:10:27 +09:00
parent 16e0d2d8a9
commit f74b1c7a10
23 changed files with 2034 additions and 21 deletions

View File

@ -3,4 +3,17 @@
```sh
python -m venv venv
.\venv\Scripts\activate
```
```sh
# Transcribe one segment (set OPENAI_API_KEY in your environment; do not hard-code the key)
curl https://api.openai.com/v1/audio/transcriptions \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-F "file=@part_001.m4a" \
-F "model=gpt-4o-transcribe" \
-F "response_format=text" \
-F "language=ja" \
-o part_001.txt

# Split the source audio into 10-minute segments beforehand (keeps the format consistent)
ffmpeg -i voice.m4a -f segment -segment_time 600 -c copy part_%03d.m4a
```

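To transcribe every segment produced by ffmpeg in one pass, a small loop can post each file to the same endpoint. This is a rough sketch using the `requests` package from requirements.txt; the file pattern and output paths follow the commands above.

```python
import os
import glob
import requests

API_KEY = os.environ["OPENAI_API_KEY"]  # assumed to be set in the environment

for path in sorted(glob.glob("part_*.m4a")):
    with open(path, "rb") as f:
        # Same multipart fields as the curl command above
        resp = requests.post(
            "https://api.openai.com/v1/audio/transcriptions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            files={"file": (os.path.basename(path), f)},
            data={"model": "gpt-4o-transcribe", "response_format": "text", "language": "ja"},
        )
    resp.raise_for_status()
    # response_format=text -> the body is the plain transcript
    with open(path.replace(".m4a", ".txt"), "w", encoding="utf-8") as out:
        out.write(resp.text)
```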
12
docments/big_query.md Normal file
View File

@ -0,0 +1,12 @@
## BigQuery is a "data warehouse" type service
BigQuery is a column-oriented database built for OLAP (analytics) workloads, so unlike an RDB (MySQL, PostgreSQL, etc.) it is not designed around row uniqueness or integrity constraints; duplicates therefore have to be handled at query time (see the example below).
## Constraints that do not exist in BigQuery
* PRIMARY KEY
* FOREIGN KEY
* UNIQUE
* CHECK
* AUTO_INCREMENT
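
Because there is no PRIMARY KEY or UNIQUE constraint, de-duplication has to happen in the query itself. A minimal sketch using the provider added in this commit (the table and columns come from the example files and are only illustrative):

```python
from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider

provider = GoogleCloudBigQueryProvider(cred_path="keys/google_service_accout.json")
# Keep only the newest row per device_code (illustrative schema from the examples)
job = provider.excute_query("""
    SELECT * EXCEPT(rn) FROM (
        SELECT *, ROW_NUMBER() OVER (
            PARTITION BY device_code ORDER BY time_stamp DESC
        ) AS rn
        FROM `example_dataset.example_table`
    )
    WHERE rn = 1
""")
for row in job.result():
    print(dict(row))
```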

66
docments/windows_disk.md Normal file
View File

@ -0,0 +1,66 @@
```powershell
# Delete temporary files
Remove-Item -Path "$env:TEMP\*" -Recurse -Force -ErrorAction SilentlyContinue
```
Find large files that could be moved to another drive:
```powershell
Get-ChildItem C:\ -Recurse -ErrorAction SilentlyContinue |
Where-Object { -not $_.PSIsContainer } |
Sort-Object Length -Descending |
Select-Object FullName, @{Name="Size(GB)";Expression={"{0:N2}" -f ($_.Length / 1GB)}} -First 20
```
Clean up with third-party tools:
WinDirStat
→ Visualizes disk usage so large files can be spotted at a glance.
```powershell
winget install WinDirStat
windirstat
```
CCleaner
→ Handy for removing temporary files and unneeded registry entries.

Clear out the Adobe temp folder (unhide it first, then empty or rename it):
```powershell
attrib -h -s -r "C:\adobeTemp"
Remove-Item "C:\adobeTemp\*" -Recurse -Force -ErrorAction SilentlyContinue
ren "C:\adobeTemp" "adobeTemp_old"
```
Relocate the Adobe temp folder to another drive with a junction. First create the destination folder (the target drive letter is just an example):
```powershell
mkdir "F:\adobeTemp"
```
Then create the junction from an elevated command prompt:
```
mklink /J C:\adobeTemp F:\adobeTemp
```
Shrink the Docker/WSL virtual disk (stop WSL first so the vhdx is not in use):
```powershell
wsl --shutdown
cd "C:\Users\r_yam\AppData\Local\Docker\wsl\data"
optimize-vhd -Path .\ext4.vhdx -Mode Full
```


110
example/example_bigquery.py Normal file
View File

@ -0,0 +1,110 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))
from lib.custom_logger import get_logger
logger = get_logger(level=10)
from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider
def example_bigquery():
try:
# GoogleCloudBigQueryProviderのインスタンスを作成
provider = GoogleCloudBigQueryProvider(
cred_path="keys/google_service_accout.json",
)
dss = provider.get_datasets()
for ds in dss:
logger.info(f"Dataset ID: {ds.dataset_id}")
# データセットを作成する
dataset_id = "example_dataset"
if provider.is_exists_dataset(dataset_id):
logger.info(f"Dataset {dataset_id} already exists.")
else:
dataset = provider.create_dataset(dataset_id)
logger.info(f"Dataset {dataset_id} created at {dataset.created}.")
table_id = provider.full_table_id(dataset_id, "example_table")
# テーブルを作成する
if provider.is_exists_table(table_id):
logger.info(f"Table {table_id} already exists.")
else:
logger.info(f"Creating table {table_id}...")
schema = [
provider.addSchemaField("device_code", "string", "REQUIRED", description="Device code"),
provider.addSchemaField("time_stamp", "timestamp", "REQUIRED", description="Timestamp"),
]
table = provider.create_table(
table_id=table_id,
schema=schema
)
# tables = provider.get_tables(dataset_id)
# for table in tables:
# logger.info(f"Table ID: {table.table_id}")
# テーブル情報を挿入する
# provider.insert_rows(
# table_id=table_id,
# rows=[
# {"device_code": "device_001", "time_stamp": "2025-01-01 12:00:00"},
# {"device_code": "device_002", "time_stamp": "2025-01-01 12:05:00"},
# {"device_code": "device_003", "time_stamp": "2025-01-01 12:10:00"},
# ]
# )
# テーブル情報を確認する(JOBあり・SQL発行)
# job_query = provider.excute_query(
# query=f"SELECT * FROM `{table_id}` where device_code='device_001'",
# )
# results = job_query.result()
# for row in results:
# logger.info(f"Row: {row}")
# テーブル情報を確認する(JOBあり)
# rows = provider.list_rows(
# table_id=table_id,
# )
# for row in rows:
# logger.info(f"Row: {row}")
# バッファ状況の確認
# buffer_status = provider.get_streaming_buffer_info(table_id=table_id)
# logger.info(f"Streaming Buffer Info: {buffer_status}")
# テーブルを更新する
# query_job = provider.update_query(
# table_id=table_id,
# values={
# "device_code": "'device_999'"
# },
# were_clause="device_code='device_002'"
# )
# query_job.result() # ジョブの完了を待機
# # テーブル情報を確認する(JOBあり)
# job = provider.select_query(
# table_id=table_id,
# )
# results = job.result()
# for row in results:
# logger.info(f"Row: {row}")
# # テーブルのレコードを削除する
# provider.delete_query(
# table_id=table_id,
# were_clause="device_code='device_012'"
# )
# テーブルを削除する
# provider.delete_table(table_id=table_id)
except Exception as e:
logger.error(f"Error in example_bigquery: {e}")
example_bigquery()

View File

@ -0,0 +1,74 @@
import sys
import os
from datetime import datetime
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))
from typing import ClassVar, Optional
from lib.custom_logger import get_logger
logger = get_logger(level=10)
from dataclasses import dataclass
from models.bigquery_base_model import BigQueryBaseModel
from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider
@dataclass
class ExamplModel(BigQueryBaseModel):
device_code: str
time_stamp: datetime
table_id: ClassVar[str] = "example_dataset.example_table"
key_field: ClassVar[Optional[str]] = "device_code"
def example_model():
logger.info("Starting example_bigquery_model function.")
provider = GoogleCloudBigQueryProvider(
cred_path="keys/google_service_accout.json",
)
ExamplModel.set_provider(provider)
# テーブル内の全レコードを取得する
# records = ExamplModel.fetch_all()
# logger.info(f"Total records: {len(records)}")
# for record in records:
# logger.info(f"Record: {record}")
# レコードを生成する
# new_record = ExamplModel(
# device_code="device_010",
# time_stamp=datetime(2025, 1, 1, 15, 0, 0)
# )
# new_record.create()
# レコードを大量に生成する
# ExamplModel.insert(
# rows=[
# ExamplModel(device_code="device_011",time_stamp=datetime(2025, 1, 1, 15, 0, 0)),
# ExamplModel(device_code="device_012",time_stamp=datetime(2025, 1, 1, 15, 0, 0)),
# ])
# テーブルのストリームバッファ情報を取得する
if ExamplModel.is_streaming_buffer():
logger.info("Table is currently receiving streaming data.")
logger.info("not updated or delete in the streaming buffer.")
else:
logger.info("Table is not receiving streaming data.")
# 特定の条件でレコードを削除する
# ExamplModel.delete(where=[("device_code", "=", "device_010")])
# レコードを更新する
ExamplModel.update(
values={"device_code": "device_011_updated"},
where=[("device_code", "=", "device_011")]
)
# records = ExamplModel.list(where=[("device_code", "=", "device_001")])
# logger.info(f"Records with device_code='device_001': {records if records else 'No records found'}")
# record = ExamplModel.first()
# logger.info(f"First record: {record if record else 'No record found'}")
example_model()

View File

@ -0,0 +1,25 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))
from lib.custom_logger import get_logger
logger = get_logger(level=10)
from providers.duck_db_provider import DuckDBProvider
def example_duckdb():
logger.info("Starting example_duckdb function.")
file_path = "./data_science/data/y=*/news/news_*.csv"
provider = DuckDBProvider()
sql = f"""
SELECT *
FROM read_csv_auto('{file_path}' , HEADER=TRUE, IGNORE_ERRORS=TRUE)
"""
result = provider.query_df(sql)
print("latest published_parsed:", result)
# forで1件ずつ表示
for idx, row in result.iterrows():
logger.info(f"title:{row['title']}")
example_duckdb()

View File

@ -0,0 +1,87 @@
"""
HMACキーがひつようになる
* Google Cloud Consoleで発行する
* https://console.cloud.google.com/storage/settings
* 相互運用性(Interoperability)タブを開く
* HMACキーを作成ボタンを押す
* 使いたいサービスアカウントを選択
"""
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))
from dotenv import load_dotenv
load_dotenv()
from lib.custom_logger import get_logger
logger = get_logger(level=10)
import duckdb
from providers.google_cloud_storage_provider import GoogleCloudStorageProvider
from providers.duck_db_provider import DuckDBProvider
def example_init_google_cloud_storage():
logger.info("Starting example_google_cloud_storage function.")
gcs_provider = GoogleCloudStorageProvider(
cred_path="./keys/google_service_accout.json",
)
csv_data = """id,name,age,city,score,created_at
1,Alice,25,Tokyo,88,2025-10-01T09:00:00Z
2,Bob,30,Osaka,75,2025-10-02T09:30:00Z
3,Charlie,28,Nagoya,92,2025-10-03T10:00:00Z
4,David,35,Fukuoka,64,2025-10-04T11:15:00Z
5,Eva,22,Sapporo,80,2025-10-05T12:45:00Z
"""
gcs_provider.write_item("datasource-example-251018",
"example/y=2025/m=10/example.csv",
csv_data.encode("utf-8"),"text/csv")
buckets = gcs_provider.get_buckets()
logger.info(f"Buckets: {buckets}")
def example_duckdb_cloud_raw():
logger.info("Starting example_duckdb_cloud_raw function.")
# DuckDB接続
con = duckdb.connect()
con.sql(f"""
CREATE OR REPLACE SECRET gcs_creds (
TYPE gcs,
KEY_ID '{os.getenv('GCP_STORAGE_HMAC_ACCESS_KEY')}',
SECRET '{os.getenv('GCP_STORAGE_HMAC_SECRET_KEY')}'
);
""")
query = f"""
SELECT * FROM read_csv_auto('gs://datasource-example-251018/example/y=2025/m=10/example.csv');
"""
result = con.execute(query).df()
logger.info(f"Read {len(result)} rows from GCS file.")
def example_duckdb_cloud_class():
logger.info("Starting example_duckdb_cloud_class function.")
# DuckDB接続
provider = DuckDBProvider()
provider.setup_gcs(
access_key=os.getenv('GCP_STORAGE_HMAC_ACCESS_KEY'),
secret_key=os.getenv('GCP_STORAGE_HMAC_SECRET_KEY'),
)
bucket_name = "datasource-example-251018"
object_name = "example/y=2025/m=*/example.csv"
query = f"""
SELECT * FROM {provider.get_gs_csv_name(bucket_name, object_name)};
"""
result = provider.query_df(query)
logger.info(f"Read {len(result)} rows from GCS file using DuckDBProvider.")
# example_init_google_cloud_storage()
# example_duckdb_cloud_raw()
example_duckdb_cloud_class()

View File

@ -0,0 +1,30 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))
from typing import ClassVar, Optional
from lib.custom_logger import get_logger
logger = get_logger(level=10)
from dataclasses import dataclass
from models.duck_base_model import DuckBaseModel
from providers.duck_db_provider import DuckDBProvider
@dataclass
class NewsModel(DuckBaseModel):
title: str
file_glob: ClassVar[str] = "./data_science/data/y=2025/news/news_*.csv"
default_order_by: ClassVar[Optional[str]] = "pub_date DESC"
def example_duckdb_model():
logger.info("Starting example_duckdb_model function.")
provider = DuckDBProvider()
provider.connect()
NewsModel.provider = provider
news = NewsModel.first()
logger.info(f"Total news: {news if news else 'No news found'}")
# example_duckdb_model()

47
example/example_rss.py Normal file
View File

@ -0,0 +1,47 @@
# openai_rss_reader_stdlib.py
# pip install feedparser
import feedparser
from datetime import datetime, timezone
FEEDS = [
"https://platform.openai.com/docs/release-notes.rss", # リリースノート
"https://openai.com/blog/rss.xml", # ブログ
]
def to_ts(entry: feedparser.FeedParserDict):
# Use the pre-parsed published_parsed / updated_parsed struct_time directly
# (works for both RSS and Atom date formats)
parsed = entry.get("published_parsed") or entry.get("updated_parsed")
if parsed:
return datetime(*parsed[:6], tzinfo=timezone.utc)
return None
def uniq_key(entry: feedparser.FeedParserDict):
return entry.get("id") or entry.get("guid") or entry.get("link")
def fetch_all(feeds):
items = []
seen = set()
for url in feeds:
d: feedparser.FeedParserDict = feedparser.parse(url)
for e in d.entries:
k = uniq_key(e)
if not k or k in seen:
continue
seen.add(k)
ts = to_ts(e)
items.append({
"title": e.get("title", "(no title)"),
"link": e.get("link"),
"published": e.get("published") or e.get("updated"),
"ts": ts,
"source": url,
})
# ts が None のものは末尾に回す
items.sort(key=lambda x: (x["ts"] is not None, x["ts"]), reverse=True)
return items
if __name__ == "__main__":
for i, it in enumerate(fetch_all(FEEDS), 1):
print(f"{i:02d}. {it['title']}\n {it['link']}\n {it['published']} [{it['source']}]\n")

View File

@ -2,13 +2,26 @@ matplotlib
requests
pyttsx3
# pylib
pandas
# firebase_provider
firebase-admin>=7.1.0
# google cloud storage
google-cloud-storage
# google cloud big query
google-cloud-bigquery
# google cloud big query option
google-cloud-bigquery-storage
# bigquery to dataframe
db-dtypes
# onedrive
msal
# common
python-dotenv
# RSS
feedparser
# model
pydantic
# duckdb
duckdb==1.3.2

View File

@ -28,7 +28,6 @@ class CustomLogger(Singleton):
def __init__(self, name='main', log_file=None, level=logging.INFO):
if hasattr(self, '_initialized') and self._initialized:
self.logger.setLevel(level)
return # すでに初期化済みなら何もしない
self.logger = logging.getLogger(name)

View File

@ -2,7 +2,7 @@
This implementation is thread-safe and ensures that only one instance of the class is created.
Singleton が提供するのは同じインスタンスを返す仕組み
* __init__() は毎回呼ばれる多くの人が意図しない動作
* __init__() は毎回呼ばれる(多くの人が意図しない動作
* __init__の2回目は_initialized というフラグは 使う側で管理する必要がある
"""

View File

@ -14,7 +14,7 @@ class TaskScheduler:
# インスタンスを作成し、スレッドを起動
scheduler = TaskScheduler()
# インスタンスを破棄しかしスレッドは続行)
# インスタンスを破棄(しかしスレッドは続行)
# del scheduler
# メインスレッドは他の作業を続ける

View File

@ -0,0 +1,510 @@
import os
import base64
from typing import Any, Dict, Optional,Type,TypeVar,Tuple,Sequence,Union,List,Iterable,ClassVar
from enum import Enum
from decimal import Decimal
from datetime import datetime, date, time, timezone
from dataclasses import dataclass, fields, is_dataclass
import pandas as pd
from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider
from lib.custom_logger import get_logger
logger = get_logger()
T = TypeVar("T", bound="BigQueryBaseModel")
Filter = Tuple[str, str, Any]
def _build_where_clause_bq_without_prams(
where: Optional[Union[str, Dict[str, Any], Sequence[Tuple[str, str, Any]]]]
)->str:
"""
BigQuery 向け WHERE 句の生成(パラメータなし)
Returns:
where_sql
"""
if where is None:
return ""
clauses = []
if isinstance(where, str):
# 文字列のまま(完全SQL指定)
return f"WHERE {where}"
if isinstance(where, dict):
for col, val in where.items():
if isinstance(val, (list, tuple, set)):
placeholders = ", ".join([f"'{str(v).replace('\'', '\\\'')}'" for v in val])
clauses.append(f"{col} IN ({placeholders})")
elif val is None:
clauses.append(f"{col} IS NULL")
else:
clauses.append(f"{col} = '{str(val).replace('\'', '\\\'')}'")
else:
# sequence of (col, op, val)
for col, op, val in where:
if (op.upper() == "IN") and isinstance(val, (list, tuple, set)):
placeholders = ", ".join([f"'{str(v).replace('\'', '\\\'')}'" for v in val])
clauses.append(f"{col} IN ({placeholders})")
elif val is None and op in ("=", "=="):
clauses.append(f"{col} IS NULL")
else:
clauses.append(f"{col} {op} '{str(val).replace('\'', '\\\'')}'")
where_sql = "WHERE " + " AND ".join(clauses)
return where_sql
def _build_where_clause_bq(
where: Optional[Union[str, Dict[str, Any], Sequence[Tuple[str, str, Any]]]]
) -> Tuple[str, List[Dict[str, Any]]]:
"""
BigQuery 向け WHERE 句とパラメータの生成
Returns:
(where_sql, query_params)
"""
if where is None:
return "", []
params = []
clauses = []
if isinstance(where, str):
# 文字列のまま(完全SQL指定)
return f"WHERE {where}", params
if isinstance(where, dict):
for i, (col, val) in enumerate(where.items()):
param_name = f"param{i}"
if isinstance(val, (list, tuple, set)):
placeholders = []
for j, v in enumerate(val):
pname = f"{param_name}_{j}"
placeholders.append(f"@{pname}")
params.append({"name": pname, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(v)}})
clauses.append(f"{col} IN ({', '.join(placeholders)})")
elif val is None:
clauses.append(f"{col} IS NULL")
else:
clauses.append(f"{col} = @{param_name}")
params.append({"name": param_name, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(val)}})
else:
# sequence of (col, op, val)
for i, (col, op, val) in enumerate(where):
param_name = f"param{i}"
if (op.upper() == "IN") and isinstance(val, (list, tuple, set)):
placeholders = []
for j, v in enumerate(val):
pname = f"{param_name}_{j}"
placeholders.append(f"@{pname}")
params.append({"name": pname, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(v)}})
clauses.append(f"{col} IN ({', '.join(placeholders)})")
elif val is None and op in ("=", "=="):
clauses.append(f"{col} IS NULL")
else:
clauses.append(f"{col} {op} @{param_name}")
params.append({"name": param_name, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(val)}})
where_sql = "WHERE " + " AND ".join(clauses)
return where_sql, params
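# Example of what the helper above produces for a tuple-style filter (column names are hypothetical):
#   _build_where_clause_bq([("device_code", "=", "device_001"), ("status", "IN", ["a", "b"])])
# returns
#   where_sql -> "WHERE device_code = @param0 AND status IN (@param1_0, @param1_1)"
#   params    -> [{"name": "param0", ...}, {"name": "param1_0", ...}, {"name": "param1_1", ...}]
# i.e. every non-NULL value is bound as a STRING query parameter rather than inlined into the SQL.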
def to_bq_json(v):
if isinstance(v, datetime):
# TIMESTAMP 用(UTCに正規化して Z を付ける)
if v.tzinfo is None:
v = v.replace(tzinfo=timezone.utc)
return v.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
if isinstance(v, date) and not isinstance(v, datetime):
# DATE 用
return v.isoformat()
if isinstance(v, time):
# TIME 用
# v.isoformat() は microseconds を含むことあり → そのままでOK
return v.isoformat()
if isinstance(v, Decimal):
# NUMERIC/BIGNUMERIC に文字列で渡すのが安全
return str(v)
if isinstance(v, bytes):
# BYTES は base64 文字列
return base64.b64encode(v).decode("ascii")
if isinstance(v, Enum):
return v.value
if isinstance(v, list):
return [to_bq_json(x) for x in v]
if isinstance(v, dict):
return {k: to_bq_json(val) for k, val in v.items()}
return v
@dataclass
class BigQueryBaseModel:
table_id: ClassVar[str] # 例: "datasetid.table_name"
provider: ClassVar[GoogleCloudBigQueryProvider | None] = None
key_field: ClassVar[Optional[str]] = None # 例: "id" 主キーに相当するフィールド名
columns_select: ClassVar[str] = "*" # 例: "col1, col2, col3"
default_order_by: ClassVar[Optional[str]] = None
@classmethod
def set_provider(cls, provider: GoogleCloudBigQueryProvider):
cls.provider = provider
@classmethod
def fetch_all(cls: Type[T]) -> List[T]:
"""テーブル内の全レコードを取得する"""
if cls.provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
rows = cls.provider.list_rows(table_id=cls.table_id)
result: List[T] = []
for row in rows:
data = dict(row)
obj: T = cls(**data) # type: ignore[arg-type]
result.append(obj)
return result
def create(self):
"""レコードをテーブルに挿入する"""
if self.provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
data = {f.name: getattr(self, f.name) for f in fields(self)}
data = to_bq_json(data)
logger.debug(f"Inserting data: {data}")
self.provider.insert_rows(
table_id=self.table_id,
rows=[data]
)
@classmethod
def insert(cls, rows: List[T]):
"""複数レコードをテーブルに挿入する"""
if cls.provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
bq_rows = []
for row in rows:
data = {f.name: getattr(row, f.name) for f in fields(row)}
bq_rows.append(to_bq_json(data))
logger.info(f"Inserting data: {len(bq_rows)}")
logger.debug(f"Inserting data details: {bq_rows[:10]}...")
cls.provider.insert_rows(
table_id=cls.table_id,
rows=bq_rows
)
@classmethod
def is_streaming_buffer(cls) -> bool:
"""テーブルのストリームバッファ情報を取得する"""
if cls.provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
buf_info = cls.provider.get_streaming_buffer_info(table_id=cls.table_id)
logger.debug(f"Streaming buffer info: {buf_info}")
return buf_info is not None
def save(self):
"""レコードをテーブルに更新する"""
if self.provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
if self.is_streaming_buffer():
raise RuntimeError("Cannot update records in streaming buffer.")
if self.key_field is None:
raise RuntimeError("key_field is not set.")
key_value = getattr(self, self.key_field)
if key_value is None:
raise ValueError(f"Key field '{self.key_field}' value is None.")
data = {f.name: getattr(self, f.name) for f in fields(self)}
data = to_bq_json(data)
set_values = {}
for k, v in data.items():
if k == self.key_field:
continue
if isinstance(v, str):
set_values[k] = f"'{v.replace('\'', '\\\'')}'"
else:
set_values[k] = str(v)
where_clause = f"{self.key_field}='{str(key_value).replace('\'', '\\\'')}'"
logger.debug(f"Updating data: SET {set_values} WHERE {where_clause}")
# update_query expects a dict of column -> SQL literal (it joins the SET clause itself)
self.provider.update_query(
table_id=self.table_id,
values=set_values,
were_clause=where_clause
)
def destroy(self):
"""レコードをテーブルから削除する"""
if self.provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
if self.is_streaming_buffer():
raise RuntimeError("Cannot delete records in streaming buffer.")
if self.key_field is None:
raise RuntimeError("key_field is not set.")
key_value = getattr(self, self.key_field)
if key_value is None:
raise ValueError(f"Key field '{self.key_field}' value is None.")
where_clause = f"{self.key_field}='{str(key_value).replace('\'', '\\\'')}'"
logger.debug(f"Deleting data: WHERE {where_clause}")
self.provider.delete_query(
table_id=self.table_id,
were_clause=where_clause
)
# -------------------------------------
# select
@classmethod
def select(
cls: Type[T],
provider: GoogleCloudBigQueryProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
order_by: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
):
params_dict = []
where_sql, params_dict = _build_where_clause_bq(where)
# where_sql = _build_where_clause_bq_without_prams(where)
order = order_by or cls.default_order_by
order_sql = f"ORDER BY {order}" if order else ""
limit_sql = f"LIMIT {int(limit)}" if limit is not None else ""
offset_sql = f"OFFSET {int(offset)}" if offset is not None else ""
sql = f"""
SELECT {cls.columns_select}
FROM `{cls.table_id}`
{where_sql}
{order_sql}
{limit_sql}
{offset_sql}
"""
# logger.debug(f"Select Query: {sql} with params_dict: {params_dict}")
if provider is None:
provider = cls.provider
if provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
params = []
for p in params_dict:
param = provider.addQueryParameter(
name=p["name"],
value=p["parameterValue"]["value"],
param_type=p["parameterType"]["type"]
)
params.append(param)
# パラメータバインドGoogleCloudBigQueryProvider の生 client を使用
# query_job = provider.excute_query(sql, params=params)
query_job = provider.excute_query(sql, params=params)
return query_job
@classmethod
def select_df(
cls: Type[T],
provider: GoogleCloudBigQueryProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
order_by: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> pd.DataFrame:
"""DataFrame で取得
Notes:
Google Cloud BigQuery から直接 DataFrame を取得するには db-dtypes パッケージが必要
"""
query_job = cls.select(
provider=provider,
where=where,
order_by=order_by,
limit=limit,
offset=offset,
)
return query_job.to_dataframe()
@classmethod
def list(
cls: Type[T],
provider: GoogleCloudBigQueryProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
order_by: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List[T]:
job = cls.select(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset)
results = job.result()
instances: List[T] = []
for row in results:
data = dict(row)
instance = cls(**data) # type: ignore
instances.append(instance)
# df = cls.select_df(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset)
# if not is_dataclass(cls):
# raise TypeError(f"{cls.__name__} is not a dataclass.")
# instances: List[T] = []
# for _, row in df.iterrows():
# data = {f.name: row[f.name] for f in fields(cls) if f.name in row}
# instance = cls(**data) # type: ignore
# instances.append(instance)
return instances
@classmethod
def first(
cls: Type[T],
provider: GoogleCloudBigQueryProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
order_by: Optional[str] = None,
) -> Optional[T]:
results = cls.list(provider=provider, where=where, order_by=order_by, limit=1)
return results[0] if results else None
@classmethod
def count(
cls: Type[T],
provider: GoogleCloudBigQueryProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
) -> int:
where_sql, params_dict = _build_where_clause_bq(where)
sql = f"""
SELECT COUNT(*) as cnt
FROM `{cls.table_id}`
{where_sql}
"""
# パラメータバインドGoogleCloudBigQueryProvider の生 client を使用
if provider is None:
provider = cls.provider
if provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
params = []
for p in params_dict:
param = provider.addQueryParameter(
name=p["name"],
value=p["parameterValue"]["value"],
param_type=p["parameterType"]["type"]
)
params.append(param)
query_job = provider.excute_query(sql, params=params)
results = query_job.result()
for row in results:
return row["cnt"]
return 0
@classmethod
def exists(
cls: Type[T],
provider: GoogleCloudBigQueryProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
) -> bool:
if provider is None:
provider = cls.provider
return cls.count(provider, where=where) > 0
@classmethod
def delete(
cls: Type[T],
provider: GoogleCloudBigQueryProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
):
where_sql, params_dict = _build_where_clause_bq(where)
sql = f"""
DELETE FROM `{cls.table_id}`
{where_sql}
"""
if provider is None:
provider = cls.provider
if provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
params = []
for p in params_dict:
param = provider.addQueryParameter(
name=p["name"],
value=p["parameterValue"]["value"],
param_type=p["parameterType"]["type"]
)
params.append(param)
job = provider.excute_query(
sql,
params=params
)
job.result() # ジョブの完了を待機
@classmethod
def update(
cls: Type[T],
values: dict[str, Any],
*,
where: Union[str, Dict[str, Any], Sequence[Filter]] ,
provider: GoogleCloudBigQueryProvider = None,
):
params_dicts = []
where_sql, params_dicts = _build_where_clause_bq(where)
set_clauses = []
for k, v in values.items():
param_name = f"set_{k}"
set_clauses.append(f"{k}=@{param_name}")
params_dicts.append({
"name": param_name,
"parameterType": {"type": "STRING"},
"parameterValue": {"value": str(v)}
})
sql = f"""
UPDATE `{cls.table_id}`
SET {', '.join(set_clauses)}
{where_sql}
"""
logger.debug(f"update Query: {sql} with params_dict: {params_dicts}")
if provider is None:
provider = cls.provider
if provider is None:
raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")
params = []
for p in params_dicts:
param = provider.addQueryParameter(
name=p["name"],
value=p["parameterValue"]["value"],
param_type=p["parameterType"]["type"]
)
params.append(param)
job = provider.excute_query(
sql,
params=params
)
job.result() # ジョブの完了を待機

View File

@ -0,0 +1,184 @@
import os
from typing import Any, Dict, Optional,Type,TypeVar,Tuple,Sequence,Union,List,Iterable,ClassVar
from dataclasses import dataclass, fields, is_dataclass
import pandas as pd
from providers.duck_db_provider import DuckDBProvider
DATA_BASEPATH=os.getenv("DATA_BASEPATH","./data")
from lib.custom_logger import get_logger
logger = get_logger()
T = TypeVar("T", bound="DuckBaseModel")
Filter = Tuple[str, str, Any]
def _build_where_clause(where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]]
) -> Tuple[str, List[Any]]:
if where is None:
return "", []
params: List[Any] = []
if isinstance(where, str):
return f"WHERE {where}", params
clauses: List[str] = []
if isinstance(where, dict):
for col, val in where.items():
if isinstance(val, (list, tuple, set)):
placeholders = ", ".join(["?"] * len(val))
clauses.append(f"{col} IN ({placeholders})")
params.extend(list(val))
elif val is None:
clauses.append(f"{col} IS NULL")
else:
clauses.append(f"{col} = ?")
params.append(val)
else:
# sequence of (col, op, value)
for col, op, val in where:
if (op.upper() == "IN") and isinstance(val, (list, tuple, set)):
placeholders = ", ".join(["?"] * len(val))
clauses.append(f"{col} IN ({placeholders})")
params.extend(list(val))
elif val is None and op in ("=", "=="):
clauses.append(f"{col} IS NULL")
else:
clauses.append(f"{col} {op} ?")
params.append(val)
if not clauses:
return "", []
return "WHERE " + " AND ".join(clauses), params
@dataclass
class DuckBaseModel:
file_glob: ClassVar[str] # 例: "./data_science/data/y=2025/news/news_*.csv"
provider: ClassVar[DuckDBProvider | None] = None
columns_select: ClassVar[str] = "*" # 例: "published_parsed, title, link"
default_order_by: ClassVar[Optional[str]] = None
# read_csv_auto オプション(必要ならサブクラスで上書き)
hive_partitioning: ClassVar[bool] = True
union_by_name: ClassVar[bool] = True
header: ClassVar[bool] = True
ignore_errors: ClassVar[bool] = True
@classmethod
def set_provider(cls, provider: DuckDBProvider):
cls.provider = provider
# ---- FROM 句(CSV読込 ----
@classmethod
def _from_clause(cls,file_glob=None) -> str:
path = file_glob or cls.file_glob
path = path.replace("'", "''")
path =f"{DATA_BASEPATH}{path}"
# path = cls.file_glob.replace("'", "''")
hp = 1 if cls.hive_partitioning else 0
un = 1 if cls.union_by_name else 0
hd = "TRUE" if cls.header else "FALSE"
ig = "TRUE" if cls.ignore_errors else "FALSE"
return (f"read_csv_auto('{path}', HEADER={hd}, IGNORE_ERRORS={ig}, "
f"hive_partitioning={hp}, union_by_name={un})")
@classmethod
def select_df(
cls: Type[T],
provider: DuckDBProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
order_by: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
file_glob=None,
) -> pd.DataFrame:
where_sql, params = _build_where_clause(where)
order = order_by or cls.default_order_by
order_sql = f"ORDER BY {order}" if order else ""
limit_sql = f"LIMIT {int(limit)}" if limit is not None else ""
offset_sql = f"OFFSET {int(offset)}" if offset is not None else ""
sql = f"""
SELECT {cls.columns_select}
FROM {cls._from_clause(file_glob=file_glob)}
{where_sql}
{order_sql}
{limit_sql}
{offset_sql}
"""
# パラメータバインドDuckDBProvider の生 connection を使用
if provider is None:
provider = cls.provider
cur = provider.con.execute(sql, params)
return cur.df()
@classmethod
def list(
cls: Type[T],
provider: DuckDBProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
order_by: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
file_glob=None,
) -> List[T]:
df = cls.select_df(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset,file_glob=file_glob)
if not is_dataclass(cls):
raise TypeError(f"{cls.__name__} is not a dataclass.")
instances: List[T] = []
for _, row in df.iterrows():
data = {f.name: row[f.name] for f in fields(cls) if f.name in row}
instance = cls(**data) # type: ignore
instances.append(instance)
return instances
@classmethod
def first(
cls: Type[T],
provider: DuckDBProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
order_by: Optional[str] = None,
) -> Optional[T]:
results = cls.list(provider=provider, where=where, order_by=order_by, limit=1)
return results[0] if results else None
@classmethod
def count(
cls: Type[T],
provider: DuckDBProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
) -> int:
where_sql, params = _build_where_clause(where)
sql = f"""
SELECT COUNT(*) AS cnt
FROM {cls._from_clause()}
{where_sql}
"""
# パラメータバインドDuckDBProvider の生 connection を使用
if provider is None:
provider = cls.provider
cur = provider.con.execute(sql, params)
row = cur.fetchone()
# DuckDB's fetchone() returns a tuple, so read the first column
return row[0] if row else 0
@classmethod
def exists(
cls: Type[T],
provider: DuckDBProvider = None,
*,
where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
) -> bool:
if provider is None:
provider = cls.provider
return cls.count(provider, where=where) > 0

View File

@ -45,7 +45,7 @@ class FirestoreBaseModel:
return cls._col().document(doc_id)
def _as_mutable_dict(self) -> Dict[str, Any]:
"""dataclass → dictid と None は必要に応じて処理)"""
"""dataclass → dict(id と None は必要に応じて処理)"""
data = asdict(self)
data.pop("_doc_id", None)
data.pop("collection_name", None)
@ -54,7 +54,7 @@ class FirestoreBaseModel:
@classmethod
def _from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
"""dict -> モデル(data内の'id'を拾う"""
"""dict -> モデル(data内の'id'を拾う)"""
data = dict(data)
_id = data.pop("id", None)
obj: T = cls(**data) # type: ignore[arg-type]
@ -88,7 +88,7 @@ class FirestoreBaseModel:
return self._doc_id
def update_fields(self, changes: Dict[str, Any], auto_timestamp: bool = True) -> None:
"""部分更新set(merge=True)の糖衣)"""
"""部分更新(set(merge=True)の糖衣)"""
if not self._doc_id:
raise ValueError("Cannot update without id.")
payload = dict(changes)
@ -205,7 +205,7 @@ class FirestoreBaseModel:
) -> int:
"""
条件でヒットしたドキュメントを一括削除削除件数を返す
list_documents に依存複合インデックスなしポリシーはそちらの制約に従う
list_documents に依存(複合インデックスなしポリシーはそちらの制約に従う)
"""
from providers.firestore_provider import FireStoreProvider
rows = FireStoreProvider.list_documents(

View File

@ -0,0 +1,73 @@
import duckdb
class DuckDBProvider:
def __init__(self, db_path: str = ":memory:", read_only: bool = False):
self.con = self.connect(db_path, read_only)
def connect(self, db_path: str = ":memory:", read_only: bool = False):
return duckdb.connect(database=db_path, read_only=read_only)
def setup_gcs(self, access_key: str,secret_key: str):
"""GCSのシークレットを設定する"""
if not self.con:
raise ValueError("DuckDB is not connected.")
self.con.sql(f"""
CREATE OR REPLACE SECRET gcs_creds (
TYPE gcs,
KEY_ID '{access_key}',
SECRET '{secret_key}'
);
""")
def close(self):
"""接続を閉じる"""
if self.con:
self.con.close()
def query_df(self, sql: str):
"""SQLクエリを実行してDataFrameで返す"""
return self.con.execute(sql).df()
def max_value(
self,
file_path: str,
column: str,
hive_partitioning: bool = True,
union_by_name: bool = True,
) -> any:
"""CSVファイルの指定列の最大値を取得する"""
query = f"""
SELECT MAX({column}) AS max_{column}
FROM read_csv_auto('{file_path}',
hive_partitioning={1 if hive_partitioning else 0},
union_by_name={1 if union_by_name else 0}
)
"""
result = self.con.execute(query).fetchone()[0]
return result
def load_file(self, file_glob: str, table: str):
"""CSVを読み込みテーブル化"""
sql = f"""
CREATE OR REPLACE TABLE {table} AS
SELECT *
FROM read_csv_auto('{file_glob}', HEADER=TRUE, IGNORE_ERRORS=TRUE)
"""
self.con.execute(sql)
@staticmethod
def get_gs_csv_name(
bucket: str,
object_name: str,
hive_partitioning: bool = True,
union_by_name: bool = True,
) -> str:
return f"""read_csv_auto('gs://{bucket}/{object_name}',
hive_partitioning={1 if hive_partitioning else 0},
union_by_name={1 if union_by_name else 0}
)
"""

View File

@ -78,12 +78,12 @@ class FireStoreProvider():
start_after: Optional[Union[google.cloud.firestore.DocumentSnapshot, Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
"""
コレクションのドキュメントを取得フィルタ/並び替え/制限/ページング対応
コレクションのドキュメントを取得(フィルタ/並び替え/制限/ページング対応
Args:
collection_name: コレクション名
filters: [(field, op, value), ...]
order_by: 並び順のフィールド名 or その配列'-created_at' のように先頭'-'で降順
order_by: 並び順のフィールド名 or その配列('-created_at' のように先頭'-'で降順
limit: 取得件数の上限
start_after: ドキュメントスナップショットまたは order_by で並べた最後の値の辞書
@ -189,13 +189,13 @@ class FireStoreProvider():
merge: bool = True,
) -> None:
"""
ドキュメントを更新merge=True なら部分更新推奨
ドキュメントを更新merge=True なら部分更新(推奨
"""
ref = cls.get_doc_ref(collection_name, doc_id)
if merge:
ref.set(data, merge=True)
else:
# 全置換存在しないと作成される)
# 全置換(存在しないと作成される)
ref.set(data, merge=False)
logger.info(f"Updated document: {collection_name}/{doc_id} (merge={merge})")

View File

@ -0,0 +1,662 @@
import os
import json
import time
from datetime import datetime
from typing import Optional, Dict, Union,Sequence
from enum import Enum
from google.cloud import bigquery
from google.cloud.bigquery.table import RowIterator, TableListItem
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.table import StreamingBuffer
from google.oauth2 import service_account
from lib.custom_logger import get_logger
logger = get_logger()
class BigQueryFieldType(str,Enum):
STRING = "STRING"
BYTES = "BYTES"
INTEGER = "INTEGER"
FLOAT = "FLOAT"
BOOLEAN = "BOOLEAN"
TIMESTAMP = "TIMESTAMP"
DATE = "DATE"
TIME = "TIME"
DATETIME = "DATETIME"
RECORD = "RECORD"
NUMERIC = "NUMERIC"
BIGNUMERIC = "BIGNUMERIC"
GEOGRAPHY = "GEOGRAPHY"
JSON = "JSON"
class BogQueryFieldMode(str,Enum):
NULLABLE = "NULLABLE" # NULL許可
REQUIRED = "REQUIRED" # 必須
REPEATED = "REPEATED" # 配列
class GoogleCloudBigQueryProvider:
"""Cloud BigQuery操作プロバイダ
Notes:
- このクラスはGoogle Cloud BigQueryの基本的な操作を提供します
- 認証にはサービスアカウントキーまたはApplication Default Credentials(ADC)を使用します
- BigQuery Storage Read APIを使用するなら別のライブラリを使用してください(高速読み込み)
- 必要な権限
- データセット操作を行う場合はBigQuery データオーナーの権限が必要です
- bigquery.dataViewer / bigquery.dataEditor / bigquery.readSessionUserなど部分でも可能
- JOB操作を行う場合はBigQuery ジョブユーザーの権限が必要です
- フル機能を使う場合はBigQuery データオーナー及びBigQuery ジョブユーザーの権限が必要です
"""
def __init__(self, cred_path: Optional[str] = None, project: Optional[str] = None):
try:
if cred_path:
creds = service_account.Credentials.from_service_account_file(cred_path)
# プロジェクト未指定なら credentials から取得
effective_project = project or creds.project_id
self._client = bigquery.Client(
project=effective_project, credentials=creds
)
logger.info(f"Bigquery client initialized with service account file. project={effective_project}")
elif os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"):
cred_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
# from_service_account_info expects a dict, so parse the JSON string from the environment
creds = service_account.Credentials.from_service_account_info(json.loads(cred_json))
effective_project = project or creds.project_id
self._client = bigquery.Client(
project=effective_project, credentials=creds
)
logger.info("Bigquery client initialized with credentials from environment variable.")
else:
self._client = bigquery.Client(project=project)
logger.info("Bigquery client initialized with default credentials (ADC).")
except Exception as e:
logger.error(f"Bigquery initialization failed: {e}")
raise
def get_datasets(self)-> list[DatasetListItem]:
"""データセット一覧を取得する"""
try:
datasets = list(self._client.list_datasets())
return datasets
except Exception as e:
logger.error(f"Failed to list datasets: {e}")
raise
def create_dataset(
self, dataset_id: str,
location: Optional[str] ="asia-northeast1") -> bigquery.Dataset:
"""
データセットを作成する
Args:
dataset_id (str): 作成するデータセットID
location (Optional[str]): データセットのロケーション
Returns:
bigquery.Dataset: 作成されたデータセットオブジェクト
Notes:
- 既に存在するデータセットIDを指定した場合例外が発生します
- 必要な権限
- bigquery.datasets.create
- 命名規則
- 英小文字 (a-z)数字 (0-9)およびアンダースコア (_) のみ使用可能
"""
try:
dataset_ref = self._client.dataset(dataset_id)
dataset = bigquery.Dataset(dataset_ref)
if location:
dataset.location = location
dataset = self._client.create_dataset(dataset)
logger.info(f"Dataset {dataset_id} created.")
return dataset
except Exception as e:
logger.error(f"Failed to create dataset {dataset_id}: {e}")
raise
def is_exists_dataset(self, dataset_id: str) -> bool:
"""データセットが存在するか確認する"""
try:
self._client.get_dataset(dataset_id)
return True
except Exception:
return False
def delete_dataset(self, dataset_id: str, delete_contents: bool = False):
"""
データセットを削除する
Args:
dataset_id (str): 削除するデータセットID
delete_contents (bool): データセット内のテーブルも削除する場合はTrue
Notes:
- 必要な権限
- bigquery.datasets.delete
- delete_contents=Falseの場合データセット内にテーブルが存在すると削除に失敗します
- その場合はdelete_contents=Trueを指定してください
- `google.api_core.exceptions.Conflict`が発生します
"""
try:
self._client.delete_dataset(dataset_id, delete_contents=delete_contents)
logger.info(f"Dataset {dataset_id} deleted.")
except Exception as e:
logger.error(f"Failed to delete dataset {dataset_id}: {e}")
raise
def full_table_id(self, dataset_id: str, table_name: str) -> str:
"""テーブルIDを構築する"""
project = self._client.project
return f"{project}.{dataset_id}.{table_name}"
def is_exists_table(self,table_id: str) -> bool:
"""テーブルが存在するか確認する"""
try:
self._client.get_table(table_id)
return True
except Exception:
return False
@staticmethod
def addSchemaField(
name:str,
field_type:Union[str, BigQueryFieldType],
field_mode:Union[str, BogQueryFieldMode]=BogQueryFieldMode.NULLABLE ,
default_value_expression:Optional[str]=None,
description:Optional[str]=None
)-> bigquery.SchemaField:
if isinstance(field_type, BigQueryFieldType):
field_type = field_type.value
elif isinstance(field_type, str):
field_type = field_type.upper()
if isinstance(field_mode, BogQueryFieldMode):
field_mode = field_mode.value
elif isinstance(field_mode, str):
field_mode = field_mode.upper()
return bigquery.SchemaField(
name=name,
field_type=field_type,
mode=field_mode ,
default_value_expression=default_value_expression,
description=description)
def create_table(
self,
table_id: str,
schema: list[bigquery.SchemaField]) -> bigquery.Table:
"""
テーブルを作成する
Args:
table_id (str): 作成するテーブルID(: 'dataset.table')
schema (list[bigquery.SchemaField]): テーブルのスキーマ定義
Returns:
bigquery.Table: 作成されたテーブルオブジェクト
Notes:
- 既に存在するテーブルIDを指定した場合例外が発生します
"""
try:
table = bigquery.Table(table_id, schema=schema)
table = self._client.create_table(table)
logger.info(f"Table {table_id} created.")
return table
except Exception as e:
logger.error(f"Failed to create table {table_id}: {e}")
raise
def delete_table(self, table_id: str, not_found_ok: bool = False):
"""
テーブルを削除する
Args:
table_id (str): 削除するテーブルID(: 'dataset.table')
not_found_ok (bool): テーブルが存在しない場合でもエラーにしない場合はTrue
"""
try:
self._client.delete_table(table_id, not_found_ok=not_found_ok)
logger.info(f"Table {table_id} deleted.")
except Exception as e:
logger.error(f"Failed to delete table {table_id}: {e}")
raise
def get_tables(self, dataset_id: str) -> list[TableListItem]:
"""
データセット内のテーブル一覧を取得する
Args:
dataset_id (str): 対象のデータセットID
Returns:
list[bigquery.TableListItem]: テーブル一覧
"""
try:
tables = list(self._client.list_tables(dataset_id))
logger.info(f"Tables listed successfully from dataset {dataset_id}, count {len(tables)}.")
return tables
except Exception as e:
logger.error(f"Failed to list tables in dataset {dataset_id}: {e}")
raise
def insert_rows(
self,
table_id: str,
rows: list[Dict[str, any]]) -> Sequence[dict]:
"""
テーブルに行を挿入する
Args:
table_id (str): 挿入先のテーブルID(: 'dataset.table')
rows (list[Dict[str, any]]): 挿入する行データのリスト
Returns:
Sequence[dict]: 挿入結果のレスポンスオブジェクト
Notes:
- rowsは辞書形式で指定しますキーがカラム名値がカラムの値となります
- : [{"column1": "value1", "column2": 123}, {"column1": "value2", "column2": 456}]
"""
try:
errors = self._client.insert_rows_json(table_id, rows)
if errors:
logger.error(f"Errors occurred while inserting rows into {table_id}: {errors}")
else:
logger.info(f"Rows inserted successfully into {table_id}.")
return errors
except Exception as e:
logger.error(f"Failed to insert rows into table {table_id}: {e}")
raise
def list_rows(
self,
table_id: str,
selected_fields:Optional[list[str]] = None,
max_results:Optional[int] = None,
start_index:Optional[int] = None
) -> RowIterator:
"""
テーブルの行を一覧取得する
Args:
table_id (str): 対象のテーブルID(: 'dataset.table')
selected_fields (Optional[list[str]]): 取得するカラム名のリストNoneの場合は全カラム取得
max_results (Optional[int]): 取得する行数の上限Noneの場合は制限なし
start_index (Optional[int]): 取得開始インデックスNoneの場合は先頭から取得
Returns:
RowIterator: 取得した行のイテレータ
"""
try:
table = self._client.get_table(table_id)
_selected_fields:Optional[list[bigquery.SchemaField]] = None
if selected_fields:
_selected_fields = [field for field in table.schema if field.name in selected_fields]
rows = self._client.list_rows(
table,
selected_fields=_selected_fields,
max_results=max_results,
start_index=start_index
)
logger.info(f"Rows listed successfully from table {table_id}.")
return rows
except Exception as e:
logger.error(f"Failed to list rows from table {table_id}: {e}")
raise
def list_rows_to_dataframe(
self,
table_id: str,
selected_fields:Optional[list[str]] = None,
max_results:Optional[int] = None,
start_index:Optional[int] = None
):
try:
table = self._client.get_table(table_id)
df = self._client.list_rows(
table,
selected_fields=selected_fields,
max_results=max_results,
start_index=start_index
).to_dataframe()
logger.info(f"Rows listed to DataFrame successfully from table {table_id}.")
return df
except Exception as e:
logger.error(f"Failed to list rows to DataFrame from table {table_id}: {e}")
raise
# --------------------------------------------------------
def get_streaming_buffer_info(self,table_id: str):
"""スリーミングバッファを取得する
Notes:
- スリーミングバッファはストリーミング挿入によってテーブルに追加されたデータが永続的なストレージに書き込まれる前に一時的に保存される場所です
- スリーミングバッファの情報が存在しない場合Noneを返します
- ストリーミングバッファ内のデータを直接削除更新クリアすることは不可能です
- GCP側で完全に自動制御されており手動でクリア/フラッシュ/削除する手段は 存在しません
- 通常:数秒数分 高頻度ストリーミング:30 極端な負荷多パーティション:最大約90分(GCP上限目安)
"""
t = self._client.get_table(table_id)
# SDK属性 or 下位プロパティのどちらかに入っています
buf:StreamingBuffer = getattr(t, "streaming_buffer", None) or t._properties.get("streamingBuffer")
if not buf:
return None
# 例: {'estimatedRows': '123', 'estimatedBytes': '456789', 'oldestEntryTime': '1698234000000'}
logger.debug(f"Streaming Buffer Raw Info: {buf}")
return {
"estimated_rows": buf.estimated_rows,
"estimated_bytes": buf.estimated_bytes,
"oldest_entry_ms": buf.oldest_entry_time,
"raw": buf,
}
def excute_query(
self,
query: str,
params:list = None) -> bigquery.QueryJob:
"""
クエリを実行する
Args:
query (str): 実行するSQLクエリ
Returns:
bigquery.QueryJob: 実行結果のQueryJobオブジェクト
"""
try:
if params and len(params) >0:
query_job = self._client.query(query, job_config=bigquery.QueryJobConfig(
query_parameters=params
))
else:
query_job = self._client.query(query)
logger.info("Query executed successfully.")
logger.debug(f"Query JOB: {query_job}")
return query_job
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
@staticmethod
def addQueryParameter(
name:str,
value: any,
param_type:Union[str, BigQueryFieldType]=BigQueryFieldType.STRING
) -> bigquery.ScalarQueryParameter:
"""クエリパラメータを作成する"""
if isinstance(param_type, BigQueryFieldType):
param_type = param_type.value
elif isinstance(param_type, str):
param_type = param_type.upper()
return bigquery.ScalarQueryParameter(
name=name,
type_=param_type,
value=value
)
def select_query(
self,
table_id: str,
columns: list[str] = None,
were_clause: str = None,
params: list = None,
order_by: str = None,
limit: int = None
):
"""
指定したテーブルからデータを選択するクエリを実行する
Args:
table_id (str): 対象のテーブルID(: 'dataset.table')
columns (list[str]): 取得するカラム名のリストNoneの場合は全カラム取得
were_clause (str): WHERE句の条件式Noneの場合は条件なし
order_by (str): 並び替えのカラム名Noneの場合は並び替えなし
limit (int): 取得する行数の上限Noneの場合は制限なし
"""
try:
cols = ", ".join(columns) if columns else "*"
query = f"SELECT {cols} FROM `{table_id}`"
if were_clause:
query += f" WHERE {were_clause}"
if order_by:
query += f" ORDER BY {order_by}"
if limit:
query += f" LIMIT {limit}"
query_job = self.excute_query(query, params=params)
logger.info("Select query executed successfully.")
return query_job
except Exception as e:
logger.error(f"Failed to execute select query on table {table_id}: {e}")
raise
def select_query_to_dataframe(
self,
table_id: str,
columns: list[str] = None,
were_clause: str = None,
params: list = None,
order_by: str = None,
limit: int = None
):
try:
query_job= self.select_query(
table_id=table_id,
columns=columns,
were_clause=were_clause,
params=params,
order_by=order_by,
limit=limit
)
df = query_job.to_dataframe()
logger.info("Select query to DataFrame executed successfully.")
return df
except Exception as e:
logger.error(f"Failed to convert query result to DataFrame for table {table_id}: {e}")
raise
def select_query_to_dict(
self,
table_id: str,
columns: list[str] = None,
were_clause: str = None,
params: list = None,
order_by: str = None,
limit: int = None
) -> list[Dict[str, any]]:
try:
query_job= self.select_query(
table_id=table_id,
columns=columns,
were_clause=were_clause,
params=params,
order_by=order_by,
limit=limit
)
results = query_job.result()
dicts = [dict(row) for row in results]
logger.info("Select query to dicts executed successfully.")
return dicts
except Exception as e:
logger.error(f"Failed to convert query result to dicts for table {table_id}: {e}")
raise
def isert_query(
self,
table_id: str,
values: dict[str, str],
params: list = None
) -> bigquery.QueryJob:
"""
挿入クエリ(INSERT)を実行する
"""
try:
query = f"INSERT INTO `{table_id}` ({', '.join(values.keys())})"
query += f" VALUES ({', '.join([str(v) for v in values.values()])})"
query_job = self.excute_query(query, params=params)
logger.info("Insert query executed successfully.")
return query_job
except Exception as e:
logger.error(f"Failed to execute insert query on table {table_id}: {e}")
raise
def update_query(
self,
table_id: str,
values: dict[str, str],
were_clause: str,
params: list = None
) -> bigquery.QueryJob:
"""
更新クエリ(UPDATE)を実行する
Args:
table_id (str): 対象のテーブルID(: 'dataset.table')
values (dict[str, str]): 更新するカラムと値の辞書
were_clause (str): WHERE句の条件式全件の場合は"TRUE"を指定
"""
try:
query = f"UPDATE `{table_id}`"
query += f" SET {', '.join([f'{k} = {v}' for k, v in values.items()])}"
if were_clause:
query += f" WHERE {were_clause}"
logger.debug(f"Update Query: {query}")
query_job = self.excute_query(query, params=params)
logger.info("Update query executed successfully.")
query_job.result()  # wait for the DML job to finish
return query_job
except Exception as e:
logger.error(f"Failed to execute update query on table {table_id}: {e}")
raise
def delete_query(
self,
table_id: str,
were_clause:str,
params: list = None
) -> bigquery.QueryJob:
"""
削除クエリ(DELETE)を実行する
"""
try:
query = f"DELETE FROM `{table_id}`"
if were_clause:
query += f" WHERE {were_clause}"
query_job = self.excute_query(query, params=params)
logger.info("Delete query executed successfully.")
query_job.result()  # wait for the DML job to finish
return query_job
except Exception as e:
logger.error(f"Failed to execute delete query on table {table_id}: {e}")
raise
def upsert_query(
self,
table_id: str,
insert_values: dict[str, str],
update_values: dict[str, str],
key_columns: list[str],
params: list = None
) -> bigquery.QueryJob:
"""
UPSERTクエリ(MERGE)を実行する
"""
try:
query = f"MERGE INTO `{table_id}` T"
query += f" USING (SELECT {', '.join([str(v) for v in insert_values.values()])}) S"
query += f" ON {' AND '.join([f'T.{col} = S.{col}' for col in key_columns])}"
query += " WHEN MATCHED THEN"
query += f" UPDATE SET {', '.join([f'T.{k} = {v}' for k, v in update_values.items()])}"
query += " WHEN NOT MATCHED THEN"
query += f" INSERT ({', '.join(insert_values.keys())})"
query += f" VALUES ({', '.join([str(v) for v in insert_values.values()])})"
query_job = self.excute_query(query, params=params)
logger.info("Upsert query executed successfully.")
return query_job
except Exception as e:
logger.error(f"Failed to execute upsert query on table {table_id}: {e}")
raise
def import_csv(
self,
table_id: str,
csv_file_path: str,
skip_leading_rows: int = 1,
write_disposition: str = "WRITE_APPEND",
field_delimiter: str = ",",
autodetect: bool = True,
schema: list[bigquery.SchemaField] = None
) -> bigquery.LoadJob:
"""
CSVファイルからテーブルにデータを挿入する
Args:
table_id (str): 挿入先のテーブルID(: 'dataset.table')
csv_file_path (str): 挿入元のCSVファイルパス(ローカルファイル。gs:// のURIを使う場合は load_table_from_uri が別途必要)
skip_leading_rows (int): ヘッダ行をスキップする行数
write_disposition (str): 書き込みモード (: "WRITE_APPEND", "WRITE_TRUNCATE", "WRITE_EMPTY")
field_delimiter (str): フィールド区切り文字
autodetect (bool): スキーマ自動検出を有効にするかどうか
schema (list[bigquery.SchemaField]): スキーマ定義autodetect=Falseの場合に必要
Returns:
bigquery.LoadJob: ロードジョブオブジェクト
"""
try:
job_config = bigquery.LoadJobConfig(
skip_leading_rows=skip_leading_rows,
write_disposition=write_disposition,
field_delimiter=field_delimiter,
autodetect=autodetect,
schema=schema
)
with open(csv_file_path, "rb") as source_file:
load_job = self._client.load_table_from_file(
source_file,
table_id,
job_config=job_config
)
load_job.result() # ジョブの完了を待機
logger.info(f"Data loaded successfully from {csv_file_path} to table {table_id}.")
return load_job
except Exception as e:
logger.error(f"Failed to load data from {csv_file_path} to table {table_id}: {e}")
raise
def export_csv_to_gcs(
self,
table_id: str,
bucket: str,
prefix: str,
csv_file_name: str =None,
location: str = None
) -> bigquery.ExtractJob:
"""
テーブルからGCSのCSVファイルにデータをエクスポートする
Args:
table_id (str): エクスポート元のテーブルID(: 'dataset.table')
bucket (str): エクスポート先のGCSバケット名
prefix (str): エクスポート先のGCSプレフィックス(フォルダパス)
csv_file_name (str): エクスポート先のCSVファイル名Noneの場合は自動生成
location (str): ジョブのロケーション
Returns:
bigquery.ExtractJob: エクストラクトジョブオブジェクト
"""
try:
if not csv_file_name:
_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_file_name = f"{table_id.replace('.','_')}_{_timestamp}.csv"
destination_uri = f"gs://{bucket}/{prefix}/{csv_file_name}"
extract_job = self._client.extract_table(
table_id,
destination_uri,
location=location
)
extract_job.result() # ジョブの完了を待機
logger.info(f"Data exported successfully from table {table_id} to {destination_uri}.")
return extract_job
except Exception as e:
logger.error(f"Failed to export data from table {table_id} to {destination_uri}: {e}")
raise
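# Usage sketch for CSV import/export (bucket, paths and location below are only examples):
#   provider = GoogleCloudBigQueryProvider(cred_path="keys/google_service_accout.json")
#   table_id = provider.full_table_id("example_dataset", "example_table")
#   provider.import_csv(table_id, "./data/example.csv", skip_leading_rows=1, autodetect=True)
#   provider.export_csv_to_gcs(table_id, bucket="datasource-example-251018",
#                              prefix="exports", location="asia-northeast1")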

View File

@ -10,6 +10,9 @@ from google.oauth2 import service_account
from lib.custom_logger import get_logger
logger = get_logger()
import zipfile
from pathlib import Path
class GoogleCloudStorageProvider:
@ -75,6 +78,17 @@ class GoogleCloudStorageProvider:
def write_item(self, bucket: str, object_name: str, data: Union[bytes, BinaryIO, str],
content_type: str | None = None) -> Dict[str, Any]:
"""
オブジェクトを書き込む
Args:
bucket (str): バケット名
object_name (str): オブジェクト名
data (Union[bytes, BinaryIO, str]): 書き込むデータ
content_type (Optional[str]): コンテンツタイプ(MIMEタイプ
Returns:
Dict[str, Any]: 書き込んだオブジェクトの情報
"""
blob = self._blob(bucket, object_name)
if content_type is None:
content_type = mimetypes.guess_type(object_name)[0] or "application/octet-stream"
@ -100,4 +114,57 @@ class GoogleCloudStorageProvider:
def generate_signed_url(self, bucket: str, object_name: str, method: str = "GET",
expires: timedelta = timedelta(hours=1)) -> str:
return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method)
def zip_items(
self,
bucket: str,
object_names: List[str],
) -> bytes:
"""
複数のGCSオブジェクトを1つのZIPにまとめZIPバイナリ(bytes)を返す
Args:
bucket (str): バケット名
object_names (List[str]): 対象オブジェクトのリスト
Returns:
bytes: ZIPファイルのバイナリ
"""
out = io.BytesIO()
with zipfile.ZipFile(out, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
for obj in object_names:
blob = self._blob(bucket, obj)
if not blob.exists():
raise FileNotFoundError(f"Object not found: gs://{bucket}/{obj}")
buf = io.BytesIO()
blob.download_to_file(buf)
buf.seek(0)
arcname = Path(obj).name
zf.writestr(arcname, buf.read())
zf.comment = f"bucket={bucket}, files={len(object_names)}".encode()
return out.getvalue()
def upload_folder(self, bucket: str, folder_path: str, gcs_prefix: str = ""):
"""
ローカルフォルダをGCSに再帰的にアップロードする
Args:
bucket (str): バケット名
folder_path (str): ローカルフォルダのパス
gcs_prefix (str): GCS上のプレフィックス(フォルダパス)
"""
_bucket = self._bucket(bucket)
for root, _, files in os.walk(folder_path):
for file in files:
local_file_path = os.path.join(root, file)
# フォルダ構造を保つように相対パスを生成
relative_path = os.path.relpath(local_file_path, folder_path)
gcs_object_name = os.path.join(gcs_prefix, relative_path).replace("\\", "/")
blob = _bucket.blob(gcs_object_name)
blob.upload_from_filename(local_file_path)
logger.info(f"Uploaded {local_file_path} to gs://{bucket}/{gcs_object_name}")
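# Usage sketch for the methods above (bucket and object names are only examples):
#   gcs = GoogleCloudStorageProvider(cred_path="./keys/google_service_accout.json")
#   zip_bytes = gcs.zip_items("datasource-example-251018", ["example/y=2025/m=10/example.csv"])
#   with open("example_backup.zip", "wb") as f:
#       f.write(zip_bytes)
#   gcs.upload_folder("datasource-example-251018", "./data", gcs_prefix="backup/data")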

View File

@ -31,8 +31,8 @@ class OneDriveProvider:
client_id: Azure Portal のアプリ(公開クライアント) Client ID
authority: 'https://login.microsoftonline.com/{tenant}' (未指定は 'common')
scopes: ["Files.ReadWrite", "User.Read", "offline_access"]
token_cache_path: MSAL のシリアライズ済みトークンキャッシュ保存先任意
access_token: 既に取得済みの Bearer トークンを直接使いたい場合任意
token_cache_path: MSAL のシリアライズ済みトークンキャッシュ保存先(任意
access_token: 既に取得済みの Bearer トークンを直接使いたい場合(任意
"""
self.client_id = client_id or os.getenv("MS_CLIENT_ID") or ""
self.authority = authority or self.DEFAULT_AUTHORITY
@ -64,7 +64,7 @@ class OneDriveProvider:
f.write(self._token_cache.serialize())
def ensure_token(self):
"""有効な Access Token を確保キャッシュ→デバイスコード)。"""
"""有効な Access Token を確保(キャッシュ→デバイスコード)。"""
if self._access_token:
return self._access_token
@ -102,7 +102,7 @@ class OneDriveProvider:
"""
Graph のパス表記: /me/drive/root:/foo/bar:/children のように使用
先頭に / を付けずURL エンコードは requests 側に任せる前提で
空白や日本語は安全のため quote することを推奨今回は簡易化
空白や日本語は安全のため quote することを推奨(今回は簡易化
"""
if not path or path.strip() in ["/", "."]:
return ""
@ -156,7 +156,7 @@ class OneDriveProvider:
def create_folder(self, folder_path: str) -> Dict[str, Any]:
"""
中間フォルダも順次作成簡易実装
中間フォルダも順次作成(簡易実装
"""
parts = [p for p in self._normalize_path(folder_path).split("/") if p]
cur = ""
@ -300,7 +300,7 @@ class OneDriveProvider:
return r.json()
# -----------------------
# 共有リンク擬似 Signed URL
# 共有リンク(擬似 Signed URL
# -----------------------
def generate_share_link(
self,
@ -316,7 +316,7 @@ class OneDriveProvider:
url = f"{self._item_by_path_url(path)}:/createLink"
body: Dict[str, Any] = {"type": link_type, "scope": scope}
if password:
body["password"] = password # パスワード保護ポリシーにより可否)
body["password"] = password # パスワード保護(ポリシーにより可否)
r: requests.Response = self._session.post(url, headers=self._headers(), json=body)
r.raise_for_status()
data:dict = r.json()

View File

@ -0,0 +1,41 @@
import feedparser
from feedparser import FeedParserDict
from pydantic import BaseModel
class Feed(BaseModel):
url: str
title: str | None = None
company: str | None = None
description: str | None = None
language: str | None = None
tags: list[str] | None = None
class RSSItem(BaseModel):
title: str
link: str
guid: str | None = None
published: str | None = None
ts: str | None = None
source: str | None = None
description: str | None = None
class RSSReaderClient:
"""RSSリーダークライアント"""
@classmethod
def fetch(cls, feed: Feed, from_date: str, to_date: str) -> list[RSSItem]:
"""指定されたフィードから記事を取得する"""
items = []
d: FeedParserDict = feedparser.parse(feed.url)
for e in d.entries:
items.append(RSSItem(
title=e.get("title", "(no title)"),
link=e.get("link"),
guid=e.get("id") or e.get("guid") or e.get("link"),
published=e.get("published") or e.get("updated"),
ts=e.get("published") or e.get("updated"),
source=feed.url,
description=e.get("summary") or e.get("description"),
))
return items
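# Usage sketch (feed URL taken from example_rss.py; the date arguments are placeholders and are
# not yet applied as a filter by fetch() above):
#   feed = Feed(url="https://openai.com/blog/rss.xml", title="OpenAI Blog")
#   items = RSSReaderClient.fetch(feed, from_date="2025-01-01", to_date="2025-12-31")
#   for item in items[:5]:
#       print(item.title, item.link)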