Compare commits
No commits in common. "f74b1c7a1097f7d9141445617c0484ddb3ce861f" and "649015d0166c539baedb473a8871995952dd4ee8" have entirely different histories.
f74b1c7a10 ... 649015d016

5 changes: .gitignore (vendored)
@@ -3,7 +3,4 @@ venv/
 __pycache__

 .env
 keys/
-
-
-.onedrive_cache.json
15 changes: README.md

@@ -3,17 +3,4 @@
 ```sh
 python -m venv venv
 .\venv\Scripts\activate
 ```
-
-
-curl https://api.openai.com/v1/audio/transcriptions \
-  -H "Authorization: Bearer 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" \
-  -F "file=@part_001.m4a" \
-  -F "model=gpt-4o-transcribe" \
-  -F "response_format=text" \
-  -F "language=ja" \
-  -o part_001.txt
-
-ffmpeg -i voice.m4a -f segment -segment_time 600 -c copy part_%03d.m4a
-
-# Normalize the format
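The removed README snippet drives the transcription with curl, one segment at a time. A minimal Python sketch of the same loop is shown below; it assumes an OPENAI_API_KEY environment variable and the part_*.m4a files produced by the ffmpeg segment command above. The endpoint and form fields mirror the removed curl call; the loop itself is hypothetical and not part of this repository.

```python
# Hypothetical sketch: transcribe each ffmpeg segment via the same endpoint
# and form fields as the removed curl command. Assumes OPENAI_API_KEY is set.
import glob
import os

import requests

API_URL = "https://api.openai.com/v1/audio/transcriptions"
headers = {"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"}

for path in sorted(glob.glob("part_*.m4a")):
    with open(path, "rb") as f:
        resp = requests.post(
            API_URL,
            headers=headers,
            files={"file": (os.path.basename(path), f)},
            data={
                "model": "gpt-4o-transcribe",
                "response_format": "text",
                "language": "ja",
            },
            timeout=600,
        )
    resp.raise_for_status()
    out_path = os.path.splitext(path)[0] + ".txt"
    with open(out_path, "w", encoding="utf-8") as out:
        out.write(resp.text)  # response_format=text returns plain text
```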
@@ -1,12 +0,0 @@
## BigQuery is a "data warehouse" type database

BigQuery is a column-oriented database for OLAP (analytical) workloads,
so unlike an RDB (MySQL, PostgreSQL, etc.) it is not designed to enforce row uniqueness or integrity constraints.

## Constraints that do not exist in BigQuery

* PRIMARY KEY
* FOREIGN KEY
* UNIQUE
* CHECK
* AUTO_INCREMENT
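Because none of these constraints exist, uniqueness has to be handled by the writer or by a periodic cleanup job. A minimal sketch, assuming the example_dataset.example_table schema used elsewhere in this repo (device_code, time_stamp), that keeps only the latest row per device_code:

```python
# Hedged sketch: emulate a uniqueness guarantee that BigQuery itself does not enforce.
# Assumes the example_dataset.example_table schema (device_code, time_stamp) used in this repo.
from google.cloud import bigquery

client = bigquery.Client()

dedup_sql = """
CREATE OR REPLACE TABLE `example_dataset.example_table` AS
SELECT * EXCEPT(rn)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY device_code ORDER BY time_stamp DESC) AS rn
  FROM `example_dataset.example_table`
)
WHERE rn = 1
"""

client.query(dedup_sql).result()  # wait for the job; the table now has one row per device_code
```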
@@ -1,66 +0,0 @@

```powershell
# Delete temporary files
Remove-Item -Path "$env:TEMP\*" -Recurse -Force -ErrorAction SilentlyContinue
```

Move data that can be relocated to another drive

```powershell
Get-ChildItem C:\ -Recurse -ErrorAction SilentlyContinue |
Where-Object { -not $_.PSIsContainer } |
Sort-Object Length -Descending |
Select-Object FullName, @{Name="Size(GB)";Expression={"{0:N2}" -f ($_.Length / 1GB)}} -First 20
```

Clean up with third-party tools

WinDirStat
→ Visualizes disk usage so large files can be spotted at a glance.

```
winget install WinDirStat
windirstat
```

CCleaner
→ Handy for cleaning up temporary files and unneeded registry entries.

```
Remove-Item "C:\adobeTemp\*" -Recurse -Force -ErrorAction SilentlyContinue

ren "C:\adobeTemp" "adobeTemp_old"
```

attrib -h -s -r "C:\adobeTemp"

ren C:\adobeTemp C:\adobeTemp_old
ren "C:\adobeTemp" "adobeTemp_old"

mkdir "F:\adobeTemp"

mklink /J C:\adobeTemp F:\adobeTemp

ren C:\adobeTemp C:\adobeTemp_old

Create the folder the junction will point to (prepare a new storage location on the D drive):

```powershell
mkdir D:\adobeTemp
```

Create the junction. In an administrator command prompt:

```cmd
mklink /J C:\adobeTemp D:\adobeTemp
```

cd "C:\Users\r_yam\AppData\Local\Docker\wsl\data"
optimize-vhd -Path .\ext4.vhdx -Mode Full
@@ -1,110 +0,0 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from lib.custom_logger import get_logger
logger = get_logger(level=10)

from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider


def example_bigquery():
    try:
        # Create an instance of GoogleCloudBigQueryProvider
        provider = GoogleCloudBigQueryProvider(
            cred_path="keys/google_service_accout.json",
        )

        dss = provider.get_datasets()
        for ds in dss:
            logger.info(f"Dataset ID: {ds.dataset_id}")

        # Create a dataset
        dataset_id = "example_dataset"
        if provider.is_exists_dataset(dataset_id):
            logger.info(f"Dataset {dataset_id} already exists.")
        else:
            dataset = provider.create_dataset(dataset_id)
            logger.info(f"Dataset {dataset_id} created at {dataset.created}.")

        table_id = provider.full_table_id(dataset_id, "example_table")
        # Create the table
        if provider.is_exists_table(table_id):
            logger.info(f"Table {table_id} already exists.")
        else:
            logger.info(f"Creating table {table_id}...")
            schema = [
                provider.addSchemaField("device_code", "string", "REQUIRED", description="Device code"),
                provider.addSchemaField("time_stamp", "timestamp", "REQUIRED", description="Timestamp"),
            ]
            table = provider.create_table(
                table_id=table_id,
                schema=schema
            )

        # tables = provider.get_tables(dataset_id)
        # for table in tables:
        #     logger.info(f"Table ID: {table.table_id}")

        # Insert rows into the table
        # provider.insert_rows(
        #     table_id=table_id,
        #     rows=[
        #         {"device_code": "device_001", "time_stamp": "2025-01-01 12:00:00"},
        #         {"device_code": "device_002", "time_stamp": "2025-01-01 12:05:00"},
        #         {"device_code": "device_003", "time_stamp": "2025-01-01 12:10:00"},
        #     ]
        # )
        # Check the table contents (runs a job, issues SQL)
        # job_qyuery = provider.excute_query(
        #     query=f"SELECT * FROM `{table_id}` where device_code='device_001'",
        # )
        # results = job_qyuery.result()
        # for row in results:
        #     logger.info(f"Row: {row}")


        # Check the table contents (runs a job)
        # rows = provider.list_rows(
        #     table_id=table_id,
        # )
        # for row in rows:
        #     logger.info(f"Row: {row}")

        # Check the streaming buffer status
        # buffer_status = provider.get_streaming_buffer_info(table_id=table_id)
        # logger.info(f"Streaming Buffer Info: {buffer_status}")
        # Update the table
        # query_job = provider.update_query(
        #     table_id=table_id,
        #     values={
        #         "device_code": "'device_999'"
        #     },
        #     were_clause="device_code='device_002'"
        # )
        # query_job.result()  # wait for the job to complete


        # # Check the table contents (runs a job)
        # job = provider.select_query(
        #     table_id=table_id,
        # )
        # results = job.result()
        # for row in results:
        #     logger.info(f"Row: {row}")

        # # Delete records from the table
        # provider.delete_query(
        #     table_id=table_id,
        #     were_clause="device_code='device_012'"
        # )
        # Delete the table
        # provider.delete_table(table_id=table_id)


    except Exception as e:
        logger.error(f"Error in example_bigquery: {e}")



example_bigquery()
@@ -1,74 +0,0 @@
import sys
import os
from datetime import datetime

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from typing import ClassVar, Optional

from lib.custom_logger import get_logger
logger = get_logger(level=10)
from dataclasses import dataclass

from models.bigquery_base_model import BigQueryBaseModel
from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider

@dataclass
class ExamplModel(BigQueryBaseModel):
    device_code: str
    time_stamp: datetime

    table_id: ClassVar[str] = "example_dataset.example_table"
    key_field: ClassVar[Optional[str]] = "device_code"

def example_model():
    logger.info("Starting example_bigquery_model function.")
    provider = GoogleCloudBigQueryProvider(
        cred_path="keys/google_service_accout.json",
    )
    ExamplModel.set_provider(provider)

    # Fetch all records in the table
    # records = ExamplModel.fetch_all()
    # logger.info(f"Total records: {len(records)}")
    # for record in records:
    #     logger.info(f"Record: {record}")

    # Create a record
    # new_record = ExamplModel(
    #     device_code="device_010",
    #     time_stamp=datetime(2025, 1, 1, 15, 0, 0)
    # )
    # new_record.create()

    # Create many records at once
    # ExamplModel.insert(
    #     rows=[
    #         ExamplModel(device_code="device_011", time_stamp=datetime(2025, 1, 1, 15, 0, 0)),
    #         ExamplModel(device_code="device_012", time_stamp=datetime(2025, 1, 1, 15, 0, 0)),
    #     ])

    # Check the table's streaming-buffer status
    if ExamplModel.is_streaming_buffer():
        logger.info("Table is currently receiving streaming data.")
        logger.info("Rows in the streaming buffer cannot be updated or deleted.")
    else:
        logger.info("Table is not receiving streaming data.")
    # Delete records matching a condition
    # ExamplModel.delete(where=[("device_code", "=", "device_010")])
    # Update records
    ExamplModel.update(
        values={"device_code": "device_011_updated"},
        where=[("device_code", "=", "device_011")]
    )



    # records = ExamplModel.list(where=[("device_code", "=", "device_001")])
    # logger.info(f"Records with device_code='device_001': {records if records else 'No records found'}")


    # record = ExamplModel.first()
    # logger.info(f"First record: {record if record else 'No record found'}")


example_model()
@@ -1,25 +0,0 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from lib.custom_logger import get_logger
logger = get_logger(level=10)

from providers.duck_db_provider import DuckDBProvider

def example_duckdb():
    logger.info("Starting example_duckdb function.")
    file_path = "./data_science/data/y=*/news/news_*.csv"
    provider = DuckDBProvider()
    sql = f"""
        SELECT *
        FROM read_csv_auto('{file_path}', HEADER=TRUE, IGNORE_ERRORS=TRUE)
    """
    result = provider.query_df(sql)
    print("latest published_parsed:", result)

    # Print the rows one by one with a for loop
    for idx, row in result.iterrows():
        logger.info(f"title:{row['title']}")

example_duckdb()
@@ -1,87 +0,0 @@
"""
An HMAC key is required:
* Issue one in the Google Cloud Console
* https://console.cloud.google.com/storage/settings
* Open the "Interoperability" tab
* Click the "Create HMAC key" button
* Select the service account you want to use
"""

import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from dotenv import load_dotenv
load_dotenv()


from lib.custom_logger import get_logger
logger = get_logger(level=10)

import duckdb
from providers.google_cloud_storage_provider import GoogleCloudStorageProvider
from providers.duck_db_provider import DuckDBProvider


def example_init_google_cloud_storage():

    logger.info("Starting example_google_cloud_storage function.")
    gcs_provider = GoogleCloudStorageProvider(
        cred_path="./keys/google_service_accout.json",
    )
    csv_data = """id,name,age,city,score,created_at
1,Alice,25,Tokyo,88,2025-10-01T09:00:00Z
2,Bob,30,Osaka,75,2025-10-02T09:30:00Z
3,Charlie,28,Nagoya,92,2025-10-03T10:00:00Z
4,David,35,Fukuoka,64,2025-10-04T11:15:00Z
5,Eva,22,Sapporo,80,2025-10-05T12:45:00Z
"""
    gcs_provider.write_item("datasource-example-251018",
                            "example/y=2025/m=10/example.csv",
                            csv_data.encode("utf-8"), "text/csv")

    buckets = gcs_provider.get_buckets()
    logger.info(f"Buckets: {buckets}")


def example_duckdb_cloud_raw():
    logger.info("Starting example_duckdb_cloud_raw function.")

    # Connect to DuckDB
    con = duckdb.connect()
    con.sql(f"""
        CREATE OR REPLACE SECRET gcs_creds (
            TYPE gcs,
            KEY_ID {os.getenv('GCP_STORAGE_HMAC_ACCESS_KEY')},
            SECRET {os.getenv('GCP_STORAGE_HMAC_SECRET_KEY')}
        );
    """)

    query = f"""
        SELECT * FROM read_csv_auto('gs://datasource-example-251018/example/y=2025/m=10/example.csv');
    """
    result = con.execute(query).df()
    logger.info(f"Read {len(result)} rows from GCS file.")


def example_duckdb_cloud_class():
    logger.info("Starting example_duckdb_cloud_class function.")
    # Connect to DuckDB
    provider = DuckDBProvider()
    provider.setup_gcs(
        access_key=os.getenv('GCP_STORAGE_HMAC_ACCESS_KEY'),
        secret_key=os.getenv('GCP_STORAGE_HMAC_SECRET_KEY'),
    )
    bucket_name = "datasource-example-251018"
    object_name = "example/y=2025/m=*/example.csv"

    query = f"""
        SELECT * FROM {provider.get_gs_csv_name(bucket_name, object_name)};
    """
    result = provider.query_df(query)
    logger.info(f"Read {len(result)} rows from GCS file using DuckDBProvider.")


# example_init_google_cloud_storage()
# example_duckdb_cloud_raw()
example_duckdb_cloud_class()
@@ -1,30 +0,0 @@
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))

from typing import ClassVar, Optional

from lib.custom_logger import get_logger
logger = get_logger(level=10)
from dataclasses import dataclass

from models.duck_base_model import DuckBaseModel
from providers.duck_db_provider import DuckDBProvider

@dataclass
class NewsModel(DuckBaseModel):
    title: str
    file_glob: ClassVar[str] = "./data_science/data/y=2025/news/news_*.csv"
    default_order_by: ClassVar[Optional[str]] = "pub_date DESC"


def example_duckdb_model():
    logger.info("Starting example_duckdb_model function.")
    provider = DuckDBProvider()
    provider.connect()
    NewsModel.provider = provider

    news = NewsModel.first()
    logger.info(f"Total news: {news if news else 'No news found'}")

# example_duckdb_model()
@@ -1,47 +0,0 @@
# openai_rss_reader_stdlib.py
# pip install feedparser

import feedparser
from email.utils import parsedate_to_datetime

FEEDS = [
    "https://platform.openai.com/docs/release-notes.rss",  # release notes
    "https://openai.com/blog/rss.xml",  # blog
]

def to_ts(entry: feedparser.FeedParserDict):
    # If published_parsed / updated_parsed is present, convert it straight to a datetime
    if entry.get("published_parsed"):
        return parsedate_to_datetime(entry.published)
    if entry.get("updated_parsed"):
        return parsedate_to_datetime(entry.updated)
    return None

def uniq_key(entry: feedparser.FeedParserDict):
    return entry.get("id") or entry.get("guid") or entry.get("link")

def fetch_all(feeds):
    items = []
    seen = set()
    for url in feeds:
        d: feedparser.FeedParserDict = feedparser.parse(url)
        for e in d.entries:
            k = uniq_key(e)
            if not k or k in seen:
                continue
            seen.add(k)
            ts = to_ts(e)
            items.append({
                "title": e.get("title", "(no title)"),
                "link": e.get("link"),
                "published": e.get("published") or e.get("updated"),
                "ts": ts,
                "source": url,
            })
    # Entries whose ts is None go to the end
    items.sort(key=lambda x: (x["ts"] is not None, x["ts"]), reverse=True)
    return items

if __name__ == "__main__":
    for i, it in enumerate(fetch_all(FEEDS), 1):
        print(f"{i:02d}. {it['title']}\n    {it['link']}\n    {it['published']} [{it['source']}]\n")
@@ -2,26 +2,13 @@ matplotlib
 requests
 pyttsx3

-# pylib
-pandas
 # firebase_provider
 firebase-admin>=7.1.0
 # google cloud storage
 google-cloud-storage
-# google cloud big query
-google-cloud-bigquery
-# google cloud big query option
-google-cloud-bigquery-storage
-# bigquery to dataframe
-db-dtypes
 # onedrive
 msal

 # common
 python-dotenv
-# RSS
-feedparser
-# model
-pydantic
-# duckdb
-duckdb==1.3.2
@@ -28,6 +28,7 @@ class CustomLogger(Singleton):

     def __init__(self, name='main', log_file=None, level=logging.INFO):
         if hasattr(self, '_initialized') and self._initialized:
+            self.logger.setLevel(level)
             return  # already initialized, so do nothing

         self.logger = logging.getLogger(name)
@@ -2,7 +2,7 @@
 This implementation is thread-safe and ensures that only one instance of the class is created.

 What Singleton provides is only "a mechanism that returns the same instance":
 * __init__() is still called every time (behavior many people do not expect)
 * On the second and later __init__ calls, the _initialized flag has to be managed by the caller side.
 """

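A minimal, self-contained sketch of the behavior this docstring describes (hypothetical, not the repository's Singleton class): __new__ hands back the same instance, but __init__ still runs on every construction, so the subclass has to guard itself with an _initialized flag.

```python
# Minimal sketch (assumption: not the repository's implementation).
# __new__ reuses one instance, but __init__ still runs on every construction,
# so the subclass guards re-initialization with an _initialized flag.
import threading


class Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance


class Config(Singleton):
    def __init__(self, value=None):
        if getattr(self, "_initialized", False):
            return  # second and later __init__ calls: do nothing
        self.value = value
        self._initialized = True


a = Config("first")
b = Config("second")  # same instance; __init__ runs again but exits early
assert a is b and a.value == "first"
```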
@@ -14,7 +14,7 @@ class TaskScheduler:
 # Create an instance and start the thread
 scheduler = TaskScheduler()

 # Discard the instance (but the thread keeps running)
 # del scheduler

 # The main thread continues with other work
@@ -1,510 +0,0 @@
import os
import base64
from typing import Any, Dict, Optional, Type, TypeVar, Tuple, Sequence, Union, List, Iterable, ClassVar
from enum import Enum
from decimal import Decimal


from datetime import datetime, date, time, timezone
from dataclasses import dataclass, fields, is_dataclass

import pandas as pd
from providers.google_cloud_bigquery_provider import GoogleCloudBigQueryProvider

from lib.custom_logger import get_logger
logger = get_logger()

T = TypeVar("T", bound="BigQueryBaseModel")

Filter = Tuple[str, str, Any]

def _build_where_clause_bq_without_prams(
    where: Optional[Union[str, Dict[str, Any], Sequence[Tuple[str, str, Any]]]]
) -> str:
    """
    Build a WHERE clause for BigQuery (without parameters).
    Returns:
        where_sql
    """
    if where is None:
        return ""

    clauses = []

    if isinstance(where, str):
        # Use the string as-is (raw SQL)
        return f"WHERE {where}"

    if isinstance(where, dict):
        for col, val in where.items():
            if isinstance(val, (list, tuple, set)):
                placeholders = ", ".join([f"'{str(v).replace('\'', '\\\'')}'" for v in val])
                clauses.append(f"{col} IN ({placeholders})")
            elif val is None:
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} = '{str(val).replace('\'', '\\\'')}'")
    else:
        # sequence of (col, op, val)
        for col, op, val in where:
            if (op.upper() == "IN") and isinstance(val, (list, tuple, set)):
                placeholders = ", ".join([f"'{str(v).replace('\'', '\\\'')}'" for v in val])
                clauses.append(f"{col} IN ({placeholders})")
            elif val is None and op in ("=", "=="):
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} {op} '{str(val).replace('\'', '\\\'')}'")

    where_sql = "WHERE " + " AND ".join(clauses)
    return where_sql

def _build_where_clause_bq(
    where: Optional[Union[str, Dict[str, Any], Sequence[Tuple[str, str, Any]]]]
) -> Tuple[str, List[Dict[str, Any]]]:
    """
    Build a WHERE clause and query parameters for BigQuery.
    Returns:
        (where_sql, query_params)
    """
    if where is None:
        return "", []

    params = []
    clauses = []

    if isinstance(where, str):
        # Use the string as-is (raw SQL)
        return f"WHERE {where}", params

    if isinstance(where, dict):
        for i, (col, val) in enumerate(where.items()):
            param_name = f"param{i}"
            if isinstance(val, (list, tuple, set)):
                placeholders = []
                for j, v in enumerate(val):
                    pname = f"{param_name}_{j}"
                    placeholders.append(f"@{pname}")
                    params.append({"name": pname, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(v)}})
                clauses.append(f"{col} IN ({', '.join(placeholders)})")
            elif val is None:
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} = @{param_name}")
                params.append({"name": param_name, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(val)}})
    else:
        # sequence of (col, op, val)
        for i, (col, op, val) in enumerate(where):
            param_name = f"param{i}"
            if (op.upper() == "IN") and isinstance(val, (list, tuple, set)):
                placeholders = []
                for j, v in enumerate(val):
                    pname = f"{param_name}_{j}"
                    placeholders.append(f"@{pname}")
                    params.append({"name": pname, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(v)}})
                clauses.append(f"{col} IN ({', '.join(placeholders)})")
            elif val is None and op in ("=", "=="):
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} {op} @{param_name}")
                params.append({"name": param_name, "parameterType": {"type": "STRING"}, "parameterValue": {"value": str(val)}})

    where_sql = "WHERE " + " AND ".join(clauses)
    return where_sql, params


def to_bq_json(v):
    if isinstance(v, datetime):
        # For TIMESTAMP (normalize to UTC and append Z)
        if v.tzinfo is None:
            v = v.replace(tzinfo=timezone.utc)
        return v.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    if isinstance(v, date) and not isinstance(v, datetime):
        # For DATE
        return v.isoformat()
    if isinstance(v, time):
        # For TIME
        # v.isoformat() may include microseconds → that is fine as-is
        return v.isoformat()
    if isinstance(v, Decimal):
        # Passing NUMERIC/BIGNUMERIC as a string is safest
        return str(v)
    if isinstance(v, bytes):
        # BYTES as a base64 string
        return base64.b64encode(v).decode("ascii")
    if isinstance(v, Enum):
        return v.value
    if isinstance(v, list):
        return [to_bq_json(x) for x in v]
    if isinstance(v, dict):
        return {k: to_bq_json(val) for k, val in v.items()}
    return v


@dataclass
class BigQueryBaseModel:

    table_id: ClassVar[str]  # e.g. "datasetid.table_name"
    provider: ClassVar[GoogleCloudBigQueryProvider | None] = None
    key_field: ClassVar[Optional[str]] = None  # e.g. "id"; the field that acts as the primary key
    columns_select: ClassVar[str] = "*"  # e.g. "col1, col2, col3"
    default_order_by: ClassVar[Optional[str]] = None

    @classmethod
    def set_provider(cls, provider: GoogleCloudBigQueryProvider):
        cls.provider = provider

    @classmethod
    def fetch_all(cls: Type[T]) -> List[T]:
        """Fetch all records in the table."""
        if cls.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        rows = cls.provider.list_rows(table_id=cls.table_id)
        result: List[T] = []
        for row in rows:
            data = dict(row)
            obj: T = cls(**data)  # type: ignore[arg-type]
            result.append(obj)
        return result

    def create(self):
        """Insert this record into the table."""
        if self.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        data = {f.name: getattr(self, f.name) for f in fields(self)}
        data = to_bq_json(data)

        logger.debug(f"Inserting data: {data}")
        self.provider.insert_rows(
            table_id=self.table_id,
            rows=[data]
        )

    @classmethod
    def insert(cls, rows: List[T]):
        """Insert multiple records into the table."""
        if cls.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        bq_rows = []
        for row in rows:
            data = {f.name: getattr(row, f.name) for f in fields(row)}
            bq_rows.append(to_bq_json(data))

        logger.info(f"Inserting data: {len(bq_rows)}")
        logger.debug(f"Inserting data details: {bq_rows[:10]}...")
        cls.provider.insert_rows(
            table_id=cls.table_id,
            rows=bq_rows
        )

    @classmethod
    def is_streaming_buffer(cls) -> bool:
        """Get the table's streaming-buffer information."""
        if cls.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        buf_info = cls.provider.get_streaming_buffer_info(table_id=cls.table_id)
        logger.debug(f"Streaming buffer info: {buf_info}")

        return buf_info is not None

    def save(self):
        """Update this record in the table."""
        if self.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        if self.is_streaming_buffer():
            raise RuntimeError("Cannot update records in streaming buffer.")

        if self.key_field is None:
            raise RuntimeError("key_field is not set.")

        key_value = getattr(self, self.key_field)
        if key_value is None:
            raise ValueError(f"Key field '{self.key_field}' value is None.")

        data = {f.name: getattr(self, f.name) for f in fields(self)}
        data = to_bq_json(data)

        set_clauses = []
        for k, v in data.items():
            if k == self.key_field:
                continue
            if isinstance(v, str):
                v_str = f"'{v.replace('\'', '\\\'')}'"
            else:
                v_str = str(v)
            set_clauses.append(f"{k}={v_str}")
        set_clause = ", ".join(set_clauses)

        where_clause = f"{self.key_field}='{str(key_value).replace('\'', '\\\'')}'"

        logger.debug(f"Updating data: SET {set_clause} WHERE {where_clause}")
        self.provider.update_query(
            table_id=self.table_id,
            values=set_clause,
            were_clause=where_clause
        )

    def destroy(self):
        """Delete this record from the table."""
        if self.provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        if self.is_streaming_buffer():
            raise RuntimeError("Cannot delete records in streaming buffer.")

        if self.key_field is None:
            raise RuntimeError("key_field is not set.")

        key_value = getattr(self, self.key_field)
        if key_value is None:
            raise ValueError(f"Key field '{self.key_field}' value is None.")

        where_clause = f"{self.key_field}='{str(key_value).replace('\'', '\\\'')}'"

        logger.debug(f"Deleting data: WHERE {where_clause}")
        self.provider.delete_query(
            table_id=self.table_id,
            were_clause=where_clause
        )

    # -------------------------------------
    # select
    @classmethod
    def select(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ):
        params_dict = []
        where_sql, params_dict = _build_where_clause_bq(where)
        # where_sql = _build_where_clause_bq_without_prams(where)
        order = order_by or cls.default_order_by
        order_sql = f"ORDER BY {order}" if order else ""
        limit_sql = f"LIMIT {int(limit)}" if limit is not None else ""
        offset_sql = f"OFFSET {int(offset)}" if offset is not None else ""
        sql = f"""
            SELECT {cls.columns_select}
            FROM `{cls.table_id}`
            {where_sql}
            {order_sql}
            {limit_sql}
            {offset_sql}
        """
        # logger.debug(f"Select Query: {sql} with params_dict: {params_dict}")
        if provider is None:
            provider = cls.provider
        if provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        params = []
        for p in params_dict:
            param = provider.addQueryParameter(
                name=p["name"],
                value=p["parameterValue"]["value"],
                param_type=p["parameterType"]["type"]
            )
            params.append(param)
        # Parameter binding: uses the raw client of GoogleCloudBigQueryProvider
        # query_job = provider.excute_query(sql, params=params)
        query_job = provider.excute_query(sql, params=params)
        return query_job


    @classmethod
    def select_df(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> pd.DataFrame:
        """Fetch the result as a DataFrame.

        Notes:
            Fetching a DataFrame directly from Google Cloud BigQuery requires the db-dtypes package.
        """
        query_job = cls.select(
            provider=provider,
            where=where,
            order_by=order_by,
            limit=limit,
            offset=offset,
        )
        return query_job.to_dataframe()

    @classmethod
    def list(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
    ) -> List[T]:
        job = cls.select(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset)
        results = job.result()
        instances: List[T] = []
        for row in results:
            data = dict(row)
            instance = cls(**data)  # type: ignore
            instances.append(instance)

        # df = cls.select_df(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset)
        # if not is_dataclass(cls):
        #     raise TypeError(f"{cls.__name__} is not a dataclass.")
        # instances: List[T] = []
        # for _, row in df.iterrows():
        #     data = {f.name: row[f.name] for f in fields(cls) if f.name in row}
        #     instance = cls(**data)  # type: ignore
        #     instances.append(instance)
        return instances

    @classmethod
    def first(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
    ) -> Optional[T]:
        results = cls.list(provider=provider, where=where, order_by=order_by, limit=1)
        return results[0] if results else None

    @classmethod
    def count(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ) -> int:
        where_sql, params_dict = _build_where_clause_bq(where)
        sql = f"""
            SELECT COUNT(*) as cnt
            FROM `{cls.table_id}`
            {where_sql}
        """
        # Parameter binding: uses the raw client of GoogleCloudBigQueryProvider
        if provider is None:
            provider = cls.provider
        if provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        params = []
        for p in params_dict:
            param = provider.addQueryParameter(
                name=p["name"],
                value=p["parameterValue"]["value"],
                param_type=p["parameterType"]["type"]
            )
            params.append(param)

        provider.excute_query(
            sql,
            params=params
        )

        query_job = provider.excute_query(sql, params=params)
        results = query_job.result()
        for row in results:
            return row["cnt"]
        return 0

    @classmethod
    def exists(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ) -> bool:
        if provider is None:
            provider = cls.provider
        return cls.count(provider, where=where) > 0

    @classmethod
    def delete(
        cls: Type[T],
        provider: GoogleCloudBigQueryProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ):
        where_sql, params_dict = _build_where_clause_bq(where)
        sql = f"""
            DELETE FROM `{cls.table_id}`
            {where_sql}
        """
        if provider is None:
            provider = cls.provider
        if provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        params = []
        for p in params_dict:
            param = provider.addQueryParameter(
                name=p["name"],
                value=p["parameterValue"]["value"],
                param_type=p["parameterType"]["type"]
            )
            params.append(param)

        job = provider.excute_query(
            sql,
            params=params
        )
        job.result()  # wait for the job to complete

    @classmethod
    def update(
        cls: Type[T],
        values: dict[str, Any],
        *,
        where: Union[str, Dict[str, Any], Sequence[Filter]],
        provider: GoogleCloudBigQueryProvider = None,
    ):
        params_dicts = []
        where_sql, params_dicts = _build_where_clause_bq(where)
        set_clauses = []
        for k, v in values.items():
            param_name = f"set_{k}"
            set_clauses.append(f"{k}=@{param_name}")
            params_dicts.append({
                "name": param_name,
                "parameterType": {"type": "STRING"},
                "parameterValue": {"value": str(v)}
            })

        sql = f"""
            UPDATE `{cls.table_id}`
            SET {', '.join(set_clauses)}
            {where_sql}
        """
        logger.debug(f"update Query: {sql} with params_dict: {params_dicts}")

        if provider is None:
            provider = cls.provider
        if provider is None:
            raise RuntimeError("GoogleCloudBigQueryProvider not set. Call .set_provider().")

        params = []
        for p in params_dicts:
            param = provider.addQueryParameter(
                name=p["name"],
                value=p["parameterValue"]["value"],
                param_type=p["parameterType"]["type"]
            )
            params.append(param)

        job = provider.excute_query(
            sql,
            params=params
        )
        job.result()  # wait for the job to complete
@@ -1,184 +0,0 @@
import os
from typing import Any, Dict, Optional, Type, TypeVar, Tuple, Sequence, Union, List, Iterable, ClassVar
from dataclasses import dataclass, fields, is_dataclass

import pandas as pd
from providers.duck_db_provider import DuckDBProvider

DATA_BASEPATH = os.getenv("DATA_BASEPATH", "./data")
from lib.custom_logger import get_logger
logger = get_logger()

T = TypeVar("T", bound="DuckBaseModel")

Filter = Tuple[str, str, Any]

def _build_where_clause(where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]]
                        ) -> Tuple[str, List[Any]]:
    if where is None:
        return "", []
    params: List[Any] = []

    if isinstance(where, str):
        return f"WHERE {where}", params

    clauses: List[str] = []
    if isinstance(where, dict):
        for col, val in where.items():
            if isinstance(val, (list, tuple, set)):
                placeholders = ", ".join(["?"] * len(val))
                clauses.append(f"{col} IN ({placeholders})")
                params.extend(list(val))
            elif val is None:
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} = ?")
                params.append(val)
    else:
        # sequence of (col, op, value)
        for col, op, val in where:
            if (op.upper() == "IN") and isinstance(val, (list, tuple, set)):
                placeholders = ", ".join(["?"] * len(val))
                clauses.append(f"{col} IN ({placeholders})")
                params.extend(list(val))
            elif val is None and op in ("=", "=="):
                clauses.append(f"{col} IS NULL")
            else:
                clauses.append(f"{col} {op} ?")
                params.append(val)

    if not clauses:
        return "", []
    return "WHERE " + " AND ".join(clauses), params


@dataclass
class DuckBaseModel:
    file_glob: ClassVar[str]  # e.g. "./data_science/data/y=2025/news/news_*.csv"
    provider: ClassVar[DuckDBProvider | None] = None
    columns_select: ClassVar[str] = "*"  # e.g. "published_parsed, title, link"
    default_order_by: ClassVar[Optional[str]] = None

    # read_csv_auto options (override in a subclass if needed)
    hive_partitioning: ClassVar[bool] = True
    union_by_name: ClassVar[bool] = True
    header: ClassVar[bool] = True
    ignore_errors: ClassVar[bool] = True


    @classmethod
    def set_provider(cls, provider: DuckDBProvider):
        cls.provider = provider



    # ---- FROM clause (CSV loading) ----
    @classmethod
    def _from_clause(cls, file_glob=None) -> str:
        path = file_glob or cls.file_glob
        path = path.replace("'", "''")
        path = f"{DATA_BASEPATH}{path}"

        # path = cls.file_glob.replace("'", "''")
        hp = 1 if cls.hive_partitioning else 0
        un = 1 if cls.union_by_name else 0
        hd = "TRUE" if cls.header else "FALSE"
        ig = "TRUE" if cls.ignore_errors else "FALSE"
        return (f"read_csv_auto('{path}', HEADER={hd}, IGNORE_ERRORS={ig}, "
                f"hive_partitioning={hp}, union_by_name={un})")

    @classmethod
    def select_df(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        file_glob=None,
    ) -> List[T]:
        where_sql, params = _build_where_clause(where)
        order = order_by or cls.default_order_by
        order_sql = f"ORDER BY {order}" if order else ""
        limit_sql = f"LIMIT {int(limit)}" if limit is not None else ""
        offset_sql = f"OFFSET {int(offset)}" if offset is not None else ""
        sql = f"""
            SELECT {cls.columns_select}
            FROM {cls._from_clause(file_glob=file_glob)}
            {where_sql}
            {order_sql}
            {limit_sql}
            {offset_sql}
        """
        # Parameter binding: uses the raw connection of DuckDBProvider
        if provider is None:
            provider = cls.provider

        cur = provider.con.execute(sql, params)
        return cur.df()

    @classmethod
    def list(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        file_glob=None,
    ) -> List[T]:
        df = cls.select_df(provider=provider, where=where, order_by=order_by, limit=limit, offset=offset, file_glob=file_glob)
        if not is_dataclass(cls):
            raise TypeError(f"{cls.__name__} is not a dataclass.")
        instances: List[T] = []
        for _, row in df.iterrows():
            data = {f.name: row[f.name] for f in fields(cls) if f.name in row}
            instance = cls(**data)  # type: ignore
            instances.append(instance)
        return instances

    @classmethod
    def first(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
        order_by: Optional[str] = None,
    ) -> Optional[T]:
        results = cls.list(provider=provider, where=where, order_by=order_by, limit=1)
        return results[0] if results else None

    @classmethod
    def count(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ) -> int:
        where_sql, params = _build_where_clause(where)
        sql = f"""
            SELECT COUNT(*) AS cnt
            FROM {cls._from_clause()}
            {where_sql}
        """
        # Parameter binding: uses the raw connection of DuckDBProvider
        if provider is None:
            provider = cls.provider

        cur = provider.con.execute(sql, params)
        row = cur.fetchone()
        return row["cnt"] if row else 0

    @classmethod
    def exists(
        cls: Type[T],
        provider: DuckDBProvider = None,
        *,
        where: Optional[Union[str, Dict[str, Any], Sequence[Filter]]] = None,
    ) -> bool:
        if provider is None:
            provider = cls.provider
        return cls.count(provider, where=where) > 0
@@ -45,7 +45,7 @@ class FirestoreBaseModel:
         return cls._col().document(doc_id)

     def _as_mutable_dict(self) -> Dict[str, Any]:
         """dataclass → dict (id and None handled as needed)"""
         data = asdict(self)
         data.pop("_doc_id", None)
         data.pop("collection_name", None)
@@ -54,7 +54,7 @@ class FirestoreBaseModel:

     @classmethod
     def _from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
         """dict -> model (picks up 'id' from the data)"""
         data = dict(data)
         _id = data.pop("id", None)
         obj: T = cls(**data)  # type: ignore[arg-type]
@@ -88,7 +88,7 @@ class FirestoreBaseModel:
         return self._doc_id

     def update_fields(self, changes: Dict[str, Any], auto_timestamp: bool = True) -> None:
         """Partial update (sugar over set(merge=True))."""
         if not self._doc_id:
             raise ValueError("Cannot update without id.")
         payload = dict(changes)
@@ -205,7 +205,7 @@ class FirestoreBaseModel:
     ) -> int:
         """
         Bulk-delete the documents matched by the conditions. Returns the number deleted.
         Note: depends on list_documents (the "no composite index" policy follows its constraints).
         """
         from providers.firestore_provider import FireStoreProvider
         rows = FireStoreProvider.list_documents(
@@ -1,73 +0,0 @@
import duckdb


class DuckDBProvider:

    def __init__(self, db_path: str = ":memory:", read_only: bool = False):
        self.con = self.connect(db_path, read_only)

    def connect(self, db_path: str = ":memory:", read_only: bool = False):
        return duckdb.connect(database=db_path, read_only=read_only)

    def setup_gcs(self, access_key: str, secret_key: str):
        """Configure the GCS secret."""
        if not self.con:
            raise ValueError("DuckDB is not connected.")
        self.con.sql(f"""
            CREATE OR REPLACE SECRET gcs_creds (
                TYPE gcs,
                KEY_ID '{access_key}',
                SECRET '{secret_key}'
            );
        """)


    def close(self):
        """Close the connection."""
        if self.con:
            self.con.close()

    def query_df(self, sql: str):
        """Execute a SQL query and return the result as a DataFrame."""
        return self.con.execute(sql).df()

    def max_value(
        self,
        file_path: str,
        column: str,
        hive_partitioning: bool = True,
        union_by_name: bool = True,
    ) -> any:
        """Get the maximum value of the given column in the CSV files."""
        query = f"""
            SELECT MAX({column}) AS max_{column}
            FROM read_csv_auto('{file_path}',
                hive_partitioning={1 if hive_partitioning else 0},
                union_by_name={1 if union_by_name else 0}
            )
        """
        result = self.con.execute(query).fetchone()[0]
        return result

    def load_file(self, file_glob: str, table: str):
        """Load CSV files into a table."""
        sql = f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT *
            FROM read_csv_auto('{file_glob}', HEADER=TRUE, IGNORE_ERRORS=TRUE)
        """
        self.con.execute(sql)

    @staticmethod
    def get_gs_csv_name(
        backet: str,
        object_name: str,
        hive_partitioning: bool = True,
        union_by_name: bool = True,
    ) -> str:

        return f"""read_csv_auto('gs://{backet}/{object_name}',
            hive_partitioning={1 if hive_partitioning else 0},
            union_by_name={1 if union_by_name else 0}
        )
        """
@@ -78,12 +78,12 @@ class FireStoreProvider():
         start_after: Optional[Union[google.cloud.firestore.DocumentSnapshot, Dict[str, Any]]] = None,
     ) -> List[Dict[str, Any]]:
         """
         Fetch documents from a collection (supports filtering / ordering / limits / paging).

         Args:
             collection_name: collection name
             filters: [(field, op, value), ...]
             order_by: field name(s) to order by (a leading '-' such as '-created_at' means descending)
             limit: maximum number of documents to fetch
             start_after: a document snapshot, or a dict of the last values in the order_by ordering

@@ -189,13 +189,13 @@ class FireStoreProvider():
         merge: bool = True,
     ) -> None:
         """
         Update a document. With merge=True it is a partial update (recommended).
         """
         ref = cls.get_doc_ref(collection_name, doc_id)
         if merge:
             ref.set(data, merge=True)
         else:
             # Full replacement (created if it does not exist)
             ref.set(data, merge=False)
         logger.info(f"Updated document: {collection_name}/{doc_id} (merge={merge})")

@ -1,662 +0,0 @@
|
|||||||
import os
|
|
||||||
import time
|
|
||||||
from datetime import datetime
|
|
||||||
from typing import Optional, Dict, Union,Sequence
|
|
||||||
from enum import Enum
|
|
||||||
|
|
||||||
from google.cloud import bigquery
|
|
||||||
from google.cloud.bigquery.table import RowIterator, TableListItem
|
|
||||||
from google.cloud.bigquery.dataset import DatasetListItem
|
|
||||||
from google.cloud.bigquery.table import StreamingBuffer
|
|
||||||
|
|
||||||
from google.oauth2 import service_account
|
|
||||||
|
|
||||||
from lib.custom_logger import get_logger
|
|
||||||
logger = get_logger()
|
|
||||||
|
|
||||||
class BigQueryFieldType(str,Enum):
|
|
||||||
STRING = "STRING"
|
|
||||||
BYTES = "BYTES"
|
|
||||||
INTEGER = "INTEGER"
|
|
||||||
FLOAT = "FLOAT"
|
|
||||||
BOOLEAN = "BOOLEAN"
|
|
||||||
TIMESTAMP = "TIMESTAMP"
|
|
||||||
DATE = "DATE"
|
|
||||||
TIME = "TIME"
|
|
||||||
DATETIME = "DATETIME"
|
|
||||||
RECORD = "RECORD"
|
|
||||||
NUMERIC = "NUMERIC"
|
|
||||||
BIGNUMERIC = "BIGNUMERIC"
|
|
||||||
GEOGRAPHY = "GEOGRAPHY"
|
|
||||||
JSON = "JSON"
|
|
||||||
|
|
||||||
class BogQueryFieldMode(str,Enum):
|
|
||||||
NULLABLE = "NULLABLE" # NULL許可
|
|
||||||
REQUIRED = "REQUIRED" # 必須
|
|
||||||
REPEATED = "REPEATED" # 配列
|
|
||||||
|
|
||||||
|
|
||||||
class GoogleCloudBigQueryProvider:
|
|
||||||
"""Cloud BigQuery操作プロバイダ
|
|
||||||
|
|
||||||
Notes:
|
|
||||||
- このクラスはGoogle Cloud BigQueryの基本的な操作を提供します。
|
|
||||||
- 認証にはサービスアカウントキーまたはApplication Default Credentials(ADC)を使用します。
|
|
||||||
- BigQuery Storage Read APIを使用するなら別のライブラリを使用してください(高速読み込み)
|
|
||||||
- 必要な権限
|
|
||||||
- データセット操作を行う場合はBigQuery データオーナーの権限が必要です。
|
|
||||||
- bigquery.dataViewer / bigquery.dataEditor / bigquery.readSessionUserなど部分でも可能
|
|
||||||
- JOB操作を行う場合はBigQuery ジョブユーザーの権限が必要です。
|
|
||||||
- フル機能を使う場合はBigQuery データオーナー及びBigQuery ジョブユーザーの権限が必要です。
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, cred_path: Optional[str] = None, project: Optional[str] = None):
|
|
||||||
try:
|
|
||||||
if cred_path:
|
|
||||||
creds = service_account.Credentials.from_service_account_file(cred_path)
|
|
||||||
# プロジェクト未指定なら credentials から取得
|
|
||||||
effective_project = project or creds.project_id
|
|
||||||
self._client = bigquery.Client(
|
|
||||||
project=effective_project, credentials=creds
|
|
||||||
)
|
|
||||||
logger.info(f"Bigquery client initialized with service account file. project={effective_project}")
|
|
||||||
elif os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"):
|
|
||||||
cred_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
|
|
||||||
creds = service_account.Credentials.from_service_account_info(cred_json)
|
|
||||||
effective_project = project or creds.project_id
|
|
||||||
self._client = bigquery.Client(
|
|
||||||
project=effective_project, credentials=creds
|
|
||||||
)
|
|
||||||
logger.info("Bigquery client initialized with credentials from environment variable.")
|
|
||||||
else:
|
|
||||||
self._client = bigquery.Client(project=project)
|
|
||||||
logger.info("Bigquery client initialized with default credentials (ADC).")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Bigquery initialization failed: {e}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
def get_datasets(self)-> list[DatasetListItem]:
|
|
||||||
"""データセット一覧を取得する"""
|
|
||||||
try:
|
|
||||||
datasets = list(self._client.list_datasets())
|
|
||||||
return datasets
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to list datasets: {e}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
def create_dataset(
|
|
||||||
self, dataset_id: str,
|
|
||||||
location: Optional[str] ="asia-northeast1") -> bigquery.Dataset:
|
|
||||||
"""
|
|
||||||
データセットを作成する
|
|
||||||
|
|
||||||
Args:
|
|
||||||
dataset_id (str): 作成するデータセットID
|
|
||||||
location (Optional[str]): データセットのロケーション
|
|
||||||
Returns:
|
|
||||||
bigquery.Dataset: 作成されたデータセットオブジェクト
|
|
||||||
Notes:
|
|
||||||
- 既に存在するデータセットIDを指定した場合、例外が発生します。
|
|
||||||
- 必要な権限
|
|
||||||
- bigquery.datasets.create
|
|
||||||
- 命名規則
|
|
||||||
- 英小文字 (a-z)、数字 (0-9)、およびアンダースコア (_) のみ使用可能
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
dataset_ref = self._client.dataset(dataset_id)
|
|
||||||
dataset = bigquery.Dataset(dataset_ref)
|
|
||||||
if location:
|
|
||||||
dataset.location = location
|
|
||||||
dataset = self._client.create_dataset(dataset)
|
|
||||||
logger.info(f"Dataset {dataset_id} created.")
|
|
||||||
return dataset
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to create dataset {dataset_id}: {e}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
    def is_exists_dataset(self, dataset_id: str) -> bool:
        """Return True if the dataset exists."""
        try:
            self._client.get_dataset(dataset_id)
            return True
        except Exception:
            return False

    def delete_dataset(self, dataset_id: str, delete_contents: bool = False):
        """
        Delete a dataset.

        Args:
            dataset_id (str): ID of the dataset to delete
            delete_contents (bool): Set to True to also delete the tables inside the dataset
        Notes:
            - Required permission
                - bigquery.datasets.delete
            - With delete_contents=False, deletion fails if the dataset still contains tables.
                - In that case, pass delete_contents=True.
                - Otherwise `google.api_core.exceptions.Conflict` is raised.
        """
        try:
            self._client.delete_dataset(dataset_id, delete_contents=delete_contents)
            logger.info(f"Dataset {dataset_id} deleted.")
        except Exception as e:
            logger.error(f"Failed to delete dataset {dataset_id}: {e}")
            raise

    def full_table_id(self, dataset_id: str, table_name: str) -> str:
        """Build a fully qualified table ID (project.dataset.table)."""
        project = self._client.project
        return f"{project}.{dataset_id}.{table_name}"

    def is_exists_table(self, table_id: str) -> bool:
        """Return True if the table exists."""
        try:
            self._client.get_table(table_id)
            return True
        except Exception:
            return False

    @staticmethod
    def addSchemaField(
            name: str,
            field_type: Union[str, BigQueryFieldType],
            field_mode: Union[str, BigQueryFieldMode] = BigQueryFieldMode.NULLABLE,
            default_value_expression: Optional[str] = None,
            description: Optional[str] = None
    ) -> bigquery.SchemaField:
        """Build a bigquery.SchemaField from loosely typed arguments."""
        if isinstance(field_type, BigQueryFieldType):
            field_type = field_type.value
        elif isinstance(field_type, str):
            field_type = field_type.upper()

        if isinstance(field_mode, BigQueryFieldMode):
            field_mode = field_mode.value
        elif isinstance(field_mode, str):
            field_mode = field_mode.upper()

        return bigquery.SchemaField(
            name=name,
            field_type=field_type,
            mode=field_mode,
            default_value_expression=default_value_expression,
            description=description)

    def create_table(
            self,
            table_id: str,
            schema: list[bigquery.SchemaField]) -> bigquery.Table:
        """
        Create a table.

        Args:
            table_id (str): ID of the table to create (e.g. 'dataset.table')
            schema (list[bigquery.SchemaField]): Schema definition of the table
        Returns:
            bigquery.Table: The created table object
        Notes:
            - Specifying a table ID that already exists raises an exception.
        """
        try:
            table = bigquery.Table(table_id, schema=schema)
            table = self._client.create_table(table)
            logger.info(f"Table {table_id} created.")
            return table
        except Exception as e:
            logger.error(f"Failed to create table {table_id}: {e}")
            raise

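    # A minimal sketch of defining a schema and creating a table with the helpers
    # above (dataset/table names are hypothetical; assumes `provider` is an instance
    # of this class and that BigQueryFieldType mirrors standard type names such as
    # STRING and INT64):
    #
    #   schema = [
    #       GoogleCloudBigQueryProvider.addSchemaField("id", "INT64", "REQUIRED"),
    #       GoogleCloudBigQueryProvider.addSchemaField("name", "STRING"),
    #       GoogleCloudBigQueryProvider.addSchemaField("created_at", "TIMESTAMP",
    #                                                  default_value_expression="CURRENT_TIMESTAMP()"),
    #   ]
    #   table_id = provider.full_table_id("sample_dataset", "users")
    #   provider.create_table(table_id, schema)
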
    def delete_table(self, table_id: str, not_found_ok: bool = False):
        """
        Delete a table.

        Args:
            table_id (str): ID of the table to delete (e.g. 'dataset.table')
            not_found_ok (bool): Set to True to suppress the error when the table does not exist
        """
        try:
            self._client.delete_table(table_id, not_found_ok=not_found_ok)
            logger.info(f"Table {table_id} deleted.")
        except Exception as e:
            logger.error(f"Failed to delete table {table_id}: {e}")
            raise

    def get_tables(self, dataset_id: str) -> list[TableListItem]:
        """
        List the tables in a dataset.

        Args:
            dataset_id (str): Target dataset ID
        Returns:
            list[bigquery.TableListItem]: List of tables
        """
        try:
            tables = list(self._client.list_tables(dataset_id))
            logger.info(f"Tables listed successfully from dataset {dataset_id}, count {len(tables)}.")
            return tables
        except Exception as e:
            logger.error(f"Failed to list tables in dataset {dataset_id}: {e}")
            raise

    def insert_rows(
            self,
            table_id: str,
            rows: list[Dict[str, any]]) -> Sequence[dict]:
        """
        Insert rows into a table (streaming insert).

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            rows (list[Dict[str, any]]): List of rows to insert
        Returns:
            Sequence[dict]: Per-row error entries returned by the API (empty if all rows were inserted)
        Notes:
            - Each row is a dict whose keys are column names and whose values are the column values.
            - Example: [{"column1": "value1", "column2": 123}, {"column1": "value2", "column2": 456}]
        """
        try:
            errors = self._client.insert_rows_json(table_id, rows)
            if errors:
                logger.error(f"Errors occurred while inserting rows into {table_id}: {errors}")
            else:
                logger.info(f"Rows inserted successfully into {table_id}.")
            return errors
        except Exception as e:
            logger.error(f"Failed to insert rows into table {table_id}: {e}")
            raise

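    # A minimal streaming-insert sketch (hypothetical table and columns; assumes
    # `provider` is an instance of this class). Note that rows inserted this way
    # sit in the streaming buffer for a while (see get_streaming_buffer_info below)
    # and cannot be updated or deleted until BigQuery flushes that buffer:
    #
    #   errors = provider.insert_rows(
    #       provider.full_table_id("sample_dataset", "users"),
    #       [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    #   )
    #   assert not errors, f"insert failed: {errors}"
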
    def list_rows(
            self,
            table_id: str,
            selected_fields: Optional[list[str]] = None,
            max_results: Optional[int] = None,
            start_index: Optional[int] = None
    ) -> RowIterator:
        """
        List the rows of a table.

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            selected_fields (Optional[list[str]]): Column names to fetch. None fetches all columns
            max_results (Optional[int]): Maximum number of rows to fetch. None means no limit
            start_index (Optional[int]): Index of the first row to fetch. None starts from the beginning
        Returns:
            RowIterator: Iterator over the fetched rows
        """
        try:
            table = self._client.get_table(table_id)
            _selected_fields: Optional[list[bigquery.SchemaField]] = None
            if selected_fields:
                # The client API expects SchemaField objects, so map the requested names
                _selected_fields = [field for field in table.schema if field.name in selected_fields]

            rows = self._client.list_rows(
                table,
                selected_fields=_selected_fields,
                max_results=max_results,
                start_index=start_index
            )
            logger.info(f"Rows listed successfully from table {table_id}.")
            return rows
        except Exception as e:
            logger.error(f"Failed to list rows from table {table_id}: {e}")
            raise

    def list_rows_to_dataframe(
            self,
            table_id: str,
            selected_fields: Optional[list[str]] = None,
            max_results: Optional[int] = None,
            start_index: Optional[int] = None
    ):
        """Same as list_rows, but returns the result as a pandas DataFrame."""
        try:
            # Delegate to list_rows so the column names are converted to SchemaField
            # objects before the client API is called
            df = self.list_rows(
                table_id,
                selected_fields=selected_fields,
                max_results=max_results,
                start_index=start_index
            ).to_dataframe()
            logger.info(f"Rows listed to DataFrame successfully from table {table_id}.")
            return df
        except Exception as e:
            logger.error(f"Failed to list rows to DataFrame from table {table_id}: {e}")
            raise

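    # A minimal read sketch (hypothetical names; assumes `provider` is an instance
    # of this class, and that pandas, plus db-dtypes for some column types, is
    # installed for to_dataframe()):
    #
    #   df = provider.list_rows_to_dataframe(
    #       provider.full_table_id("sample_dataset", "users"),
    #       selected_fields=["id", "name"],
    #       max_results=100,
    #   )
    #   print(df.head())
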
    # --------------------------------------------------------
    def get_streaming_buffer_info(self, table_id: str):
        """Return information about the table's streaming buffer.

        Notes:
            - The streaming buffer is where data added by streaming inserts is held
              temporarily before it is written to permanent storage.
            - Returns None if there is no streaming buffer information.
            - Data inside the streaming buffer cannot be deleted, updated, or cleared directly.
            - It is fully managed by GCP; there is no way to flush, clear, or delete it manually.
            - Typical residence time: a few seconds to a few minutes; up to ~30 minutes with
              high-frequency streaming; up to roughly 90 minutes under extreme load or many
              partitions (rough GCP upper bound).
        """
        t = self._client.get_table(table_id)
        # The value lives either in the SDK attribute or in the low-level properties dict
        buf: StreamingBuffer = getattr(t, "streaming_buffer", None) or t._properties.get("streamingBuffer")
        if not buf:
            return None
        # Example: {'estimatedRows': '123', 'estimatedBytes': '456789', 'oldestEntryTime': '1698234000000'}
        logger.debug(f"Streaming Buffer Raw Info: {buf}")
        return {
            "estimated_rows": buf.estimated_rows,
            "estimated_bytes": buf.estimated_bytes,
            "oldest_entry_ms": buf.oldest_entry_time,
            "raw": buf,
        }

    def execute_query(
            self,
            query: str,
            params: list = None) -> bigquery.QueryJob:
        """
        Execute a query.

        Args:
            query (str): SQL query to execute
            params (list): Optional list of query parameters (bigquery.ScalarQueryParameter)
        Returns:
            bigquery.QueryJob: QueryJob object for the executed query
        """
        try:
            if params and len(params) > 0:
                query_job = self._client.query(query, job_config=bigquery.QueryJobConfig(
                    query_parameters=params
                ))
            else:
                query_job = self._client.query(query)
            logger.info("Query executed successfully.")
            logger.debug(f"Query JOB: {query_job}")
            return query_job
        except Exception as e:
            logger.error(f"Failed to execute query: {e}")
            raise

    @staticmethod
    def addQueryParameter(
            name: str,
            value: any,
            param_type: Union[str, BigQueryFieldType] = BigQueryFieldType.STRING
    ) -> bigquery.ScalarQueryParameter:
        """Build a scalar query parameter."""
        if isinstance(param_type, BigQueryFieldType):
            param_type = param_type.value
        elif isinstance(param_type, str):
            param_type = param_type.upper()

        return bigquery.ScalarQueryParameter(
            name=name,
            type_=param_type,
            value=value
        )

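    # A minimal parameterized-query sketch (hypothetical project/table; assumes
    # `provider` is an instance of this class; @name is standard BigQuery named
    # parameter syntax):
    #
    #   job = provider.execute_query(
    #       "SELECT id, name FROM `my-project.sample_dataset.users` WHERE name = @name",
    #       params=[GoogleCloudBigQueryProvider.addQueryParameter("name", "alice", "STRING")],
    #   )
    #   for row in job.result():
    #       print(dict(row))
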
    def select_query(
            self,
            table_id: str,
            columns: list[str] = None,
            where_clause: str = None,
            params: list = None,
            order_by: str = None,
            limit: int = None
    ):
        """
        Run a SELECT query against the given table.

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            columns (list[str]): Column names to fetch. None fetches all columns
            where_clause (str): WHERE clause condition. None means no condition
            params (list): Optional query parameters referenced from the WHERE clause
            order_by (str): Column name to order by. None means no ordering
            limit (int): Maximum number of rows to fetch. None means no limit
        """
        try:
            cols = ", ".join(columns) if columns else "*"
            query = f"SELECT {cols} FROM `{table_id}`"
            if where_clause:
                query += f" WHERE {where_clause}"
            if order_by:
                query += f" ORDER BY {order_by}"
            if limit:
                query += f" LIMIT {limit}"
            query_job = self.execute_query(query, params=params)
            logger.info("Select query executed successfully.")

            return query_job
        except Exception as e:
            logger.error(f"Failed to execute select query on table {table_id}: {e}")
            raise

    def select_query_to_dataframe(
            self,
            table_id: str,
            columns: list[str] = None,
            where_clause: str = None,
            params: list = None,
            order_by: str = None,
            limit: int = None
    ):
        """Same as select_query, but returns the result as a pandas DataFrame."""
        try:
            query_job = self.select_query(
                table_id=table_id,
                columns=columns,
                where_clause=where_clause,
                params=params,
                order_by=order_by,
                limit=limit
            )
            df = query_job.to_dataframe()
            logger.info("Select query to DataFrame executed successfully.")
            return df
        except Exception as e:
            logger.error(f"Failed to convert query result to DataFrame for table {table_id}: {e}")
            raise

    def select_query_to_dict(
            self,
            table_id: str,
            columns: list[str] = None,
            where_clause: str = None,
            params: list = None,
            order_by: str = None,
            limit: int = None
    ) -> list[Dict[str, any]]:
        """Same as select_query, but returns the result as a list of dicts."""
        try:
            query_job = self.select_query(
                table_id=table_id,
                columns=columns,
                where_clause=where_clause,
                params=params,
                order_by=order_by,
                limit=limit
            )
            results = query_job.result()
            dicts = [dict(row) for row in results]
            logger.info("Select query to dicts executed successfully.")
            return dicts
        except Exception as e:
            logger.error(f"Failed to convert query result to dicts for table {table_id}: {e}")
            raise

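    # A minimal sketch of the SELECT helpers above (hypothetical table and columns;
    # assumes `provider` is an instance of this class; the WHERE clause uses a named
    # parameter so values are not interpolated as text):
    #
    #   rows = provider.select_query_to_dict(
    #       provider.full_table_id("sample_dataset", "users"),
    #       columns=["id", "name"],
    #       where_clause="id >= @min_id",
    #       params=[GoogleCloudBigQueryProvider.addQueryParameter("min_id", 10, "INT64")],
    #       order_by="id",
    #       limit=50,
    #   )
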
    def insert_query(
            self,
            table_id: str,
            values: dict[str, str],
            params: list = None
    ) -> bigquery.QueryJob:
        """
        Execute an INSERT query.

        Notes:
            - Values are interpolated into the SQL text as-is, so each value must already be
              a valid SQL literal (e.g. a quoted string) or a parameter reference such as @name.
        """
        try:
            query = f"INSERT INTO `{table_id}` ({', '.join(values.keys())})"
            query += f" VALUES ({', '.join([str(v) for v in values.values()])})"
            query_job = self.execute_query(query, params=params)
            logger.info("Insert query executed successfully.")
            return query_job
        except Exception as e:
            logger.error(f"Failed to execute insert query on table {table_id}: {e}")
            raise

    def update_query(
            self,
            table_id: str,
            values: dict[str, str],
            where_clause: str,
            params: list = None
    ) -> bigquery.QueryJob:
        """
        Execute an UPDATE query.

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            values (dict[str, str]): Columns and values to update (values must be SQL literals or parameter references)
            where_clause (str): WHERE clause condition. Use "TRUE" to update all rows
        """
        try:
            query = f"UPDATE `{table_id}`"
            query += f" SET {', '.join([f'{k} = {v}' for k, v in values.items()])}"
            if where_clause:
                query += f" WHERE {where_clause}"
            logger.debug(f"Update Query: {query}")
            query_job = self.execute_query(query, params=params)
            logger.info("Update query executed successfully.")
            return query_job.result()
        except Exception as e:
            logger.error(f"Failed to execute update query on table {table_id}: {e}")
            raise

    def delete_query(
            self,
            table_id: str,
            where_clause: str,
            params: list = None
    ) -> bigquery.QueryJob:
        """
        Execute a DELETE query.
        """
        try:
            query = f"DELETE FROM `{table_id}`"
            if where_clause:
                query += f" WHERE {where_clause}"
            query_job = self.execute_query(query, params=params)
            logger.info("Delete query executed successfully.")
            return query_job.result()
        except Exception as e:
            logger.error(f"Failed to execute delete query on table {table_id}: {e}")
            raise

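    # A minimal DML sketch for the INSERT/UPDATE/DELETE helpers above (hypothetical
    # table; assumes `provider` is an instance of this class; values are interpolated
    # as SQL literals, so strings carry their own quotes; rows still in the streaming
    # buffer cannot be updated or deleted):
    #
    #   table_id = provider.full_table_id("sample_dataset", "users")
    #   provider.insert_query(table_id, {"id": "3", "name": "'carol'"})
    #   provider.update_query(table_id, {"name": "'caroline'"}, where_clause="id = 3")
    #   provider.delete_query(table_id, where_clause="id = 3")
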
    def upsert_query(
            self,
            table_id: str,
            insert_values: dict[str, str],
            update_values: dict[str, str],
            key_columns: list[str],
            params: list = None
    ) -> bigquery.QueryJob:
        """
        Execute an UPSERT (MERGE) query.
        """
        try:
            query = f"MERGE INTO `{table_id}` T"
            # Alias each source value with its column name so the ON clause can reference S.<col>
            query += f" USING (SELECT {', '.join([f'{v} AS {k}' for k, v in insert_values.items()])}) S"
            query += f" ON {' AND '.join([f'T.{col} = S.{col}' for col in key_columns])}"
            query += " WHEN MATCHED THEN"
            query += f" UPDATE SET {', '.join([f'T.{k} = {v}' for k, v in update_values.items()])}"
            query += " WHEN NOT MATCHED THEN"
            query += f" INSERT ({', '.join(insert_values.keys())})"
            query += f" VALUES ({', '.join([str(v) for v in insert_values.values()])})"
            query_job = self.execute_query(query, params=params)
            logger.info("Upsert query executed successfully.")
            return query_job
        except Exception as e:
            logger.error(f"Failed to execute upsert query on table {table_id}: {e}")
            raise

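    # A minimal MERGE sketch (hypothetical table; assumes `provider` is an instance
    # of this class; values are SQL literals here, so strings need their own quotes):
    #
    #   provider.upsert_query(
    #       provider.full_table_id("sample_dataset", "users"),
    #       insert_values={"id": "3", "name": "'carol'"},
    #       update_values={"name": "'carol'"},
    #       key_columns=["id"],
    #   )
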
    def import_csv(
            self,
            table_id: str,
            csv_file_path: str,
            skip_leading_rows: int = 1,
            write_disposition: str = "WRITE_APPEND",
            field_delimiter: str = ",",
            autodetect: bool = True,
            schema: list[bigquery.SchemaField] = None
    ) -> bigquery.LoadJob:
        """
        Load data from a CSV file into a table.

        Args:
            table_id (str): Target table ID (e.g. 'dataset.table')
            csv_file_path (str): Path to the source CSV file (a local path; a GCS URI such as
                gs://bucket/path/to/file.csv would require load_table_from_uri instead of a file handle)
            skip_leading_rows (int): Number of header rows to skip
            write_disposition (str): Write mode (e.g. "WRITE_APPEND", "WRITE_TRUNCATE", "WRITE_EMPTY")
            field_delimiter (str): Field delimiter
            autodetect (bool): Whether to enable schema auto-detection
            schema (list[bigquery.SchemaField]): Schema definition. Required when autodetect=False
        Returns:
            bigquery.LoadJob: The load job object
        """
        try:
            job_config = bigquery.LoadJobConfig(
                skip_leading_rows=skip_leading_rows,
                write_disposition=write_disposition,
                field_delimiter=field_delimiter,
                autodetect=autodetect,
                schema=schema
            )
            with open(csv_file_path, "rb") as source_file:
                load_job = self._client.load_table_from_file(
                    source_file,
                    table_id,
                    job_config=job_config
                )
            load_job.result()  # Wait for the job to complete
            logger.info(f"Data loaded successfully from {csv_file_path} to table {table_id}.")
            return load_job
        except Exception as e:
            logger.error(f"Failed to load data from {csv_file_path} to table {table_id}: {e}")
            raise

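    # A minimal CSV load sketch (hypothetical local file and table; assumes
    # `provider` is an instance of this class; with autodetect left on, BigQuery
    # infers the schema from the file):
    #
    #   provider.import_csv(
    #       provider.full_table_id("sample_dataset", "users"),
    #       "data/users.csv",
    #       write_disposition="WRITE_TRUNCATE",
    #   )
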
    def export_csv_to_gcs(
            self,
            table_id: str,
            bucket: str,
            prefix: str,
            csv_file_name: str = None,
            location: str = None
    ) -> bigquery.ExtractJob:
        """
        Export a table to a CSV file in GCS.

        Args:
            table_id (str): Source table ID (e.g. 'dataset.table')
            bucket (str): Destination GCS bucket name
            prefix (str): Destination GCS prefix (folder path)
            csv_file_name (str): Destination CSV file name. Auto-generated if None
            location (str): Job location
        Returns:
            bigquery.ExtractJob: The extract job object
        """
        try:
            if not csv_file_name:
                _timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                csv_file_name = f"{table_id.replace('.', '_')}_{_timestamp}.csv"

            destination_uri = f"gs://{bucket}/{prefix}/{csv_file_name}"

            extract_job = self._client.extract_table(
                table_id,
                destination_uri,
                location=location
            )
            extract_job.result()  # Wait for the job to complete
            logger.info(f"Data exported successfully from table {table_id} to {destination_uri}.")
            return extract_job
        except Exception as e:
            logger.error(f"Failed to export data from table {table_id} to {destination_uri}: {e}")
            raise

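    # A minimal export sketch (hypothetical bucket and prefix; assumes `provider`
    # is an instance of this class and the service account can write to the
    # destination GCS bucket):
    #
    #   provider.export_csv_to_gcs(
    #       provider.full_table_id("sample_dataset", "users"),
    #       bucket="my-sample-bucket",
    #       prefix="exports/users",
    #   )
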
@@ -10,9 +10,6 @@ from google.oauth2 import service_account
from lib.custom_logger import get_logger

logger = get_logger()

import zipfile
from pathlib import Path

class GoogleCloudStorageProvider:

@@ -78,17 +75,6 @@ class GoogleCloudStorageProvider:
    def write_item(self, bucket: str, object_name: str, data: Union[bytes, BinaryIO, str],
                   content_type: str | None = None) -> Dict[str, Any]:
        """
        Write an object.

        Args:
            bucket (str): Bucket name
            object_name (str): Object name
            data (Union[bytes, BinaryIO, str]): Data to write
            content_type (Optional[str]): Content type (MIME type)
        Returns:
            Dict[str, Any]: Information about the written object
        """
        blob = self._blob(bucket, object_name)
        if content_type is None:
            content_type = mimetypes.guess_type(object_name)[0] or "application/octet-stream"
@@ -114,57 +100,4 @@ class GoogleCloudStorageProvider:
    def generate_signed_url(self, bucket: str, object_name: str, method: str = "GET",
                            expires: timedelta = timedelta(hours=1)) -> str:
        return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method)

    def zip_items(
        self,
        bucket: str,
        object_names: List[str],
    ) -> bytes:
        """
        Bundle multiple GCS objects into a single ZIP and return the ZIP binary (bytes).

        Args:
            bucket (str): Bucket name
            object_names (List[str]): List of target object names
        Returns:
            bytes: The ZIP file binary
        """
        out = io.BytesIO()
        with zipfile.ZipFile(out, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
            for obj in object_names:
                blob = self._blob(bucket, obj)
                if not blob.exists():
                    raise FileNotFoundError(f"Object not found: gs://{bucket}/{obj}")

                buf = io.BytesIO()
                blob.download_to_file(buf)
                buf.seek(0)
                arcname = Path(obj).name
                zf.writestr(arcname, buf.read())

            zf.comment = f"bucket={bucket}, files={len(object_names)}".encode()

        return out.getvalue()

    def upload_folder(self, bucket: str, folder_path: str, gcs_prefix: str = ""):
        """
        Recursively upload a local folder to GCS.

        Args:
            bucket (str): Bucket name
            folder_path (str): Path of the local folder
            gcs_prefix (str): Prefix (folder path) on GCS
        """
        _bucket = self._bucket(bucket)

        for root, _, files in os.walk(folder_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                # Build the relative path so the folder structure is preserved
                relative_path = os.path.relpath(local_file_path, folder_path)
                gcs_object_name = os.path.join(gcs_prefix, relative_path).replace("\\", "/")

                blob = _bucket.blob(gcs_object_name)
                blob.upload_from_filename(local_file_path)
                logger.info(f"Uploaded {local_file_path} to gs://{bucket}/{gcs_object_name}")
@@ -31,8 +31,8 @@ class OneDriveProvider:
            client_id: Client ID of the app (public client) registered in the Azure Portal
            authority: 'https://login.microsoftonline.com/{tenant}' (defaults to 'common' when not specified)
            scopes: e.g. ["Files.ReadWrite", "User.Read", "offline_access"]
            token_cache_path: Where to persist the serialized MSAL token cache (optional)
            access_token: Supply an already-acquired Bearer token directly (optional)
        """
        self.client_id = client_id or os.getenv("MS_CLIENT_ID") or ""
        self.authority = authority or self.DEFAULT_AUTHORITY
@@ -64,7 +64,7 @@ class OneDriveProvider:
                f.write(self._token_cache.serialize())

    def ensure_token(self):
        """Ensure a valid access token (cache first, then the device code flow)."""
        if self._access_token:
            return self._access_token

@@ -102,7 +102,7 @@ class OneDriveProvider:
        """
        Graph path notation: used like /me/drive/root:/foo/bar:/children.
        Do not prefix the path with /; URL encoding is left to requests, and quoting
        spaces and Japanese characters is recommended for safety (simplified here).
        """
        if not path or path.strip() in ["/", "."]:
            return ""
@@ -156,7 +156,7 @@ class OneDriveProvider:

    def create_folder(self, folder_path: str) -> Dict[str, Any]:
        """
        Also creates intermediate folders one by one (simple implementation).
        """
        parts = [p for p in self._normalize_path(folder_path).split("/") if p]
        cur = ""
@@ -300,7 +300,7 @@ class OneDriveProvider:
        return r.json()

    # -----------------------
    # Share links (pseudo signed URLs)
    # -----------------------
    def generate_share_link(
        self,
@@ -316,7 +316,7 @@ class OneDriveProvider:
        url = f"{self._item_by_path_url(path)}:/createLink"
        body: Dict[str, Any] = {"type": link_type, "scope": scope}
        if password:
            body["password"] = password  # Password protection (may be disallowed by tenant policy)
        r: requests.Response = self._session.post(url, headers=self._headers(), json=body)
        r.raise_for_status()
        data: dict = r.json()
@@ -1,41 +0,0 @@
import feedparser
from feedparser import FeedParserDict
from pydantic import BaseModel

class Feed(BaseModel):
    url: str
    title: str | None = None
    company: str | None = None
    description: str | None = None
    language: str | None = None
    tags: list[str] | None = None


class RSSItem(BaseModel):
    title: str
    link: str
    guid: str | None = None
    published: str | None = None
    ts: str | None = None
    source: str | None = None
    description: str | None = None


class RSSReaderClient:
    """RSS reader client"""
    @classmethod
    def fetch(cls, feed: Feed, from_date: str, to_date: str) -> list[RSSItem]:
        """Fetch articles from the given feed."""
        # NOTE: from_date and to_date are accepted but not yet used for filtering.
        items = []
        d: FeedParserDict = feedparser.parse(feed.url)
        for e in d.entries:
            items.append(RSSItem(
                title=e.get("title", "(no title)"),
                link=e.get("link"),
                guid=e.get("id") or e.get("guid") or e.get("link"),
                published=e.get("published") or e.get("updated"),
                ts=e.get("published") or e.get("updated"),
                source=feed.url,
                description=e.get("summary") or e.get("description"),
            ))
        return items