Cloud storage provider

ry.yamafuji 2025-08-30 15:01:29 +09:00
parent 0055c66731
commit 84c2b9f2e7


@@ -0,0 +1,100 @@
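"""Google Cloud Storage provider.

Thin wrapper around the google-cloud-storage client: selects credentials
(service account key file, JSON in an environment variable, or Application
Default Credentials) and exposes basic bucket and object operations plus
signed-URL generation.
"""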
import io
import json
import mimetypes
import os
from datetime import timedelta
from typing import Any, BinaryIO, Dict, List, Optional, Union

from google.cloud import storage
from google.oauth2 import service_account

from lib.custom_logger import get_logger

logger = get_logger()
class GoogleCloudStorageProvider:
    def __init__(self, cred_path: Optional[str] = None, project: Optional[str] = None):
        try:
            if cred_path:
                # Explicit service account key file.
                creds = service_account.Credentials.from_service_account_file(cred_path)
                # If no project is given, fall back to the one in the credentials.
                effective_project = project or creds.project_id
                self._client = storage.Client(
                    project=effective_project, credentials=creds
                )
                logger.info(f"GCS client initialized with service account file. project={effective_project}")
            elif os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON"):
                # Service account key supplied as a JSON string in an environment variable.
                cred_info = json.loads(os.environ["GOOGLE_APPLICATION_CREDENTIALS_JSON"])
                creds = service_account.Credentials.from_service_account_info(cred_info)
                effective_project = project or creds.project_id
                self._client = storage.Client(
                    project=effective_project, credentials=creds
                )
                logger.info("GCS client initialized with credentials from environment variable.")
            else:
                # Application Default Credentials (ADC).
                self._client = storage.Client(project=project)
                logger.info("GCS client initialized with default credentials (ADC).")
        except Exception as e:
            logger.error(f"GCS initialization failed: {e}")
            raise
    # Private helpers for bucket and blob references
    def _bucket(self, bucket: str) -> storage.Bucket:
        return self._client.bucket(bucket)

    def _blob(self, bucket: str, object_name: str) -> storage.Blob:
        return self._bucket(bucket).blob(object_name)
    # Bucket operations
    def get_buckets(self) -> List[str]:
        buckets = self._client.list_buckets()
        return [b.name for b in buckets]

    def create_bucket(self, bucket_name: str, location: str = "ASIA-NORTHEAST1", storage_class: str = "STANDARD") -> storage.Bucket:
        b = storage.Bucket(self._client, name=bucket_name)
        b.storage_class = storage_class
        return self._client.create_bucket(b, location=location)

    def is_exists_bucket(self, bucket_name: str) -> bool:
        try:
            self._client.get_bucket(bucket_name)
            return True
        except Exception:
            return False
    # Object operations
    def get_items(self, bucket: str, prefix: Optional[str] = None) -> List[Dict[str, Any]]:
        items = self._client.list_blobs(bucket, prefix=prefix)
        return [
            {"name": bl.name, "size": bl.size, "updated": bl.updated, "content_type": bl.content_type}
            for bl in items
        ]

    def is_exists_item(self, bucket: str, object_name: str) -> bool:
        return self._blob(bucket, object_name).exists()
    def write_item(self, bucket: str, object_name: str, data: Union[bytes, BinaryIO, str],
                   content_type: Optional[str] = None) -> Dict[str, Any]:
        blob = self._blob(bucket, object_name)
        if content_type is None:
            # Guess from the object name, falling back to a generic binary type.
            content_type = mimetypes.guess_type(object_name)[0] or "application/octet-stream"
        blob.content_type = content_type
        if isinstance(data, (bytes, bytearray)):
            blob.upload_from_file(io.BytesIO(data), content_type=content_type, rewind=True)
        elif hasattr(data, "read"):
            blob.upload_from_file(data, content_type=content_type, rewind=True)
        elif isinstance(data, str) and os.path.exists(data):
            blob.upload_from_filename(data, content_type=content_type)
        else:
            raise ValueError("data must be bytes, a file-like object, or the path to an existing file")
        return {"name": blob.name, "size": blob.size, "content_type": blob.content_type}

    def read_item(self, bucket: str, object_name: str, as_text: bool = False, encoding: str = "utf-8") -> Union[bytes, str]:
        data = self._blob(bucket, object_name).download_as_bytes()
        return data.decode(encoding) if as_text else data
def generate_signed_url(self, bucket: str, object_name: str, method: str = "GET",
expires: timedelta = timedelta(hours=1)) -> str:
return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method)
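

# Usage sketch (illustrative only): the bucket and object names below are
# hypothetical, and running this requires valid GCP credentials. Note that
# generate_signed_url generally needs service-account credentials capable
# of signing, not plain ADC on a local machine.
if __name__ == "__main__":
    provider = GoogleCloudStorageProvider()  # falls back to ADC when no key is supplied

    example_bucket = "example-bucket"  # hypothetical bucket name
    if not provider.is_exists_bucket(example_bucket):
        provider.create_bucket(example_bucket)

    # Upload a small text object, then read it back as a string.
    provider.write_item(example_bucket, "hello/hello.txt", b"hello, gcs", content_type="text/plain")
    print(provider.read_item(example_bucket, "hello/hello.txt", as_text=True))

    # List objects under a prefix and issue a one-hour signed download URL.
    print(provider.get_items(example_bucket, prefix="hello/"))
    print(provider.generate_signed_url(example_bucket, "hello/hello.txt"))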