"""Google Cloud Storage provider.

Thin convenience wrapper around ``google.cloud.storage.Client`` exposing
bucket and object operations.
"""
import io
import json
import mimetypes
import os
from datetime import timedelta
from typing import Any, BinaryIO, Dict, List, Optional, Union

from google.cloud import storage
from google.oauth2 import service_account

from lib.custom_logger import get_logger

logger = get_logger()


class GoogleCloudStorageProvider:
    """Wrap a ``storage.Client`` with helpers for buckets and objects."""

    def __init__(self, cred_path: Optional[str] = None, project: Optional[str] = None):
        """Create the underlying GCS client.

        Credentials are resolved in this order:

        1. ``cred_path`` - path to a service account key file.
        2. The ``GOOGLE_APPLICATION_CREDENTIALS_JSON`` environment variable,
           holding the service account key as a JSON document.
        3. Whatever ``storage.Client`` resolves on its own when no explicit
           credentials are passed.

        Args:
            cred_path: Path to a service account key file, if any.
            project: Project ID. When omitted and explicit credentials are
                used, the project ID stored in the credentials is used.

        Raises:
            Exception: Any failure while building the client is logged and
                re-raised unchanged.
        """
        try:
            cred_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
            if cred_path:
                creds = service_account.Credentials.from_service_account_file(cred_path)
                # If no project is given, take it from the credentials.
                effective_project = project or creds.project_id
                self._client = storage.Client(
                    project=effective_project, credentials=creds
                )
                logger.info(f"GCS client initialized with service account file. project={effective_project}")
            elif cred_json:
                # The environment variable is a JSON *string*, while
                # from_service_account_info() needs the parsed mapping.
                creds = service_account.Credentials.from_service_account_info(
                    json.loads(cred_json)
                )
                effective_project = project or creds.project_id
                self._client = storage.Client(
                    project=effective_project, credentials=creds
                )
                logger.info("GCS client initialized with credentials from environment variable.")
            else:
                self._client = storage.Client(project=project)
                logger.info("GCS client initialized with default credentials (ADC).")
        except Exception as e:
            logger.error(f"GCS initialization failed: {e}")
            raise

    # Private helpers returning bucket and blob references.
    def _bucket(self, bucket: str) -> storage.Bucket:
        """Return a bucket reference for the bucket named ``bucket``."""
        return self._client.bucket(bucket)

    def _blob(self, bucket: str, object_name: str) -> storage.Blob:
        """Return a blob reference for ``object_name`` inside ``bucket``."""
        return self._bucket(bucket).blob(object_name)

    # Bucket operations
    def get_buckets(self) -> List[str]:
        """Return the names of the buckets listed by the client."""
        # list_buckets() yields buckets lazily; the comprehension drains it.
        return [b.name for b in self._client.list_buckets()]

    def create_bucket(
        self,
        bucket_name: str,
        location: str = "ASIA-NORTHEAST1",
        storage_class: str = "STANDARD",
    ) -> storage.Bucket:
        """Create a bucket and return the object returned by the client.

        Args:
            bucket_name: Name of the bucket to create.
            location: Location passed to ``Client.create_bucket``.
            storage_class: Storage class set on the bucket before creation.
        """
        b = storage.Bucket(self._client, name=bucket_name)
        b.storage_class = storage_class
        return self._client.create_bucket(b, location=location)

    def is_exists_bucket(self, bucket_name: str) -> bool:
        """Return True if the bucket can be fetched, False otherwise.

        A missing bucket yields False without logging. Any other failure
        also yields False, as before, but is logged so that it is not
        mistaken for a missing bucket.
        """
        try:
            # lookup_bucket() returns None for a missing bucket instead of raising.
            return self._client.lookup_bucket(bucket_name) is not None
        except Exception as e:
            logger.error(f"GCS bucket existence check failed. bucket={bucket_name} error={e}")
            return False

    # Object operations
    def get_items(self, bucket: str, prefix: Optional[str] = None) -> List[Dict[str, Any]]:
        """List objects in ``bucket``, optionally restricted to ``prefix``.

        Returns:
            One dict per object with the keys ``name``, ``size``,
            ``updated`` and ``content_type``.
        """
        return [
            {
                "name": bl.name,
                "size": bl.size,
                "updated": bl.updated,
                "content_type": bl.content_type,
            }
            for bl in self._client.list_blobs(bucket, prefix=prefix)
        ]

    def is_exists_item(self, bucket: str, object_name: str) -> bool:
        """Return the result of ``Blob.exists()`` for the given object."""
        return self._blob(bucket, object_name).exists()

    def write_item(
        self,
        bucket: str,
        object_name: str,
        data: Union[bytes, BinaryIO, str],
        content_type: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Upload ``data`` to ``object_name`` in ``bucket``.

        Args:
            bucket: Target bucket name.
            object_name: Target object name.
            data: ``bytes``/``bytearray`` content, an object with a ``read``
                attribute, or a ``str`` path to an existing local file.
            content_type: Content type to store. When omitted it is guessed
                from ``object_name``, falling back to
                ``application/octet-stream``.

        Returns:
            A dict with the blob's ``name``, ``size`` and ``content_type``.

        Raises:
            ValueError: If ``data`` is none of the supported kinds.
        """
        blob = self._blob(bucket, object_name)
        if content_type is None:
            content_type = mimetypes.guess_type(object_name)[0] or "application/octet-stream"
        blob.content_type = content_type

        if isinstance(data, (bytes, bytearray)):
            blob.upload_from_file(io.BytesIO(data), content_type=content_type, rewind=True)
        elif hasattr(data, "read"):
            blob.upload_from_file(data, content_type=content_type, rewind=True)
        elif isinstance(data, str) and os.path.exists(data):
            # A str is treated as a local path, never as text content.
            blob.upload_from_filename(data, content_type=content_type)
        else:
            raise ValueError("data must be bytes, file-like, or existing filepath")
        return {"name": blob.name, "size": blob.size, "content_type": blob.content_type}

    def read_item(
        self,
        bucket: str,
        object_name: str,
        as_text: bool = False,
        encoding: str = "utf-8",
    ) -> Union[bytes, str]:
        """Download an object.

        Args:
            bucket: Source bucket name.
            object_name: Source object name.
            as_text: When True, decode the bytes with ``encoding``.
            encoding: Text encoding used when ``as_text`` is True.

        Returns:
            The raw bytes, or the decoded string when ``as_text`` is True.
        """
        data = self._blob(bucket, object_name).download_as_bytes()
        return data.decode(encoding) if as_text else data

    def generate_signed_url(
        self,
        bucket: str,
        object_name: str,
        method: str = "GET",
        expires: timedelta = timedelta(hours=1),
    ) -> str:
        """Return a signed URL for the object.

        Args:
            bucket: Bucket name.
            object_name: Object name.
            method: HTTP method the URL is valid for.
            expires: Lifetime of the URL, passed as ``expiration``.

        NOTE(review): signing presumably needs credentials that can sign
        (e.g. a service account key) - confirm behaviour under ADC.
        """
        return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method)