import json
import os
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union

import firebase_admin
from firebase_admin import credentials, firestore
import google.cloud.firestore

from lib.custom_logger import get_logger

logger = get_logger()

Filter = Tuple[str, str, Any]  # (field, op, value)


class FireStoreProvider():
    """Thin wrapper around a process-wide Firestore client.

    Constructing an instance initializes the Firebase app (only when no app
    is registered yet) and caches one Firestore client on the class. All
    data-access helpers are classmethods that use that cached client, so an
    instance must have been constructed once before they are called.
    """

    # Shared Firestore client; populated by the first successful __init__.
    _db: Optional[google.cloud.firestore.Client] = None

    # Operators accepted in ``filters`` by list_documents.
    _ALLOWED_OPS = {
        "==", "!=", "<", "<=", ">", ">=",
        "in", "not-in", "array_contains", "array_contains_any"
    }

    def __init__(self, cred_path: Optional[str] = None):
        """Initialize the Firebase app (if needed) and the shared client.

        Credential resolution order:
          1. ``cred_path`` - path to a service-account key file.
          2. ``GOOGLE_APPLICATION_CREDENTIALS_JSON`` environment variable -
             the service-account key as JSON text (a plain file path is
             still accepted for backward compatibility).
          3. Default credentials, via ``firebase_admin.initialize_app()``
             with no arguments.

        Args:
            cred_path: Path to a service-account key file. When None, the
                environment variable or default credentials are used.

        Raises:
            Exception: Whatever the Firebase SDK raises during
                initialization; it is logged and re-raised unchanged.
        """
        if not firebase_admin._apps:
            try:
                logger.debug("Initializing Firestore")
                if cred_path:
                    cred = credentials.Certificate(cred_path)
                    firebase_admin.initialize_app(cred)
                    logger.info("Firestore initialized with service account key")
                elif os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"):
                    cred_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
                    # Certificate() treats a str as a file path, so JSON
                    # text has to be decoded into a dict first.
                    cred = credentials.Certificate(
                        self._parse_credential_source(cred_json)
                    )
                    firebase_admin.initialize_app(cred)
                    logger.info("Firestore initialized with credentials from environment variable")
                else:
                    # With GOOGLE_APPLICATION_CREDENTIALS set (path to a
                    # service-account JSON key file) authentication happens
                    # automatically. On GCP runtimes such as Cloud Run or
                    # GCE/GKE the default service account is used even
                    # without that environment variable.
                    firebase_admin.initialize_app()
                    logger.info("Firestore initialized with default credentials")
            except Exception as e:
                logger.error(f"Firestore initialization failed: {e}")
                raise

        if FireStoreProvider._db is None:
            FireStoreProvider._db = firestore.client()
            logger.info("Firestore client created")

    @staticmethod
    def _parse_credential_source(raw: str) -> Union[str, Dict[str, Any]]:
        """Return a dict if ``raw`` is a JSON object, else ``raw`` unchanged.

        Lets the environment variable carry either the key content (JSON
        text) or a path to the key file.
        """
        try:
            parsed = json.loads(raw)
        except ValueError:
            # Not JSON at all: treat the value as a file path.
            return raw
        return parsed if isinstance(parsed, dict) else raw

    @staticmethod
    def _snapshot_to_dict(
        snapshot: google.cloud.firestore.DocumentSnapshot,
    ) -> Dict[str, Any]:
        """Convert a snapshot to a dict and add its document ID under 'id'.

        A stored field named 'id' is overwritten by the document ID.
        """
        data = snapshot.to_dict() or {}
        data["id"] = snapshot.id
        return data

    # ============= base accessors =============
    @classmethod
    def get_db(cls) -> google.cloud.firestore.Client:
        """Return the shared Firestore client.

        Raises:
            RuntimeError: If no FireStoreProvider has been constructed yet.
        """
        if cls._db is None:
            raise RuntimeError("Firestore client not initialized. Construct FireStoreProvider first.")
        return cls._db

    @classmethod
    def get_collection_ref(cls, collection_name: str) -> google.cloud.firestore.CollectionReference:
        """Return a reference to the collection named ``collection_name``."""
        return cls.get_db().collection(collection_name)

    @classmethod
    def get_doc_ref(cls, collection_name: str, doc_id: str) -> google.cloud.firestore.DocumentReference:
        """Return a reference to document ``doc_id`` in ``collection_name``."""
        return cls.get_collection_ref(collection_name).document(doc_id)

    # ============= CRUD / Query =============
    @classmethod
    def list_documents(
        cls,
        collection_name: str,
        filters: Optional[Sequence[Filter]] = None,
        order_by: Optional[Union[str, Sequence[str]]] = None,
        limit: Optional[int] = None,
        start_after: Optional[Union[google.cloud.firestore.DocumentSnapshot, Dict[str, Any]]] = None,
    ) -> List[Dict[str, Any]]:
        """Fetch documents from a collection.

        Supports filtering, ordering, limiting and cursor-based paging.

        Args:
            collection_name: Name of the collection.
            filters: Sequence of ``(field, op, value)`` tuples; ``op`` must
                be one of ``_ALLOWED_OPS``.
            order_by: A field name or a sequence of field names. Prefix a
                name with '-' (e.g. '-created_at') for descending order.
            limit: Maximum number of documents. ``None`` and ``0`` both mean
                "no limit".
            start_after: Either a document snapshot, or a dict holding the
                last seen value for every ``order_by`` field.

        Returns:
            A list of dicts, each holding the document fields plus 'id'.

        Raises:
            ValueError: On an unsupported operator, a dict cursor without
                ``order_by``, a dict cursor missing an ordered field, or a
                cursor of an unsupported type.
        """
        col = cls.get_collection_ref(collection_name)
        q: Union[google.cloud.firestore.Query, google.cloud.firestore.CollectionReference] = col

        # filters
        if filters:
            for field, op, value in filters:
                if op not in cls._ALLOWED_OPS:
                    raise ValueError(f"Unsupported operator: {op}")
                q = q.where(field, op, value)

        # order_by
        if order_by:
            if isinstance(order_by, str):
                order_by = [order_by]
            for key in order_by:
                if key.startswith("-"):
                    q = q.order_by(key[1:], direction=google.cloud.firestore.Query.DESCENDING)
                else:
                    q = q.order_by(key, direction=google.cloud.firestore.Query.ASCENDING)

        # limit
        if limit:
            q = q.limit(limit)

        # pagination
        if start_after is not None:
            if isinstance(start_after, google.cloud.firestore.DocumentSnapshot):
                q = q.start_after(start_after)
            elif isinstance(start_after, dict):
                # A dict cursor is only meaningful relative to order_by; the
                # values are handed over in the same order as the keys.
                if not order_by:
                    raise ValueError("start_after as dict requires order_by to be set.")
                values = []
                for key in order_by:
                    field_name = key[1:] if key.startswith("-") else key
                    if field_name not in start_after:
                        raise ValueError(f"start_after dict missing field: {field_name}")
                    values.append(start_after[field_name])
                q = q.start_after(values)
            else:
                raise ValueError("start_after must be DocumentSnapshot or dict")

        # stream() is only iterated here, so it is typed as an iterable.
        docs: Iterable[google.cloud.firestore.DocumentSnapshot] = q.stream()
        return [cls._snapshot_to_dict(d) for d in docs]

    @classmethod
    def get_document(cls, collection_name: str, doc_id: str) -> Optional[Dict[str, Any]]:
        """Fetch one document by ID.

        Returns:
            The document fields plus 'id', or None if it does not exist.
        """
        ref = cls.get_doc_ref(collection_name, doc_id)
        snap = ref.get()
        if not snap.exists:
            return None
        return cls._snapshot_to_dict(snap)

    @classmethod
    def create_document(
        cls,
        collection_name: str,
        data: Dict[str, Any],
        doc_id: Optional[str] = None,
        merge: bool = False,
    ) -> str:
        """Create a document, auto-generating the ID when none is given.

        Args:
            collection_name: Name of the collection.
            data: Fields to store.
            doc_id: Explicit document ID. When falsy, the ID is generated
                by ``add``.
            merge: Passed to ``set`` when ``doc_id`` is given; ignored for
                auto-generated IDs.

        Returns:
            The ID of the created (or overwritten) document.
        """
        col = cls.get_collection_ref(collection_name)
        if doc_id:
            ref = col.document(doc_id)
            ref.set(data, merge=merge)
            logger.info(f"Created/Set document: {collection_name}/{doc_id}")
            return doc_id

        ref = col.add(data)[1]  # add -> (update_time, ref)
        logger.info(f"Created document: {collection_name}/{ref.id}")
        return ref.id

    @classmethod
    def update_document(
        cls,
        collection_name: str,
        doc_id: str,
        data: Dict[str, Any],
        merge: bool = True,
    ) -> None:
        """Write ``data`` to a document using ``set``.

        Args:
            collection_name: Name of the collection.
            doc_id: ID of the document.
            data: Fields to write.
            merge: True (recommended) for a partial update; False for a
                full replace, which creates the document if it is missing.
        """
        ref = cls.get_doc_ref(collection_name, doc_id)
        ref.set(data, merge=bool(merge))
        logger.info(f"Updated document: {collection_name}/{doc_id} (merge={merge})")

    @classmethod
    def delete_document(cls, collection_name: str, doc_id: str) -> None:
        """Delete the document ``doc_id`` from ``collection_name``."""
        ref = cls.get_doc_ref(collection_name, doc_id)
        ref.delete()
        logger.info(f"Deleted document: {collection_name}/{doc_id}")