Compare commits
2 commits: 649015d016 ... f74b1c7a10

Commits (SHA1): f74b1c7a10, 16e0d2d8a9
.gitignore (vendored, 3 added lines)

@@ -4,3 +4,6 @@ __pycache__
.env
keys/
.onedrive_cache.json
README.md (13 added lines)

@@ -4,3 +4,16 @@
python -m venv venv
.\venv\Scripts\activate
```

curl https://api.openai.com/v1/audio/transcriptions \
  -H "Authorization: Bearer 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" \
  -F "file=@part_001.m4a" \
  -F "model=gpt-4o-transcribe" \
  -F "response_format=text" \
  -F "language=ja" \
  -o part_001.txt

ffmpeg -i voice.m4a -f segment -segment_time 600 -c copy part_%03d.m4a

# Normalize the audio format
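The same split-and-transcribe flow can be sketched with the `openai` Python package instead of curl; this assumes the key is read from the `OPENAI_API_KEY` environment variable rather than hard-coded:

```python
import os
from openai import OpenAI

client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

# Transcribe one 10-minute segment produced by the ffmpeg command above.
with open("part_001.m4a", "rb") as f:
    text = client.audio.transcriptions.create(
        model="gpt-4o-transcribe",
        file=f,
        response_format="text",
        language="ja",
    )
print(text)
```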
docments/big_query.md (new file, 12 lines)

@@ -0,0 +1,12 @@
## BigQuery is a "data warehouse"-type system

BigQuery is a column-oriented database intended for OLAP (analytics) workloads,
so unlike an RDB (MySQL, PostgreSQL, etc.) it is not designed around row uniqueness or integrity constraints.

## Constraints that do not exist in BigQuery

* PRIMARY KEY
* FOREIGN KEY
* UNIQUE
* CHECK
* AUTO_INCREMENT
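Because none of those constraints exist, duplicates are usually handled in the query itself. A minimal sketch (table and column names are hypothetical) of keeping only the newest row per key with google-cloud-bigquery:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Keep only the newest row per device_code, emulating the uniqueness
# that a PRIMARY KEY would normally enforce.
sql = """
SELECT * EXCEPT(rn) FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY device_code ORDER BY time_stamp DESC) AS rn
  FROM `example_dataset.example_table`
)
WHERE rn = 1
"""
for row in client.query(sql).result():
    print(dict(row))
```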
docments/windows_disk.md (new file, 66 lines)

@@ -0,0 +1,66 @@

```powershell
# Delete temporary files
Remove-Item -Path "$env:TEMP\*" -Recurse -Force -ErrorAction SilentlyContinue
```

Move data that can be relocated to another drive

```powershell
Get-ChildItem C:\ -Recurse -ErrorAction SilentlyContinue |
Where-Object { -not $_.PSIsContainer } |
Sort-Object Length -Descending |
Select-Object FullName, @{Name="Size(GB)";Expression={"{0:N2}" -f ($_.Length / 1GB)}} -First 20
```

Clean up with third-party tools

WinDirStat
→ Visualizes disk usage so large files can be spotted at a glance.

```
winget install WinDirStat
windirstat
```

CCleaner
→ Handy for cleaning up temporary files and unneeded registry entries.

```
Remove-Item "C:\adobeTemp\*" -Recurse -Force -ErrorAction SilentlyContinue

ren "C:\adobeTemp" "adobeTemp_old"
```

attrib -h -s -r "C:\adobeTemp"

ren C:\adobeTemp C:\adobeTemp_old
ren "C:\adobeTemp" "adobeTemp_old"

mkdir "F:\adobeTemp"

mklink /J C:\adobeTemp F:\adobeTemp

ren C:\adobeTemp C:\adobeTemp_old

Create the link-target folder (prepare a new location on the D drive, in PowerShell):

mkdir D:\adobeTemp

Create the junction (in an administrator command prompt, cmd):

mklink /J C:\adobeTemp D:\adobeTemp

cd "C:\Users\r_yam\AppData\Local\Docker\wsl\data"
optimize-vhd -Path .\ext4.vhdx -Mode Full
example/example_bigquery.py (new file, 110 lines)

@@ -0,0 +1,110 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))

from lib.custom_logger import get_logger
logger = get_logger(level=10)

from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider


def example_bigquery():
    try:
        # Create a GoogleCloudBigQueryProvider instance
        provider = GoogleCloudBigQueryProvider(
            cred_path="keys/google_service_accout.json",
        )

        dss = provider.get_datasets()
        for ds in dss:
            logger.info(f"Dataset ID: {ds.dataset_id}")

        # Create a dataset
        dataset_id = "example_dataset"
        if provider.is_exists_dataset(dataset_id):
            logger.info(f"Dataset {dataset_id} already exists.")
        else:
            dataset = provider.create_dataset(dataset_id)
            logger.info(f"Dataset {dataset_id} created at {dataset.created}.")

        table_id = provider.full_table_id(dataset_id, "example_table")
        # Create a table
        if provider.is_exists_table(table_id):
            logger.info(f"Table {table_id} already exists.")
        else:
            logger.info(f"Creating table {table_id}...")
            schema = [
                provider.addSchemaField("device_code", "string", "REQUIRED", description="Device code"),
                provider.addSchemaField("time_stamp", "timestamp", "REQUIRED", description="Timestamp"),
            ]
            table = provider.create_table(
                table_id=table_id,
                schema=schema
            )

        # tables = provider.get_tables(dataset_id)
        # for table in tables:
        #     logger.info(f"Table ID: {table.table_id}")

        # Insert rows into the table
        # provider.insert_rows(
        #     table_id=table_id,
        #     rows=[
        #         {"device_code": "device_001", "time_stamp": "2025-01-01 12:00:00"},
        #         {"device_code": "device_002", "time_stamp": "2025-01-01 12:05:00"},
        #         {"device_code": "device_003", "time_stamp": "2025-01-01 12:10:00"},
        #     ]
        # )
        # Check the table contents (runs a job / issues SQL)
        # job_qyuery = provider.excute_query(
        #     query=f"SELECT * FROM `{table_id}` where device_code='device_001'",
        # )
        # results = job_qyuery.result()
        # for row in results:
        #     logger.info(f"Row: {row}")


        # Check the table contents (runs a job)
        # rows = provider.list_rows(
        #     table_id=table_id,
        # )
        # for row in rows:
        #     logger.info(f"Row: {row}")

        # Check the streaming-buffer status
        # buffer_status = provider.get_streaming_buffer_info(table_id=table_id)
        # logger.info(f"Streaming Buffer Info: {buffer_status}")
        # Update the table
        # query_job = provider.update_query(
        #     table_id=table_id,
        #     values={
        #         "device_code": "'device_999'"
        #     },
        #     were_clause="device_code='device_002'"
        # )
        # query_job.result()  # wait for the job to finish

        # # Check the table contents (runs a job)
        # job = provider.select_query(
        #     table_id=table_id,
        # )
        # results = job.result()
        # for row in results:
        #     logger.info(f"Row: {row}")

        # # Delete records from the table
        # provider.delete_query(
        #     table_id=table_id,
        #     were_clause="device_code='device_012'"
        # )
        # Delete the table
        # provider.delete_table(table_id=table_id)

    except Exception as e:
        logger.error(f"Error in example_bigquery: {e}")


example_bigquery()
example/example_bigquery_model.py (new file, 74 lines)

@@ -0,0 +1,74 @@
import sys
import os
from datetime import datetime

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))
from typing import ClassVar, Optional

from lib.custom_logger import get_logger
logger = get_logger(level=10)
from dataclasses import dataclass

from models.bigquery_base_model import BigQueryBaseModel
from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider

@dataclass
class ExamplModel(BigQueryBaseModel):
    device_code: str
    time_stamp: datetime

    table_id: ClassVar[str] = "example_dataset.example_table"
    key_field: ClassVar[Optional[str]] = "device_code"

def example_model():
    logger.info("Starting example_bigquery_model function.")
    provider = GoogleCloudBigQueryProvider(
        cred_path="keys/google_service_accout.json",
    )
    ExamplModel.set_provider(provider)

    # Fetch every record in the table
    # records = ExamplModel.fetch_all()
    # logger.info(f"Total records: {len(records)}")
    # for record in records:
    #     logger.info(f"Record: {record}")

    # Create a record
    # new_record = ExamplModel(
    #     device_code="device_010",
    #     time_stamp=datetime(2025, 1, 1, 15, 0, 0)
    # )
    # new_record.create()

    # Create many records at once
    # ExamplModel.insert(
    #     rows=[
    #         ExamplModel(device_code="device_011",time_stamp=datetime(2025, 1, 1, 15, 0, 0)),
    #         ExamplModel(device_code="device_012",time_stamp=datetime(2025, 1, 1, 15, 0, 0)),
    #     ])

    # Check the table's streaming-buffer state
    if ExamplModel.is_streaming_buffer():
        logger.info("Table is currently receiving streaming data.")
        logger.info("Records in the streaming buffer cannot be updated or deleted.")
    else:
        logger.info("Table is not receiving streaming data.")
    # Delete records matching a condition
    # ExamplModel.delete(where=[("device_code", "=", "device_010")])
    # Update records
    ExamplModel.update(
        values={"device_code": "device_011_updated"},
        where=[("device_code", "=", "device_011")]
    )


    # records = ExamplModel.list(where=[("device_code", "=", "device_001")])
    # logger.info(f"Records with device_code='device_001': {records if records else 'No records found'}")

    # record = ExamplModel.first()
    # logger.info(f"First record: {record if record else 'No record found'}")

example_model()
example/example_duck_db.py (new file, 25 lines)

@@ -0,0 +1,25 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))

from lib.custom_logger import get_logger
logger = get_logger(level=10)

from providers.duck_db_provider import DuckDBProvider

def example_duckdb():
    logger.info("Starting example_duckdb function.")
    file_path = "./data_science/data/y=*/news/news_*.csv"
    provider = DuckDBProvider()
    sql = f"""
    SELECT *
    FROM read_csv_auto('{file_path}' , HEADER=TRUE, IGNORE_ERRORS=TRUE)
    """
    result = provider.query_df(sql)
    print("latest published_parsed:", result)

    # Print one row at a time
    for idx, row in result.iterrows():
        logger.info(f"title:{row['title']}")

example_duckdb()
example/example_duck_gcs.py (new file, 87 lines)

@@ -0,0 +1,87 @@
"""
An HMAC key is required.
* Issue it from the Google Cloud Console
* https://console.cloud.google.com/storage/settings
* Open the "Interoperability" tab
* Click "Create HMAC key"
* Select the service account you want to use
"""

import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))

from dotenv import load_dotenv
load_dotenv()


from lib.custom_logger import get_logger
logger = get_logger(level=10)

import duckdb
from providers.google_cloud_storage_provider import GoogleCloudStorageProvider
from providers.duck_db_provider import DuckDBProvider


def example_init_google_cloud_storage():

    logger.info("Starting example_google_cloud_storage function.")
    gcs_provider = GoogleCloudStorageProvider(
        cred_path="./keys/google_service_accout.json",
    )
    csv_data = """id,name,age,city,score,created_at
1,Alice,25,Tokyo,88,2025-10-01T09:00:00Z
2,Bob,30,Osaka,75,2025-10-02T09:30:00Z
3,Charlie,28,Nagoya,92,2025-10-03T10:00:00Z
4,David,35,Fukuoka,64,2025-10-04T11:15:00Z
5,Eva,22,Sapporo,80,2025-10-05T12:45:00Z
"""
    gcs_provider.write_item("datasource-example-251018",
                            "example/y=2025/m=10/example.csv",
                            csv_data.encode("utf-8"), "text/csv")

    buckets = gcs_provider.get_buckets()
    logger.info(f"Buckets: {buckets}")


def example_duckdb_cloud_raw():
    logger.info("Starting example_duckdb_cloud_raw function.")

    # Connect to DuckDB
    con = duckdb.connect()
    con.sql(f"""
        CREATE OR REPLACE SECRET gcs_creds (
            TYPE gcs,
            KEY_ID '{os.getenv('GCP_STORAGE_HMAC_ACCESS_KEY')}',
            SECRET '{os.getenv('GCP_STORAGE_HMAC_SECRET_KEY')}'
        );
    """)

    query = f"""
        SELECT * FROM read_csv_auto('gs://datasource-example-251018/example/y=2025/m=10/example.csv');
    """
    result = con.execute(query).df()
    logger.info(f"Read {len(result)} rows from GCS file.")


def example_duckdb_cloud_class():
    logger.info("Starting example_duckdb_cloud_class function.")
    # Connect to DuckDB
    provider = DuckDBProvider()
    provider.setup_gcs(
        access_key=os.getenv('GCP_STORAGE_HMAC_ACCESS_KEY'),
        secret_key=os.getenv('GCP_STORAGE_HMAC_SECRET_KEY'),
    )
    bucket_name = "datasource-example-251018"
    object_name = "example/y=2025/m=*/example.csv"

    query = f"""
        SELECT * FROM {provider.get_gs_csv_name(bucket_name, object_name)};
    """
    result = provider.query_df(query)
    logger.info(f"Read {len(result)} rows from GCS file using DuckDBProvider.")


# example_init_google_cloud_storage()
# example_duckdb_cloud_raw()
example_duckdb_cloud_class()
example/example_duck_model.py (new file, 30 lines)

@@ -0,0 +1,30 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),"..", "src")))

from typing import ClassVar, Optional

from lib.custom_logger import get_logger
logger = get_logger(level=10)
from dataclasses import dataclass

from models.duck_base_model import DuckBaseModel
from providers.duck_db_provider import DuckDBProvider

@dataclass
class NewsModel(DuckBaseModel):
    title: str
    file_glob: ClassVar[str] = "./data_science/data/y=2025/news/news_*.csv"
    default_order_by: ClassVar[Optional[str]] = "pub_date DESC"


def example_duckdb_model():
    logger.info("Starting example_duckdb_model function.")
    provider = DuckDBProvider()
    provider.connect()
    NewsModel.provider = provider

    news = NewsModel.first()
    logger.info(f"Total news: {news if news else 'No news found'}")

# example_duckdb_model()
example/example_rss.py (new file, 47 lines)

@@ -0,0 +1,47 @@
# openai_rss_reader_stdlib.py
# pip install feedparser

import feedparser
from email.utils import parsedate_to_datetime

FEEDS = [
    "https://platform.openai.com/docs/release-notes.rss",  # release notes
    "https://openai.com/blog/rss.xml",  # blog
]

def to_ts(entry: feedparser.FeedParserDict):
    # If published_parsed / updated_parsed is present, convert directly to a datetime
    if entry.get("published_parsed"):
        return parsedate_to_datetime(entry.published)
    if entry.get("updated_parsed"):
        return parsedate_to_datetime(entry.updated)
    return None

def uniq_key(entry: feedparser.FeedParserDict):
    return entry.get("id") or entry.get("guid") or entry.get("link")

def fetch_all(feeds):
    items = []
    seen = set()
    for url in feeds:
        d: feedparser.FeedParserDict = feedparser.parse(url)
        for e in d.entries:
            k = uniq_key(e)
            if not k or k in seen:
                continue
            seen.add(k)
            ts = to_ts(e)
            items.append({
                "title": e.get("title", "(no title)"),
                "link": e.get("link"),
                "published": e.get("published") or e.get("updated"),
                "ts": ts,
                "source": url,
            })
    # Entries whose ts is None go to the end
    items.sort(key=lambda x: (x["ts"] is not None, x["ts"]), reverse=True)
    return items

if __name__ == "__main__":
    for i, it in enumerate(fetch_all(FEEDS), 1):
        print(f"{i:02d}. {it['title']}\n    {it['link']}\n    {it['published']} [{it['source']}]\n")
@@ -2,13 +2,26 @@ matplotlib
requests
pyttsx3

# pylib
pandas
# firebase_provider
firebase-admin>=7.1.0
# google cloud storage
google-cloud-storage
# google cloud big query
google-cloud-bigquery
# google cloud big query option
google-cloud-bigquery-storage
# bigquery to dataframe
db-dtypes
# onedrive
msal
# common
python-dotenv
# RSS
feedparser
# model
pydantic
# duckdb
duckdb==1.3.2
@@ -28,7 +28,6 @@ class CustomLogger(Singleton):

     def __init__(self, name='main', log_file=None, level=logging.INFO):
         if hasattr(self, '_initialized') and self._initialized:
-            self.logger.setLevel(level)
             return  # already initialized; do nothing

         self.logger = logging.getLogger(name)
@@ -2,7 +2,7 @@
 This implementation is thread-safe and ensures that only one instance of the class is created.

 What Singleton provides is "a mechanism that returns the same instance":
 * __init__() is still called every time (behaviour many people do not expect)
 * On the second and later __init__() calls, the _initialized flag has to be managed by the caller.
 """

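A minimal sketch of the `_initialized` guard described in that docstring (names are illustrative; the repo's actual `Singleton` base class may differ in detail):

```python
class Singleton:
    # Every construction returns the same instance.
    _instance = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance


class Config(Singleton):
    def __init__(self, value=None):
        # __init__ still runs on every call, so the subclass guards itself.
        if getattr(self, "_initialized", False):
            return
        self.value = value
        self._initialized = True


a = Config("first")
b = Config("second")
assert a is b and a.value == "first"
```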
@@ -14,7 +14,7 @@ class TaskScheduler:
 # Create an instance and start the thread
 scheduler = TaskScheduler()

 # Discard the instance (the thread keeps running, though)
 # del scheduler

 # The main thread continues with other work
src/models/bigquery_base_model.py (new file, 510 lines)

@@ -0,0 +1,510 @@
import os
import base64
from typing import Any, Dict, Optional, Type, TypeVar, Tuple, Sequence, Union, List, Iterable, ClassVar
from enum import Enum
from decimal import Decimal


from datetime import datetime, date, time, timezone
from dataclasses import dataclass, fields, is_dataclass

import pandas as pd
from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider

from lib.custom_logger import get_logger
logger = get_logger()

T = TypeVar("T", bound="BigQueryBaseModel")

Filter = Tuple[str, str, Any]

def _build_where_clause_bq_without_prams(
    where: Optional[Union[str, Dict[str, Any], Sequence[Tuple[str, str, Any]]]]
) -> str:
    """
    Build a WHERE clause for BigQuery (without query parameters).
    Returns:
        where_sql
    """
    if where is None:
        return ""

    clauses = []

    if isinstance(where, str):
        # Use the string as-is (raw SQL condition)
        return f"WHERE {where}"

    if isinstance(where, dict):
        for col, val in where.items():
            if isinstance(val, (list, tuple, set)):
                placeholders = ", ".join([f"'{str(v).replace('\'', '\\\'')}'" for v in val])
                clauses.append(f"{col} IN ({placeholders})")
            elif val is None:
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} = '{str(val).replace('\'', '\\\'')}'")
    else:
        # sequence of (col, op, val)
        for col, op, val in where:
            if (op.upper() == "IN") and isinstance(val, (list, tuple, set)):
                placeholders = ", ".join([f"'{str(v).replace('\'', '\\\'')}'" for v in val])
                clauses.append(f"{col} IN ({placeholders})")
            elif val is None and op in ("=", "=="):
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} {op} '{str(val).replace('\'', '\\\'')}'")

    where_sql = "WHERE " + " AND ".join(clauses)
    return where_sql

def _build_where_clause_bq(
    where: Optional[Union[str, Dict[str, Any], Sequence[Tuple[str, str, Any]]]]
) -> Tuple[str, List[Dict[str, Any]]]:
    """
    Build a WHERE clause and query parameters for BigQuery.
    Returns:
        (where_sql, query_params)
    """
    if where is None:
        return "", []

    params = []
    clauses = []

    if isinstance(where, str):
        # Use the string as-is (raw SQL condition)
        return f"WHERE {where}", params

    if isinstance(where, dict):
        for i, (col, val) in enumerate(where.items()):
            param_name = f"param{i}"
            if isinstance(val, (list, tuple, set)):
                placeholders = []
                for j, v in enumerate(val):
                    pname = f"{param_name}_{j}"
                    placeholders.append(f"@{pname}")
                    params.append({"name": pname, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(v)}})
                clauses.append(f"{col} IN ({', '.join(placeholders)})")
            elif val is None:
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} = @{param_name}")
                params.append({"name": param_name, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(val)}})
    else:
        # sequence of (col, op, val)
        for i, (col, op, val) in enumerate(where):
            param_name = f"param{i}"
            if (op.upper() == "IN") and isinstance(val, (list, tuple, set)):
                placeholders = []
                for j, v in enumerate(val):
                    pname = f"{param_name}_{j}"
                    placeholders.append(f"@{pname}")
                    params.append({"name": pname, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(v)}})
                clauses.append(f"{col} IN ({', '.join(placeholders)})")
            elif val is None and op in ("=", "=="):
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} {op} @{param_name}")
                params.append({"name": param_name, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(val)}})

    where_sql = "WHERE " + " AND ".join(clauses)
    return where_sql, params


def to_bq_json(v):
    if isinstance(v, datetime):
        # For TIMESTAMP: normalize to UTC and append Z
        if v.tzinfo is None:
            v = v.replace(tzinfo=timezone.utc)
        return v.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    if isinstance(v, date) and not isinstance(v, datetime):
        # For DATE
        return v.isoformat()
    if isinstance(v, time):
        # For TIME
        # v.isoformat() may include microseconds, which is fine as-is
        return v.isoformat()
    if isinstance(v, Decimal):
        # For NUMERIC/BIGNUMERIC it is safest to pass a string
        return str(v)
    if isinstance(v, bytes):
        # BYTES are passed as a base64 string
        return base64.b64encode(v).decode("ascii")
    if isinstance(v, Enum):
        return v.value
    if isinstance(v, list):
        return [to_bq_json(x) for x in v]
    if isinstance(v, dict):
        return {k: to_bq_json(val) for k, val in v.items()}
    return v

@dataclass
class BigQueryBaseModel:

    table_id: ClassVar[str]  # e.g. "datasetid.table_name"
    provider: ClassVar[GoogleCloudBigQueryProvider | None] = None
    key_field: ClassVar[Optional[str]] = None  # e.g. "id" — the field that acts as the primary key
    columns_select: ClassVar[str] = "*"  # e.g. "col1, col2, col3"
    default_order_by: ClassVar[Optional[str]] = None

    @classmethod
    def set_provider(cls, provider: GoogleCloudBigQueryProvider):
        cls.provider = provider

    @classmethod
    def fetch_all(cls: Type[T]) -> List[T]:
        """Fetch every record in the table."""
        if cls.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        rows = cls.provider.list_rows(table_id=cls.table_id)
        result: List[T] = []
        for row in rows:
            data = dict(row)
            obj: T = cls(**data)  # type: ignore[arg-type]
            result.append(obj)
        return result

    def create(self):
        """Insert this record into the table."""
        if self.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        data = {f.name: getattr(self, f.name) for f in fields(self)}
        data = to_bq_json(data)

        logger.debug(f"Inserting data: {data}")
        self.provider.insert_rows(
            table_id=self.table_id,
            rows=[data]
        )

    @classmethod
    def insert(cls, rows: List[T]):
        """Insert multiple records into the table."""
        if cls.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        bq_rows = []
        for row in rows:
            data = {f.name: getattr(row, f.name) for f in fields(row)}
            bq_rows.append(to_bq_json(data))

        logger.info(f"Inserting data: {len(bq_rows)}")
        logger.debug(f"Inserting data details: {bq_rows[:10]}...")
        cls.provider.insert_rows(
            table_id=cls.table_id,
            rows=bq_rows
        )

    @classmethod
    def is_streaming_buffer(cls) -> bool:
        """Check whether the table still has a streaming buffer."""
        if cls.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        buf_info = cls.provider.get_streaming_buffer_info(table_id=cls.table_id)
        logger.debug(f"Streaming buffer info: {buf_info}")

        return buf_info is not None

    def save(self):
        """Update this record in the table."""
        if self.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        if self.is_streaming_buffer():
            raise RuntimeError("Cannot update records in streaming buffer.")

        if self.key_field is None:
            raise RuntimeError("key_field is not set.")

        key_value = getattr(self, self.key_field)
        if key_value is None:
            raise ValueError(f"Key field '{self.key_field}' value is None.")

        data = {f.name: getattr(self, f.name) for f in fields(self)}
        data = to_bq_json(data)

        set_clauses = []
        for k, v in data.items():
            if k == self.key_field:
                continue
            if isinstance(v, str):
                v_str = f"'{v.replace('\'', '\\\'')}'"
            else:
                v_str = str(v)
            set_clauses.append(f"{k}={v_str}")
        set_clause = ", ".join(set_clauses)

        where_clause = f"{self.key_field}='{str(key_value).replace('\'', '\\\'')}'"

        logger.debug(f"Updating data: SET {set_clause} WHERE {where_clause}")
        self.provider.update_query(
            table_id=self.table_id,
            values=set_clause,
            were_clause=where_clause
        )

    def destroy(self):
        """Delete this record from the table."""
        if self.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        if self.is_streaming_buffer():
            raise RuntimeError("Cannot delete records in streaming buffer.")

        if self.key_field is None:
            raise RuntimeError("key_field is not set.")

        key_value = getattr(self, self.key_field)
        if key_value is None:
            raise ValueError(f"Key field '{self.key_field}' value is None.")

        where_clause = f"{self.key_field}='{str(key_value).replace('\'', '\\\'')}'"

        logger.debug(f"Deleting data: WHERE {where_clause}")
        self.provider.delete_query(
            table_id=self.table_id,
            were_clause=where_clause
        )

    # -------------------------------------
    # select
    @classmethod
    def select(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ):
        where_sql, params_dict = _build_where_clause_bq(where)
        # where_sql = _build_where_clause_bq_without_prams(where)
        order = order_by or cls.default_order_by
        order_sql = f"ORDER BY {order}" if order else ""
        limit_sql = f"LIMIT {int(limit)}" if limit is not None else ""
        offset_sql = f"OFFSET {int(offset)}" if offset is not None else ""
        sql = f"""
            SELECT {cls.columns_select}
            FROM `{cls.table_id}`
            {where_sql}
            {order_sql}
            {limit_sql}
            {offset_sql}
        """
        # logger.debug(f"Select Query: {sql} with params_dict: {params_dict}")
        if provider is None:
            provider = cls.provider
        if provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        params = []
        for p in params_dict:
            param = provider.addQueryParameter(
                name=p["name"],
                value=p["parameterValue"]["value"],
                param_type=p["parameterType"]["type"]
            )
            params.append(param)
        # Parameter binding: uses GoogleCloudBigQueryProvider's underlying client
        query_job = provider.excute_query(sql, params=params)
        return query_job


    @classmethod
    def select_df(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> pd.DataFrame:
        """Fetch the result as a DataFrame.

        Notes:
            Reading a DataFrame directly from Google Cloud BigQuery requires the db-dtypes package.
        """
        query_job = cls.select(
            provider=provider,
            where=where,
            order_by=order_by,
            limit=limit,
            offset=offset,
        )
        return query_job.to_dataframe()

    @classmethod
    def list(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> List[T]:
        job = cls.select(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset)
        results = job.result()
        instances: List[T] = []
        for row in results:
            data = dict(row)
            instance = cls(**data)  # type: ignore
            instances.append(instance)

        # df = cls.select_df(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset)
        # if not is_dataclass(cls):
        #     raise TypeError(f"{cls.__name__} is not a dataclass.")
        # instances: List[T] = []
        # for _, row in df.iterrows():
        #     data = {f.name: row[f.name] for f in fields(cls) if f.name in row}
        #     instance = cls(**data)  # type: ignore
        #     instances.append(instance)
        return instances

    @classmethod
    def first(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
    ) -> Optional[T]:
        results = cls.list(provider=provider, where=where, order_by=order_by, limit=1)
        return results[0] if results else None

    @classmethod
    def count(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ) -> int:
        where_sql, params_dict = _build_where_clause_bq(where)
        sql = f"""
            SELECT COUNT(*) as cnt
            FROM `{cls.table_id}`
            {where_sql}
        """
        # Parameter binding: uses GoogleCloudBigQueryProvider's underlying client
        if provider is None:
            provider = cls.provider
        if provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        params = []
        for p in params_dict:
            param = provider.addQueryParameter(
                name=p["name"],
                value=p["parameterValue"]["value"],
                param_type=p["parameterType"]["type"]
            )
            params.append(param)

        query_job = provider.excute_query(sql, params=params)
        results = query_job.result()
        for row in results:
            return row["cnt"]
        return 0

    @classmethod
    def exists(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ) -> bool:
        if provider is None:
            provider = cls.provider
        return cls.count(provider, where=where) > 0

    @classmethod
    def delete(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ):
        where_sql, params_dict = _build_where_clause_bq(where)
        sql = f"""
            DELETE FROM `{cls.table_id}`
            {where_sql}
        """
        if provider is None:
            provider = cls.provider
        if provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        params = []
        for p in params_dict:
            param = provider.addQueryParameter(
                name=p["name"],
                value=p["parameterValue"]["value"],
                param_type=p["parameterType"]["type"]
            )
            params.append(param)

        job = provider.excute_query(
            sql,
            params=params
        )
        job.result()  # wait for the job to finish

    @classmethod
    def update(
        cls: Type[T],
        values: dict[str, Any],
        *,
        where: Union[str, Dict[str, Any], Sequence[Filter]],
        provider: GoogleCloudBigQueryProvider = None,
    ):
        where_sql, params_dicts = _build_where_clause_bq(where)
        set_clauses = []
        for k, v in values.items():
            param_name = f"set_{k}"
            set_clauses.append(f"{k}=@{param_name}")
            params_dicts.append({
                "name": param_name,
                "parameterType": {"type": "STRING"},
                "parameterValue": {"value": str(v)}
            })

        sql = f"""
            UPDATE `{cls.table_id}`
            SET {', '.join(set_clauses)}
            {where_sql}
        """
        logger.debug(f"update Query: {sql} with params_dict: {params_dicts}")

        if provider is None:
            provider = cls.provider
        if provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        params = []
        for p in params_dicts:
            param = provider.addQueryParameter(
                name=p["name"],
                value=p["parameterValue"]["value"],
                param_type=p["parameterType"]["type"]
            )
            params.append(param)

        job = provider.excute_query(
            sql,
            params=params
        )
        job.result()  # wait for the job to finish
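For reference, a small sketch of what the where-clause helper above produces for a typical filter (every parameter is emitted with type STRING, exactly as in the code):

```python
from models.bigquery_base_model import _build_where_clause_bq

where_sql, params = _build_where_clause_bq([
    ("device_code", "=", "device_001"),
    ("device_code", "IN", ["device_002", "device_003"]),
])
# where_sql -> "WHERE device_code = @param0 AND device_code IN (@param1_0, @param1_1)"
# params    -> [{"name": "param0", "parameterType": {"type": "STRING"},
#                "parameterValue": {"value": "device_001"}}, ...]
```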
src/models/duck_base_model.py (new file, 184 lines)

@@ -0,0 +1,184 @@
import os
from typing import Any, Dict, Optional, Type, TypeVar, Tuple, Sequence, Union, List, Iterable, ClassVar
from dataclasses import dataclass, fields, is_dataclass

import pandas as pd
from providers.duck_db_provider import DuckDBProvider

DATA_BASEPATH = os.getenv("DATA_BASEPATH", "./data")
from lib.custom_logger import get_logger
logger = get_logger()

T = TypeVar("T", bound="DuckBaseModel")

Filter = Tuple[str, str, Any]

def _build_where_clause(where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]]
                        ) -> Tuple[str, List[Any]]:
    if where is None:
        return "", []
    params: List[Any] = []

    if isinstance(where, str):
        return f"WHERE {where}", params

    clauses: List[str] = []
    if isinstance(where, dict):
        for col, val in where.items():
            if isinstance(val, (list, tuple, set)):
                placeholders = ", ".join(["?"] * len(val))
                clauses.append(f"{col} IN ({placeholders})")
                params.extend(list(val))
            elif val is None:
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} = ?")
                params.append(val)
    else:
        # sequence of (col, op, value)
        for col, op, val in where:
            if (op.upper() == "IN") and isinstance(val, (list, tuple, set)):
                placeholders = ", ".join(["?"] * len(val))
                clauses.append(f"{col} IN ({placeholders})")
                params.extend(list(val))
            elif val is None and op in ("=", "=="):
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} {op} ?")
                params.append(val)

    if not clauses:
        return "", []
    return "WHERE " + " AND ".join(clauses), params


@dataclass
class DuckBaseModel:
    file_glob: ClassVar[str]  # e.g. "./data_science/data/y=2025/news/news_*.csv"
    provider: ClassVar[DuckDBProvider | None] = None
    columns_select: ClassVar[str] = "*"  # e.g. "published_parsed, title, link"
    default_order_by: ClassVar[Optional[str]] = None

    # read_csv_auto options (override in subclasses if needed)
    hive_partitioning: ClassVar[bool] = True
    union_by_name: ClassVar[bool] = True
    header: ClassVar[bool] = True
    ignore_errors: ClassVar[bool] = True


    @classmethod
    def set_provider(cls, provider: DuckDBProvider):
        cls.provider = provider


    # ---- FROM clause (CSV loading) ----
    @classmethod
    def _from_clause(cls, file_glob=None) -> str:
        path = file_glob or cls.file_glob
        path = path.replace("'", "''")
        path = f"{DATA_BASEPATH}{path}"

        # path = cls.file_glob.replace("'", "''")
        hp = 1 if cls.hive_partitioning else 0
        un = 1 if cls.union_by_name else 0
        hd = "TRUE" if cls.header else "FALSE"
        ig = "TRUE" if cls.ignore_errors else "FALSE"
        return (f"read_csv_auto('{path}', HEADER={hd}, IGNORE_ERRORS={ig}, "
                f"hive_partitioning={hp}, union_by_name={un})")

    @classmethod
    def select_df(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        file_glob=None,
    ) -> pd.DataFrame:
        where_sql, params = _build_where_clause(where)
        order = order_by or cls.default_order_by
        order_sql = f"ORDER BY {order}" if order else ""
        limit_sql = f"LIMIT {int(limit)}" if limit is not None else ""
        offset_sql = f"OFFSET {int(offset)}" if offset is not None else ""
        sql = f"""
            SELECT {cls.columns_select}
            FROM {cls._from_clause(file_glob=file_glob)}
            {where_sql}
            {order_sql}
            {limit_sql}
            {offset_sql}
        """
        # Parameter binding: uses DuckDBProvider's underlying connection
        if provider is None:
            provider = cls.provider

        cur = provider.con.execute(sql, params)
        return cur.df()

    @classmethod
    def list(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        file_glob=None,
    ) -> List[T]:
        df = cls.select_df(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset, file_glob=file_glob)
        if not is_dataclass(cls):
            raise TypeError(f"{cls.__name__} is not a dataclass.")
        instances: List[T] = []
        for _, row in df.iterrows():
            data = {f.name: row[f.name] for f in fields(cls) if f.name in row}
            instance = cls(**data)  # type: ignore
            instances.append(instance)
        return instances

    @classmethod
    def first(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
    ) -> Optional[T]:
        results = cls.list(provider=provider, where=where, order_by=order_by, limit=1)
        return results[0] if results else None

    @classmethod
    def count(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ) -> int:
        where_sql, params = _build_where_clause(where)
        sql = f"""
            SELECT COUNT(*) AS cnt
            FROM {cls._from_clause()}
            {where_sql}
        """
        # Parameter binding: uses DuckDBProvider's underlying connection
        if provider is None:
            provider = cls.provider

        cur = provider.con.execute(sql, params)
        row = cur.fetchone()  # fetchone() returns a tuple
        return row[0] if row else 0

    @classmethod
    def exists(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ) -> bool:
        if provider is None:
            provider = cls.provider
        return cls.count(provider, where=where) > 0
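A minimal usage sketch of the model above with a hypothetical CSV layout (the model, columns, and glob are illustrative only; the glob is appended to DATA_BASEPATH by `_from_clause()`):

```python
from dataclasses import dataclass
from typing import ClassVar

from models.duck_base_model import DuckBaseModel
from providers.duck_db_provider import DuckDBProvider

@dataclass
class SensorModel(DuckBaseModel):
    device_code: str
    value: float
    # Hypothetical glob, resolved under DATA_BASEPATH (default "./data").
    file_glob: ClassVar[str] = "/sensors/readings_*.csv"

SensorModel.set_provider(DuckDBProvider())
top = SensorModel.first(where=[("device_code", "=", "device_001")], order_by="value DESC")
n = SensorModel.count(where={"device_code": ["device_001", "device_002"]})  # dict value as list -> IN (...)
```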
@@ -45,7 +45,7 @@ class FirestoreBaseModel:
         return cls._col().document(doc_id)

     def _as_mutable_dict(self) -> Dict[str, Any]:
         """dataclass → dict (drop id and None as needed)"""
         data = asdict(self)
         data.pop("_doc_id", None)
         data.pop("collection_name", None)

@@ -54,7 +54,7 @@ class FirestoreBaseModel:

     @classmethod
     def _from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
         """dict -> model (picks up 'id' from data)"""
         data = dict(data)
         _id = data.pop("id", None)
         obj: T = cls(**data)  # type: ignore[arg-type]

@@ -88,7 +88,7 @@ class FirestoreBaseModel:
         return self._doc_id

     def update_fields(self, changes: Dict[str, Any], auto_timestamp: bool = True) -> None:
         """Partial update (sugar over set(merge=True))"""
         if not self._doc_id:
             raise ValueError("Cannot update without id.")
         payload = dict(changes)

@@ -205,7 +205,7 @@ class FirestoreBaseModel:
     ) -> int:
         """
         Bulk-delete the documents matched by the filter and return the number deleted.
         * Relies on list_documents (the "no composite indexes" policy follows that method's constraints)
         """
         from providers.firestore_provider import FireStoreProvider
         rows = FireStoreProvider.list_documents(
src/providers/duck_db_provider.py (new file, 73 lines)

@@ -0,0 +1,73 @@
from typing import Any

import duckdb


class DuckDBProvider:

    def __init__(self, db_path: str = ":memory:", read_only: bool = False):
        self.con = self.connect(db_path, read_only)

    def connect(self, db_path: str = ":memory:", read_only: bool = False):
        return duckdb.connect(database=db_path, read_only=read_only)

    def setup_gcs(self, access_key: str, secret_key: str):
        """Register the GCS secret."""
        if not self.con:
            raise ValueError("DuckDB is not connected.")
        self.con.sql(f"""
            CREATE OR REPLACE SECRET gcs_creds (
                TYPE gcs,
                KEY_ID '{access_key}',
                SECRET '{secret_key}'
            );
        """)

    def close(self):
        """Close the connection."""
        if self.con:
            self.con.close()

    def query_df(self, sql: str):
        """Run a SQL query and return the result as a DataFrame."""
        return self.con.execute(sql).df()

    def max_value(
        self,
        file_path: str,
        column: str,
        hive_partitioning: bool = True,
        union_by_name: bool = True,
    ) -> Any:
        """Return the maximum value of the given column in the CSV file(s)."""
        query = f"""
            SELECT MAX({column}) AS max_{column}
            FROM read_csv_auto('{file_path}',
                hive_partitioning={1 if hive_partitioning else 0},
                union_by_name={1 if union_by_name else 0}
            )
        """
        result = self.con.execute(query).fetchone()[0]
        return result

    def load_file(self, file_glob: str, table: str):
        """Load CSV files into a table."""
        sql = f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT *
            FROM read_csv_auto('{file_glob}', HEADER=TRUE, IGNORE_ERRORS=TRUE)
        """
        self.con.execute(sql)

    @staticmethod
    def get_gs_csv_name(
        backet: str,
        object_name: str,
        hive_partitioning: bool = True,
        union_by_name: bool = True,
    ) -> str:

        return f"""read_csv_auto('gs://{backet}/{object_name}',
            hive_partitioning={1 if hive_partitioning else 0},
            union_by_name={1 if union_by_name else 0}
        )
        """
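A quick usage sketch of the provider's local-CSV helpers (the glob and column name are hypothetical):

```python
from providers.duck_db_provider import DuckDBProvider

provider = DuckDBProvider()
# Largest pub_date across a set of partitioned CSVs (hypothetical path and column).
latest = provider.max_value("./data/y=*/news/news_*.csv", "pub_date")
provider.load_file("./data/y=*/news/news_*.csv", "news")   # materialize the CSVs as a table
df = provider.query_df("SELECT COUNT(*) AS n FROM news")    # then query the table as usual
```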
@@ -78,12 +78,12 @@ class FireStoreProvider():
         start_after: Optional[Union[google.cloud.firestore.DocumentSnapshot, Dict[str, Any]]] = None,
     ) -> List[Dict[str, Any]]:
         """
         Fetch the documents in a collection (supports filtering / ordering / limit / paging).

         Args:
             collection_name: collection name
             filters: [(field, op, value), ...]
             order_by: field name (or list of names) to order by; a leading '-' such as '-created_at' means descending
             limit: maximum number of documents to fetch
             start_after: a document snapshot, or a dict of the last values in the order_by ordering

@@ -189,13 +189,13 @@ class FireStoreProvider():
         merge: bool = True,
     ) -> None:
         """
         Update a document. With merge=True this is a partial update (recommended).
         """
         ref = cls.get_doc_ref(collection_name, doc_id)
         if merge:
             ref.set(data, merge=True)
         else:
             # Full replacement (creates the document if it does not exist)
             ref.set(data, merge=False)
         logger.info(f"Updated document: {collection_name}/{doc_id} (merge={merge})")

662
src/providers/google_cloud_bigquery_provider.py
Normal file
662
src/providers/google_cloud_bigquery_provider.py
Normal file
@ -0,0 +1,662 @@
|
|||||||
|
import os
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional, Dict, Union,Sequence
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from google.cloud import bigquery
|
||||||
|
from google.cloud.bigquery.table import RowIterator, TableListItem
|
||||||
|
from google.cloud.bigquery.dataset import DatasetListItem
|
||||||
|
from google.cloud.bigquery.table import StreamingBuffer
|
||||||
|
|
||||||
|
from google.oauth2 import service_account
|
||||||
|
|
||||||
|
from lib.custom_logger import get_logger
|
||||||
|
logger = get_logger()
|
||||||
|
|
||||||
|
class BigQueryFieldType(str,Enum):
|
||||||
|
STRING = "STRING"
|
||||||
|
BYTES = "BYTES"
|
||||||
|
INTEGER = "INTEGER"
|
||||||
|
FLOAT = "FLOAT"
|
||||||
|
BOOLEAN = "BOOLEAN"
|
||||||
|
TIMESTAMP = "TIMESTAMP"
|
||||||
|
DATE = "DATE"
|
||||||
|
TIME = "TIME"
|
||||||
|
DATETIME = "DATETIME"
|
||||||
|
RECORD = "RECORD"
|
||||||
|
NUMERIC = "NUMERIC"
|
||||||
|
BIGNUMERIC = "BIGNUMERIC"
|
||||||
|
GEOGRAPHY = "GEOGRAPHY"
|
||||||
|
JSON = "JSON"
|
||||||
|
|
||||||
|
class BogQueryFieldMode(str,Enum):
|
||||||
|
NULLABLE = "NULLABLE" # NULL許可
|
||||||
|
REQUIRED = "REQUIRED" # 必須
|
||||||
|
REPEATED = "REPEATED" # 配列
|
||||||
|
|
||||||
|
|
||||||
|
class GoogleCloudBigQueryProvider:
|
||||||
|
"""Cloud BigQuery操作プロバイダ
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
- このクラスはGoogle Cloud BigQueryの基本的な操作を提供します。
|
||||||
|
- 認証にはサービスアカウントキーまたはApplication Default Credentials(ADC)を使用します。
|
||||||
|
- BigQuery Storage Read APIを使用するなら別のライブラリを使用してください(高速読み込み)
|
||||||
|
- 必要な権限
|
||||||
|
- データセット操作を行う場合はBigQuery データオーナーの権限が必要です。
|
||||||
|
- bigquery.dataViewer / bigquery.dataEditor / bigquery.readSessionUserなど部分でも可能
|
||||||
|
- JOB操作を行う場合はBigQuery ジョブユーザーの権限が必要です。
|
||||||
|
- フル機能を使う場合はBigQuery データオーナー及びBigQuery ジョブユーザーの権限が必要です。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, cred_path: Optional[str] = None, project: Optional[str] = None):
|
||||||
|
try:
|
||||||
|
if cred_path:
|
||||||
|
creds = service_account.Credentials.from_service_account_file(cred_path)
|
||||||
|
# プロジェクト未指定なら credentials から取得
|
||||||
|
effective_project = project or creds.project_id
|
||||||
|
self._client = bigquery.Client(
|
||||||
|
project=effective_project, credentials=creds
|
||||||
|
)
|
||||||
|
logger.info(f"Bigquery client initialized with service account file. project={effective_project}")
|
||||||
|
elif os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"):
|
||||||
|
cred_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
|
||||||
|
creds = service_account.Credentials.from_service_account_info(cred_json)
|
||||||
|
effective_project = project or creds.project_id
|
||||||
|
self._client = bigquery.Client(
|
||||||
|
project=effective_project, credentials=creds
|
||||||
|
)
|
||||||
|
logger.info("Bigquery client initialized with credentials from environment variable.")
|
||||||
|
else:
|
||||||
|
self._client = bigquery.Client(project=project)
|
||||||
|
logger.info("Bigquery client initialized with default credentials (ADC).")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Bigquery initialization failed: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
def get_datasets(self)-> list[DatasetListItem]:
|
||||||
|
"""データセット一覧を取得する"""
|
||||||
|
try:
|
||||||
|
datasets = list(self._client.list_datasets())
|
||||||
|
return datasets
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to list datasets: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|

    def create_dataset(
            self, dataset_id: str,
            location: Optional[str] = "asia-northeast1") -> bigquery.Dataset:
        """
        Create a dataset

        Args:
            dataset_id (str): ID of the dataset to create
            location (Optional[str]): Location of the dataset
        Returns:
            bigquery.Dataset: The created dataset object
        Notes:
            - If the dataset ID already exists, an exception is raised.
            - Required permission:
                - bigquery.datasets.create
            - Naming rules:
                - Only lowercase letters (a-z), digits (0-9) and underscores (_) may be used
        """
        try:
            dataset_ref = self._client.dataset(dataset_id)
            dataset = bigquery.Dataset(dataset_ref)
            if location:
                dataset.location = location
            dataset = self._client.create_dataset(dataset)
            logger.info(f"Dataset {dataset_id} created.")
            return dataset
        except Exception as e:
            logger.error(f"Failed to create dataset {dataset_id}: {e}")
            raise

    def is_exists_dataset(self, dataset_id: str) -> bool:
        """Return True if the dataset exists."""
        try:
            self._client.get_dataset(dataset_id)
            return True
        except Exception:
            return False

    def delete_dataset(self, dataset_id: str, delete_contents: bool = False):
        """
        Delete a dataset

        Args:
            dataset_id (str): ID of the dataset to delete
            delete_contents (bool): Set True to also delete the tables inside the dataset
        Notes:
            - Required permission:
                - bigquery.datasets.delete
            - With delete_contents=False, deletion fails if the dataset still contains tables.
                - In that case pass delete_contents=True.
                - Otherwise `google.api_core.exceptions.Conflict` is raised.
        """
        try:
            self._client.delete_dataset(dataset_id, delete_contents=delete_contents)
            logger.info(f"Dataset {dataset_id} deleted.")
        except Exception as e:
            logger.error(f"Failed to delete dataset {dataset_id}: {e}")
            raise

    def full_table_id(self, dataset_id: str, table_name: str) -> str:
        """Build a fully qualified table ID (project.dataset.table)."""
        project = self._client.project
        return f"{project}.{dataset_id}.{table_name}"

    def is_exists_table(self, table_id: str) -> bool:
        """Return True if the table exists."""
        try:
            self._client.get_table(table_id)
            return True
        except Exception:
            return False

    @staticmethod
    def addSchemaField(
            name: str,
            field_type: Union[str, BigQueryFieldType],
            field_mode: Union[str, BogQueryFieldMode] = BogQueryFieldMode.NULLABLE,
            default_value_expression: Optional[str] = None,
            description: Optional[str] = None
    ) -> bigquery.SchemaField:
        """Build a bigquery.SchemaField from plain strings or the enums above."""
        if isinstance(field_type, BigQueryFieldType):
            field_type = field_type.value
        elif isinstance(field_type, str):
            field_type = field_type.upper()

        if isinstance(field_mode, BogQueryFieldMode):
            field_mode = field_mode.value
        elif isinstance(field_mode, str):
            field_mode = field_mode.upper()

        return bigquery.SchemaField(
            name=name,
            field_type=field_type,
            mode=field_mode,
            default_value_expression=default_value_expression,
            description=description)

    def create_table(
            self,
            table_id: str,
            schema: list[bigquery.SchemaField]) -> bigquery.Table:
        """
        Create a table

        Args:
            table_id (str): ID of the table to create (e.g. 'dataset.table')
            schema (list[bigquery.SchemaField]): Schema definition of the table
        Returns:
            bigquery.Table: The created table object
        Notes:
            - If the table ID already exists, an exception is raised.
        """
        try:
            table = bigquery.Table(table_id, schema=schema)
            table = self._client.create_table(table)
            logger.info(f"Table {table_id} created.")
            return table
        except Exception as e:
            logger.error(f"Failed to create table {table_id}: {e}")
            raise
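
    # Sketch: building a schema with addSchemaField and creating a table
    # ("my_dataset" / "users" are placeholder names):
    #   schema = [
    #       GoogleCloudBigQueryProvider.addSchemaField("id", BigQueryFieldType.INTEGER, BogQueryFieldMode.REQUIRED),
    #       GoogleCloudBigQueryProvider.addSchemaField("name", BigQueryFieldType.STRING),
    #   ]
    #   table_id = provider.full_table_id("my_dataset", "users")
    #   provider.create_table(table_id, schema)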

    def delete_table(self, table_id: str, not_found_ok: bool = False):
        """
        Delete a table

        Args:
            table_id (str): ID of the table to delete (e.g. 'dataset.table')
            not_found_ok (bool): Set True to avoid an error when the table does not exist
        """
        try:
            self._client.delete_table(table_id, not_found_ok=not_found_ok)
            logger.info(f"Table {table_id} deleted.")
        except Exception as e:
            logger.error(f"Failed to delete table {table_id}: {e}")
            raise

    def get_tables(self, dataset_id: str) -> list[TableListItem]:
        """
        List the tables in a dataset

        Args:
            dataset_id (str): Target dataset ID
        Returns:
            list[bigquery.TableListItem]: List of tables
        """
        try:
            tables = list(self._client.list_tables(dataset_id))
            logger.info(f"Tables listed successfully from dataset {dataset_id}, count {len(tables)}.")
            return tables
        except Exception as e:
            logger.error(f"Failed to list tables in dataset {dataset_id}: {e}")
            raise

    def insert_rows(
            self,
            table_id: str,
            rows: list[Dict[str, Any]]) -> Sequence[dict]:
        """
        Insert rows into a table (streaming insert)

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            rows (list[Dict[str, Any]]): List of rows to insert
        Returns:
            Sequence[dict]: Per-row error information returned by the API (empty if all rows were accepted)
        Notes:
            - Each row is a dict whose keys are column names and whose values are the column values.
            - Example: [{"column1": "value1", "column2": 123}, {"column1": "value2", "column2": 456}]
        """
        try:
            errors = self._client.insert_rows_json(table_id, rows)
            if errors:
                logger.error(f"Errors occurred while inserting rows into {table_id}: {errors}")
            else:
                logger.info(f"Rows inserted successfully into {table_id}.")
            return errors
        except Exception as e:
            logger.error(f"Failed to insert rows into table {table_id}: {e}")
            raise
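
    # Sketch: streaming-inserting a couple of rows (table and column names are placeholders):
    #   errors = provider.insert_rows(table_id, [
    #       {"id": 1, "name": "alice"},
    #       {"id": 2, "name": "bob"},
    #   ])
    #   # an empty list means every row was accepted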

    def list_rows(
            self,
            table_id: str,
            selected_fields: Optional[list[str]] = None,
            max_results: Optional[int] = None,
            start_index: Optional[int] = None
    ) -> RowIterator:
        """
        List the rows of a table

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            selected_fields (Optional[list[str]]): Column names to fetch. None fetches all columns
            max_results (Optional[int]): Maximum number of rows to fetch. None means no limit
            start_index (Optional[int]): Index to start fetching from. None starts at the beginning
        Returns:
            RowIterator: Iterator over the fetched rows
        """
        try:
            table = self._client.get_table(table_id)
            _selected_fields: Optional[list[bigquery.SchemaField]] = None
            if selected_fields:
                _selected_fields = [field for field in table.schema if field.name in selected_fields]

            rows = self._client.list_rows(
                table,
                selected_fields=_selected_fields,
                max_results=max_results,
                start_index=start_index
            )
            logger.info(f"Rows listed successfully from table {table_id}.")
            return rows
        except Exception as e:
            logger.error(f"Failed to list rows from table {table_id}: {e}")
            raise

    def list_rows_to_dataframe(
            self,
            table_id: str,
            selected_fields: Optional[list[str]] = None,
            max_results: Optional[int] = None,
            start_index: Optional[int] = None
    ):
        """Fetch table rows as a pandas DataFrame (arguments are the same as list_rows)."""
        try:
            # Reuse list_rows so that column names are converted to SchemaField objects
            rows = self.list_rows(
                table_id,
                selected_fields=selected_fields,
                max_results=max_results,
                start_index=start_index
            )
            df = rows.to_dataframe()
            logger.info(f"Rows listed to DataFrame successfully from table {table_id}.")
            return df
        except Exception as e:
            logger.error(f"Failed to list rows to DataFrame from table {table_id}: {e}")
            raise
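
    # Sketch: reading part of a table into pandas (assumes pandas / db-dtypes are installed):
    #   df = provider.list_rows_to_dataframe(table_id, selected_fields=["id", "name"], max_results=1000)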

    # --------------------------------------------------------
    def get_streaming_buffer_info(self, table_id: str):
        """Get the streaming buffer information of a table

        Notes:
            - The streaming buffer is where data added by streaming inserts is held temporarily
              before it is written to permanent storage.
            - Returns None if there is no streaming buffer information.
            - Data in the streaming buffer cannot be deleted, updated or cleared directly.
            - It is fully managed by GCP; there is no way to flush, clear or delete it manually.
            - Typical lifetime: a few seconds to a few minutes; heavy streaming: up to ~30 minutes;
              extreme load / many partitions: up to roughly 90 minutes (GCP guideline).
        """
        t = self._client.get_table(table_id)
        # The value lives either on the SDK attribute or in the low-level properties
        buf: StreamingBuffer = getattr(t, "streaming_buffer", None) or t._properties.get("streamingBuffer")
        if not buf:
            return None
        # Example: {'estimatedRows': '123', 'estimatedBytes': '456789', 'oldestEntryTime': '1698234000000'}
        logger.debug(f"Streaming Buffer Raw Info: {buf}")
        return {
            "estimated_rows": buf.estimated_rows,
            "estimated_bytes": buf.estimated_bytes,
            "oldest_entry_ms": buf.oldest_entry_time,
            "raw": buf,
        }

    def excute_query(
            self,
            query: str,
            params: Optional[list] = None) -> bigquery.QueryJob:
        """
        Execute a query

        Args:
            query (str): SQL query to execute
            params (Optional[list]): Query parameters (see addQueryParameter)
        Returns:
            bigquery.QueryJob: QueryJob object for the executed query
        """
        try:
            if params and len(params) > 0:
                query_job = self._client.query(query, job_config=bigquery.QueryJobConfig(
                    query_parameters=params
                ))
            else:
                query_job = self._client.query(query)
            logger.info("Query executed successfully.")
            logger.debug(f"Query JOB: {query_job}")
            return query_job
        except Exception as e:
            logger.error(f"Failed to execute query: {e}")
            raise

    @staticmethod
    def addQueryParameter(
            name: str,
            value: Any,
            param_type: Union[str, BigQueryFieldType] = BigQueryFieldType.STRING
    ) -> bigquery.ScalarQueryParameter:
        """Build a scalar query parameter."""
        if isinstance(param_type, BigQueryFieldType):
            param_type = param_type.value
        elif isinstance(param_type, str):
            param_type = param_type.upper()

        return bigquery.ScalarQueryParameter(
            name=name,
            type_=param_type,
            value=value
        )
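
    # Sketch: a parameterized query via excute_query + addQueryParameter
    # (project/dataset/table/column names are placeholders):
    #   job = provider.excute_query(
    #       "SELECT name FROM `project.my_dataset.users` WHERE age >= @min_age",
    #       params=[GoogleCloudBigQueryProvider.addQueryParameter("min_age", 20, BigQueryFieldType.INTEGER)],
    #   )
    #   rows = list(job.result())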

    def select_query(
            self,
            table_id: str,
            columns: Optional[list[str]] = None,
            were_clause: Optional[str] = None,
            params: Optional[list] = None,
            order_by: Optional[str] = None,
            limit: Optional[int] = None
    ):
        """
        Execute a SELECT query against the given table

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            columns (Optional[list[str]]): Column names to fetch. None fetches all columns
            were_clause (Optional[str]): WHERE clause condition. None means no condition
            params (Optional[list]): Query parameters referenced by the WHERE clause
            order_by (Optional[str]): Column to order by. None means no ordering
            limit (Optional[int]): Maximum number of rows. None means no limit
        """
        try:
            cols = ", ".join(columns) if columns else "*"
            query = f"SELECT {cols} FROM `{table_id}`"
            if were_clause:
                query += f" WHERE {were_clause}"
            if order_by:
                query += f" ORDER BY {order_by}"
            if limit:
                query += f" LIMIT {limit}"
            query_job = self.excute_query(query, params=params)
            logger.info("Select query executed successfully.")

            return query_job
        except Exception as e:
            logger.error(f"Failed to execute select query on table {table_id}: {e}")
            raise

    def select_query_to_dataframe(
            self,
            table_id: str,
            columns: Optional[list[str]] = None,
            were_clause: Optional[str] = None,
            params: Optional[list] = None,
            order_by: Optional[str] = None,
            limit: Optional[int] = None
    ):
        """Run select_query and return the result as a pandas DataFrame."""
        try:
            query_job = self.select_query(
                table_id=table_id,
                columns=columns,
                were_clause=were_clause,
                params=params,
                order_by=order_by,
                limit=limit
            )
            df = query_job.to_dataframe()
            logger.info("Select query to DataFrame executed successfully.")
            return df
        except Exception as e:
            logger.error(f"Failed to convert query result to DataFrame for table {table_id}: {e}")
            raise

    def select_query_to_dict(
            self,
            table_id: str,
            columns: Optional[list[str]] = None,
            were_clause: Optional[str] = None,
            params: Optional[list] = None,
            order_by: Optional[str] = None,
            limit: Optional[int] = None
    ) -> list[Dict[str, Any]]:
        """Run select_query and return the result as a list of dicts."""
        try:
            query_job = self.select_query(
                table_id=table_id,
                columns=columns,
                were_clause=were_clause,
                params=params,
                order_by=order_by,
                limit=limit
            )
            results = query_job.result()
            dicts = [dict(row) for row in results]
            logger.info("Select query to dicts executed successfully.")
            return dicts
        except Exception as e:
            logger.error(f"Failed to convert query result to dicts for table {table_id}: {e}")
            raise
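
    # Sketch: fetching rows as dicts with a parameterized WHERE clause (names are placeholders):
    #   rows = provider.select_query_to_dict(
    #       table_id,
    #       columns=["id", "name"],
    #       were_clause="age >= @min_age",
    #       params=[GoogleCloudBigQueryProvider.addQueryParameter("min_age", 20, BigQueryFieldType.INTEGER)],
    #       order_by="id",
    #       limit=100,
    #   )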

    def isert_query(
            self,
            table_id: str,
            values: dict[str, str],
            params: Optional[list] = None
    ) -> bigquery.QueryJob:
        """
        Execute an INSERT query

        Notes:
            - Values are embedded into the SQL as literals, so string values must already be quoted.
        """
        try:
            query = f"INSERT INTO `{table_id}` ({', '.join(values.keys())})"
            query += f" VALUES ({', '.join([str(v) for v in values.values()])})"
            query_job = self.excute_query(query, params=params)
            logger.info("Insert query executed successfully.")
            return query_job
        except Exception as e:
            logger.error(f"Failed to execute insert query on table {table_id}: {e}")
            raise

    def update_query(
            self,
            table_id: str,
            values: dict[str, str],
            were_clause: str,
            params: Optional[list] = None
    ) -> bigquery.QueryJob:
        """
        Execute an UPDATE query

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            values (dict[str, str]): Columns and values to update
            were_clause (str): WHERE clause condition. Use "TRUE" to update all rows
        """
        try:
            query = f"UPDATE `{table_id}`"
            query += f" SET {', '.join([f'{k} = {v}' for k, v in values.items()])}"
            if were_clause:
                query += f" WHERE {were_clause}"
            logger.debug(f"Update Query: {query}")
            query_job = self.excute_query(query, params=params)
            logger.info("Update query executed successfully.")
            return query_job.result()
        except Exception as e:
            logger.error(f"Failed to execute update query on table {table_id}: {e}")
            raise

    def delete_query(
            self,
            table_id: str,
            were_clause: str,
            params: Optional[list] = None
    ) -> bigquery.QueryJob:
        """
        Execute a DELETE query
        """
        try:
            query = f"DELETE FROM `{table_id}`"
            if were_clause:
                query += f" WHERE {were_clause}"
            query_job = self.excute_query(query, params=params)
            logger.info("Delete query executed successfully.")
            return query_job.result()
        except Exception as e:
            logger.error(f"Failed to execute delete query on table {table_id}: {e}")
            raise

    def upsert_query(
            self,
            table_id: str,
            insert_values: dict[str, str],
            update_values: dict[str, str],
            key_columns: list[str],
            params: Optional[list] = None
    ) -> bigquery.QueryJob:
        """
        Execute an UPSERT query (MERGE)
        """
        try:
            query = f"MERGE INTO `{table_id}` T"
            # Alias each value with its column name so that S.<col> can be referenced in the ON clause
            query += f" USING (SELECT {', '.join([f'{v} AS {k}' for k, v in insert_values.items()])}) S"
            query += f" ON {' AND '.join([f'T.{col} = S.{col}' for col in key_columns])}"
            query += " WHEN MATCHED THEN"
            query += f" UPDATE SET {', '.join([f'T.{k} = {v}' for k, v in update_values.items()])}"
            query += " WHEN NOT MATCHED THEN"
            query += f" INSERT ({', '.join(insert_values.keys())})"
            query += f" VALUES ({', '.join([str(v) for v in insert_values.values()])})"
            query_job = self.excute_query(query, params=params)
            logger.info("Upsert query executed successfully.")
            return query_job
        except Exception as e:
            logger.error(f"Failed to execute upsert query on table {table_id}: {e}")
            raise
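
    # For reference, the generated MERGE has roughly this shape (values are embedded as literals):
    #   MERGE INTO `project.dataset.table` T
    #   USING (SELECT 1 AS id, "alice" AS name) S
    #   ON T.id = S.id
    #   WHEN MATCHED THEN UPDATE SET T.name = "alice"
    #   WHEN NOT MATCHED THEN INSERT (id, name) VALUES (1, "alice")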

    def import_csv(
            self,
            table_id: str,
            csv_file_path: str,
            skip_leading_rows: int = 1,
            write_disposition: str = "WRITE_APPEND",
            field_delimiter: str = ",",
            autodetect: bool = True,
            schema: Optional[list[bigquery.SchemaField]] = None
    ) -> bigquery.LoadJob:
        """
        Load data into a table from a CSV file

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            csv_file_path (str): Path to the source CSV file (opened locally; a gs:// URI would need load_table_from_uri instead)
            skip_leading_rows (int): Number of header rows to skip
            write_disposition (str): Write mode (e.g. "WRITE_APPEND", "WRITE_TRUNCATE", "WRITE_EMPTY")
            field_delimiter (str): Field delimiter
            autodetect (bool): Whether to enable schema auto-detection
            schema (list[bigquery.SchemaField]): Schema definition; required when autodetect=False
        Returns:
            bigquery.LoadJob: The load job object
        """
        try:
            job_config = bigquery.LoadJobConfig(
                skip_leading_rows=skip_leading_rows,
                write_disposition=write_disposition,
                field_delimiter=field_delimiter,
                autodetect=autodetect,
                schema=schema
            )
            with open(csv_file_path, "rb") as source_file:
                load_job = self._client.load_table_from_file(
                    source_file,
                    table_id,
                    job_config=job_config
                )
            load_job.result()  # wait for the job to complete
            logger.info(f"Data loaded successfully from {csv_file_path} to table {table_id}.")
            return load_job
        except Exception as e:
            logger.error(f"Failed to load data from {csv_file_path} to table {table_id}: {e}")
            raise

    def export_csv_to_gcs(
            self,
            table_id: str,
            bucket: str,
            prefix: str,
            csv_file_name: Optional[str] = None,
            location: Optional[str] = None
    ) -> bigquery.ExtractJob:
        """
        Export a table to a CSV file in GCS

        Args:
            table_id (str): Source table ID (e.g. 'dataset.table')
            bucket (str): Destination GCS bucket name
            prefix (str): Destination GCS prefix (folder path)
            csv_file_name (str): Destination CSV file name; generated automatically if None
            location (str): Location of the job
        Returns:
            bigquery.ExtractJob: The extract job object
        """
        try:
            if not csv_file_name:
                _timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                csv_file_name = f"{table_id.replace('.', '_')}_{_timestamp}.csv"

            destination_uri = f"gs://{bucket}/{prefix}/{csv_file_name}"

            extract_job = self._client.extract_table(
                table_id,
                destination_uri,
                location=location
            )
            extract_job.result()  # wait for the job to complete
            logger.info(f"Data exported successfully from table {table_id} to {destination_uri}.")
            return extract_job
        except Exception as e:
            logger.error(f"Failed to export data from table {table_id} to {destination_uri}: {e}")
            raise
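
    # Sketch: CSV import and export round trip (bucket and file names are placeholders):
    #   provider.import_csv(table_id, "data/users.csv", write_disposition="WRITE_TRUNCATE")
    #   provider.export_csv_to_gcs(table_id, bucket="my-bucket", prefix="exports")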
0
src/providers/google_cloud_pubsub_provider.py
Normal file
@ -10,6 +10,9 @@ from google.oauth2 import service_account
from lib.custom_logger import get_logger
logger = get_logger()

import zipfile
from pathlib import Path


class GoogleCloudStorageProvider:
@ -75,6 +78,17 @@ class GoogleCloudStorageProvider:

    def write_item(self, bucket: str, object_name: str, data: Union[bytes, BinaryIO, str],
                   content_type: str | None = None) -> Dict[str, Any]:
        """
        Write an object

        Args:
            bucket (str): Bucket name
            object_name (str): Object name
            data (Union[bytes, BinaryIO, str]): Data to write
            content_type (Optional[str]): Content type (MIME type)
        Returns:
            Dict[str, Any]: Information about the written object
        """
        blob = self._blob(bucket, object_name)
        if content_type is None:
            content_type = mimetypes.guess_type(object_name)[0] or "application/octet-stream"
@ -101,3 +115,56 @@ class GoogleCloudStorageProvider:
    def generate_signed_url(self, bucket: str, object_name: str, method: str = "GET",
                            expires: timedelta = timedelta(hours=1)) -> str:
        return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method)

    def zip_items(
            self,
            bucket: str,
            object_names: List[str],
    ) -> bytes:
        """
        Bundle multiple GCS objects into a single ZIP and return the ZIP binary (bytes)

        Args:
            bucket (str): Bucket name
            object_names (List[str]): List of target object names
        Returns:
            bytes: Binary content of the ZIP file
        """
        out = io.BytesIO()
        with zipfile.ZipFile(out, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
            for obj in object_names:
                blob = self._blob(bucket, obj)
                if not blob.exists():
                    raise FileNotFoundError(f"Object not found: gs://{bucket}/{obj}")

                buf = io.BytesIO()
                blob.download_to_file(buf)
                buf.seek(0)
                arcname = Path(obj).name
                zf.writestr(arcname, buf.read())

            zf.comment = f"bucket={bucket}, files={len(object_names)}".encode()

        return out.getvalue()

    def upload_folder(self, bucket: str, folder_path: str, gcs_prefix: str = ""):
        """
        Recursively upload a local folder to GCS

        Args:
            bucket (str): Bucket name
            folder_path (str): Path of the local folder
            gcs_prefix (str): Prefix (folder path) on GCS
        """
        _bucket = self._bucket(bucket)

        for root, _, files in os.walk(folder_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                # Build a relative path so that the folder structure is preserved
                relative_path = os.path.relpath(local_file_path, folder_path)
                gcs_object_name = os.path.join(gcs_prefix, relative_path).replace("\\", "/")

                blob = _bucket.blob(gcs_object_name)
                blob.upload_from_filename(local_file_path)
                logger.info(f"Uploaded {local_file_path} to gs://{bucket}/{gcs_object_name}")
@ -31,8 +31,8 @@ class OneDriveProvider:
            client_id: Client ID of the app registered in the Azure Portal (public client)
            authority: 'https://login.microsoftonline.com/{tenant}' (defaults to 'common')
            scopes: e.g. ["Files.ReadWrite", "User.Read", "offline_access"]
            token_cache_path: Path for the serialized MSAL token cache (optional)
            access_token: Use an already acquired Bearer token directly (optional)
        """
        self.client_id = client_id or os.getenv("MS_CLIENT_ID") or ""
        self.authority = authority or self.DEFAULT_AUTHORITY
@ -64,7 +64,7 @@ class OneDriveProvider:
            f.write(self._token_cache.serialize())

    def ensure_token(self):
        """Ensure a valid access token (cache first, then device code flow)."""
        if self._access_token:
            return self._access_token

@ -102,7 +102,7 @@ class OneDriveProvider:
        """
        Graph path notation: used like /me/drive/root:/foo/bar:/children.
        No leading /; URL encoding is left to requests, so quoting spaces and
        Japanese characters is recommended for safety (simplified here).
        """
        if not path or path.strip() in ["/", "."]:
            return ""
@ -156,7 +156,7 @@ class OneDriveProvider:

    def create_folder(self, folder_path: str) -> Dict[str, Any]:
        """
        Create intermediate folders one by one as needed (simple implementation).
        """
        parts = [p for p in self._normalize_path(folder_path).split("/") if p]
        cur = ""
@ -300,7 +300,7 @@ class OneDriveProvider:
        return r.json()

    # -----------------------
    # Share links (pseudo signed URLs)
    # -----------------------
    def generate_share_link(
        self,
@ -316,7 +316,7 @@ class OneDriveProvider:
        url = f"{self._item_by_path_url(path)}:/createLink"
        body: Dict[str, Any] = {"type": link_type, "scope": scope}
        if password:
            body["password"] = password  # password protection (allowed or not depends on tenant policy)
        r: requests.Response = self._session.post(url, headers=self._headers(), json=body)
        r.raise_for_status()
        data: dict = r.json()
41
src/providers/rss_reader_client.py
Normal file
@ -0,0 +1,41 @@
import feedparser
from feedparser import FeedParserDict
from pydantic import BaseModel


class Feed(BaseModel):
    url: str
    title: str | None = None
    company: str | None = None
    description: str | None = None
    language: str | None = None
    tags: list[str] | None = None


class RSSItem(BaseModel):
    title: str
    link: str
    guid: str | None = None
    published: str | None = None
    ts: str | None = None
    source: str | None = None
    description: str | None = None


class RSSReaderClient:
    """RSS reader client"""
    @classmethod
    def fetch(cls, feed: Feed, from_date: str, to_date: str) -> list[RSSItem]:
        """Fetch articles from the given feed"""
        items = []
        d: FeedParserDict = feedparser.parse(feed.url)
        for e in d.entries:
            items.append(RSSItem(
                title=e.get("title", "(no title)"),
                link=e.get("link"),
                guid=e.get("id") or e.get("guid") or e.get("link"),
                published=e.get("published") or e.get("updated"),
                ts=e.get("published") or e.get("updated"),
                source=feed.url,
                description=e.get("summary") or e.get("description"),
            ))
        return items
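
# Usage sketch (the feed URL is a placeholder). Note that fetch() currently accepts
# from_date/to_date but does not yet apply them as a filter:
#   feed = Feed(url="https://example.com/rss.xml", company="Example")
#   items = RSSReaderClient.fetch(feed, from_date="2024-01-01", to_date="2024-12-31")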