From 2ffaf13ab876d1ee839b715d648135244f95234b Mon Sep 17 00:00:00 2001 From: "ry.yamafuji" Date: Wed, 10 Sep 2025 04:45:09 +0900 Subject: [PATCH] =?UTF-8?q?OneDrive=E5=87=A6=E7=90=86=E8=BF=BD=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docments/ondrive.md | 35 ++ example/example_one_drive.py | 22 ++ .../google_cloud_storage_provider.py | 9 +- src/providers/one_drive_provider.py | 324 ++++++++++++++++++ 4 files changed, 387 insertions(+), 3 deletions(-) create mode 100644 docments/ondrive.md create mode 100644 example/example_one_drive.py create mode 100644 src/providers/one_drive_provider.py diff --git a/docments/ondrive.md b/docments/ondrive.md new file mode 100644 index 0000000..4306947 --- /dev/null +++ b/docments/ondrive.md @@ -0,0 +1,35 @@ +# OneDriveに連携する方法 + +MSAL(認証) + requests(HTTP)のみです。 +これでMicrosoft GraphのOneDrive APIを直接叩けます + +## 事前準備(無料) + +* アカウント + - Microsoft アカウント(個人 OneDrive) + - Microsoft Entra ID(旧Azure AD:OneDrive for Business)。 + - どちらでも Graph 経由で同じコードが使えます。 +* アプリ登録(Azure Portal) + * https://azure.microsoft.com/ja-jp/get-started/azure-portal + * Azure Portal → App registrations → New registration + * サポートするアカウント種別は簡単のため “common” 相当(個人/組織どちらもOK)を選ぶと便利 + * リダイレクトURIは不要(後述のデバイスコードフローならクライアントIDだけでOK) +* Authentication で Mobile and desktop flows(Public client) を有効化(Device Code Flow用)。 + * App registrationsで選択する + * 左メニューのAuthentication + * Mobile and desktop flows(Public client)を有効にする +* API permissions に Microsoft Graph → Delegated で最低限 + * Files.ReadWrite(ユーザーのファイルを読み書き) + * offline_access(リフレッシュ/長期トークン) + * User.Read(基本プロフィール) + + + + +## ライブラリをインストール + +```sh +pip install msal requests +``` + + diff --git a/example/example_one_drive.py b/example/example_one_drive.py new file mode 100644 index 0000000..f4628ac --- /dev/null +++ b/example/example_one_drive.py @@ -0,0 +1,22 @@ +import sys +import os +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src"))) 
+ +from dotenv import load_dotenv +load_dotenv(".env") + +from lib.custom_logger import get_logger +logger = get_logger(level=10) + +from providers.one_drive_provider import OneDriveProvider + +def example_onedrive(): + logger.info("Starting OneDrive example") + # case Supported account types= Personal Microsoft account users + authority = "https://login.microsoftonline.com/consumers" + token_cache_path=".onedrive_cache.json" # ★キャッシュファイル + provider = OneDriveProvider(authority=authority, token_cache_path=token_cache_path) + logger.info(f"provider {provider.client_id}") + logger.info(f"Listed items: {provider.get_items()}") + +example_onedrive() \ No newline at end of file diff --git a/src/providers/google_cloud_storage_provider.py b/src/providers/google_cloud_storage_provider.py index f9b4693..29fee49 100644 --- a/src/providers/google_cloud_storage_provider.py +++ b/src/providers/google_cloud_storage_provider.py @@ -5,7 +5,6 @@ from datetime import timedelta import mimetypes from google.cloud import storage - from google.oauth2 import service_account from lib.custom_logger import get_logger @@ -65,8 +64,8 @@ class GoogleCloudStorageProvider: return False # オブジェクト操作 - def get_items(self, bucket: str, prefix: str | None = None) -> List[Dict[str, Any]]: - items: List[storage.Blob] = self._client.list_blobs(bucket, prefix=prefix) + def get_items(self, bucket: str, prefix: str | None = None, match_glob:str | None=None) -> List[Dict[str, Any]]: + items: List[storage.Blob] = self._client.list_blobs(bucket, prefix=prefix,match_glob=match_glob) return [{"name": bl.name, "size": bl.size, "updated": bl.updated, "content_type": bl.content_type} for bl in items] @@ -95,6 +94,10 @@ class GoogleCloudStorageProvider: data = self._blob(bucket, object_name).download_as_bytes() return data.decode(encoding) if as_text else data + def delete_item(self, bucket: str, object_name: str): + """オブジェクトを削除する""" + self._blob(bucket, object_name).delete() + def generate_signed_url(self, bucket: 
class OneDriveProvider:
    """Minimal OneDrive client built on MSAL (auth) and requests (HTTP).

    All file operations go through the Microsoft Graph v1.0 ``/me/drive``
    endpoints using path-based addressing (``/me/drive/root:/{path}``).
    Authentication uses a caller-supplied bearer token when one is given;
    otherwise tokens come from MSAL (silent acquisition from its cache,
    falling back to the interactive device-code flow).
    """

    GRAPH = "https://graph.microsoft.com/v1.0"
    DEFAULT_SCOPES = ["Files.ReadWrite", "User.Read"]
    DEFAULT_AUTHORITY = "https://login.microsoftonline.com/common"
    SMALL_UPLOAD_LIMIT = 4 * 1024 * 1024  # payloads up to this size use one PUT
    # Upload-session fragments must be a multiple of 320 KiB and each request
    # must stay below 60 MiB (see the Graph createUploadSession reference).
    UPLOAD_CHUNK_UNIT = 320 * 1024
    MAX_UPLOAD_CHUNK = 60 * 1024 * 1024 - UPLOAD_CHUNK_UNIT
    # Seconds; without a timeout a stalled connection blocks forever.
    REQUEST_TIMEOUT = 60
    # MSAL adds these itself and raises ValueError if the caller passes them.
    RESERVED_SCOPES = frozenset({"offline_access", "openid", "profile"})

    def __init__(
        self,
        client_id: Optional[str] = None,
        authority: Optional[str] = None,
        scopes: Optional[List[str]] = None,
        token_cache_path: Optional[str] = None,
        access_token: Optional[str] = None,
    ):
        """
        Args:
            client_id: Client ID of the (public client) app registered in the
                Azure Portal. Falls back to the ``MS_CLIENT_ID`` environment
                variable, then to an empty string.
            authority: 'https://login.microsoftonline.com/{tenant}'
                (defaults to the 'common' authority).
            scopes: e.g. ["Files.ReadWrite", "User.Read"]. Scopes reserved by
                MSAL (offline_access / openid / profile) are dropped.
            token_cache_path: Optional file used to persist the serialized
                MSAL token cache between runs.
            access_token: Optional pre-acquired bearer token. When given it
                is used as-is for every request and never refreshed.
        """
        self.client_id = client_id or os.getenv("MS_CLIENT_ID") or ""
        self.authority = authority or self.DEFAULT_AUTHORITY
        self.scopes = self._sanitize_scopes(scopes or self.DEFAULT_SCOPES)
        self.token_cache_path = token_cache_path
        self._token_cache = msal.SerializableTokenCache() if token_cache_path else None

        if self._token_cache is not None and os.path.exists(token_cache_path):
            try:
                # Context manager so the handle is closed even if parsing fails.
                with open(token_cache_path, "r", encoding="utf-8") as f:
                    self._token_cache.deserialize(f.read())
            except Exception:
                # Best effort: a corrupt cache only costs a fresh sign-in.
                logger.warning("Failed to load token cache. Continuing without cache.")

        self._app = msal.PublicClientApplication(
            client_id=self.client_id,
            authority=self.authority,
            token_cache=self._token_cache,
        )

        self._session = requests.Session()
        self._access_token = access_token
        # A caller-supplied token cannot be refreshed by us; MSAL tokens can.
        self._token_is_static = bool(access_token)

    # -----------------------
    # Authentication / token management
    # -----------------------
    @classmethod
    def _sanitize_scopes(cls, scopes: List[str]) -> List[str]:
        """Return a copy of *scopes* without the scopes MSAL reserves."""
        return [s for s in scopes if s not in cls.RESERVED_SCOPES]

    def _save_cache(self):
        """Persist the MSAL token cache to disk if it changed."""
        cache = self._token_cache
        if cache is None or not self.token_cache_path:
            return
        if not cache.has_state_changed:
            return
        with open(self.token_cache_path, "w", encoding="utf-8") as f:
            f.write(cache.serialize())

    def ensure_token(self):
        """Return a usable access token (static token -> MSAL cache -> device code).

        Raises:
            RuntimeError: if the device-code flow cannot be started or does
                not yield an access token.
        """
        if self._token_is_static:
            return self._access_token

        # Ask MSAL on every call: it returns the cached token while it is
        # valid and transparently refreshes it once it has expired.
        accounts = self._app.get_accounts()
        if accounts:
            result = self._app.acquire_token_silent(self.scopes, account=accounts[0])
            if result and "access_token" in result:
                self._access_token = result["access_token"]
                self._save_cache()
                return self._access_token

        # Device Code Flow
        flow = self._app.initiate_device_flow(scopes=self.scopes)
        if "user_code" not in flow:
            raise RuntimeError("Device flow の初期化に失敗しました")
        print(flow["message"])  # instructions the user follows in a browser

        result = self._app.acquire_token_by_device_flow(flow)
        if "access_token" not in result:
            raise RuntimeError(f"トークン取得失敗: {result.get('error_description')}")
        self._access_token = result["access_token"]
        self._save_cache()
        return self._access_token

    def _headers(self) -> Dict[str, str]:
        """Return the Authorization header for a Graph request."""
        token = self.ensure_token()
        return {"Authorization": f"Bearer {token}"}

    def _request(self, method: str, url: str, *, auth: bool = True,
                 headers: Optional[Dict[str, str]] = None, **kwargs: Any):
        """Send one HTTP request through the shared session.

        Adds the bearer header unless ``auth`` is False and applies
        ``REQUEST_TIMEOUT`` unless the caller passes its own ``timeout``.
        """
        merged = self._headers() if auth else {}
        if headers:
            merged.update(headers)
        kwargs.setdefault("timeout", self.REQUEST_TIMEOUT)
        return self._session.request(method, url, headers=merged, **kwargs)

    # -----------------------
    # Path utilities
    # -----------------------
    @staticmethod
    def _normalize_path(path: Optional[str]) -> str:
        """Return *path* as a drive-relative path without leading/trailing '/'.

        Empty input, '/', and '.' all mean the drive root and yield ''.
        Empty and '.' segments are dropped ('a//b/./c' -> 'a/b/c').
        """
        if not path:
            return ""
        segments = str(path).strip().split("/")
        return "/".join(seg for seg in segments if seg and seg != ".")

    def _item_by_path_url(self, path: str) -> str:
        """Return the Graph URL of the item at *path* (the root when empty)."""
        from urllib.parse import quote

        norm = self._normalize_path(path)
        if not norm:
            return f"{self.GRAPH}/me/drive/root"
        # Encode each segment so '#', '?', '%', spaces, etc. cannot change
        # the URL structure; '/' stays as the segment separator.
        return f"{self.GRAPH}/me/drive/root:/{quote(norm, safe='/')}"

    def _action_url(self, path: Optional[str], action: str) -> str:
        """Return the URL of *action* (e.g. 'children') on the item at *path*.

        Path-addressed items need the ':/' delimiter before the action; the
        root item takes a plain '/'.
        """
        base = self._item_by_path_url(path)
        sep = ":/" if self._normalize_path(path) else "/"
        return f"{base}{sep}{action}"

    # -----------------------
    # Listing / existence / folders
    # -----------------------
    @staticmethod
    def _summarize_item(it: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce a Graph driveItem dict to the fields ``get_items`` returns."""
        return {
            "name": it.get("name"),
            "id": it.get("id"),
            "size": it.get("size"),
            "folder": "folder" in it,
            "lastModifiedDateTime": it.get("lastModifiedDateTime"),
            "content_type": (it.get("file", {}) or {}).get("mimeType"),
            "webUrl": it.get("webUrl"),
        }

    def get_items(self, prefix: Optional[str] = None) -> List[Dict[str, Any]]:
        """List the direct children of the root or of the folder *prefix*.

        Follows ``@odata.nextLink`` so every page of results is returned.

        Returns:
            [{name, id, size, folder(bool), lastModifiedDateTime,
              content_type, webUrl}, ...]

        Raises:
            requests.HTTPError: on a non-success response.
        """
        url = self._action_url(prefix, "children")
        out: List[Dict[str, Any]] = []
        while url:
            resp = self._request("GET", url)
            resp.raise_for_status()
            payload = resp.json()
            out.extend(self._summarize_item(it) for it in payload.get("value", []))
            url = payload.get("@odata.nextLink")
        return out

    def is_exists_item(self, path: str) -> bool:
        """Return True if an item exists at *path* (metadata GET, no /content)."""
        resp = self._request("GET", self._item_by_path_url(path))
        if resp.status_code == 404:
            return False
        resp.raise_for_status()
        return resp.status_code == 200

    def create_folder(self, folder_path: str) -> Dict[str, Any]:
        """Create *folder_path*, including missing intermediate folders.

        Returns:
            The driveItem metadata of the final folder.

        Raises:
            requests.HTTPError: on a non-success response other than 409.
        """
        parts = [p for p in self._normalize_path(folder_path).split("/") if p]
        cur = ""
        meta = None
        for p in parts:
            parent = cur
            cur = f"{cur}/{p}" if cur else p
            # Only the last segment's metadata is returned; forget earlier ones
            # so a skipped final segment is re-fetched below.
            meta = None
            if self.is_exists_item(cur):
                continue
            payload = {"name": p, "folder": {}, "@microsoft.graph.conflictBehavior": "fail"}
            r = self._request("POST", self._action_url(parent, "children"), json=payload)
            if r.status_code == 409:  # created concurrently: treat as existing
                continue
            r.raise_for_status()
            meta = r.json()
        if meta is None:
            r = self._request("GET", self._item_by_path_url(folder_path))
            r.raise_for_status()
            meta = r.json()
        return meta

    # -----------------------
    # Upload / download / delete
    # -----------------------
    @classmethod
    def _aligned_chunk_size(cls, chunk_size: int) -> int:
        """Clamp *chunk_size* to a valid upload-session fragment size.

        Rounds down to a multiple of ``UPLOAD_CHUNK_UNIT``, never below one
        unit and never above ``MAX_UPLOAD_CHUNK``.
        """
        unit = cls.UPLOAD_CHUNK_UNIT
        capped = min(int(chunk_size), cls.MAX_UPLOAD_CHUNK)
        return max(unit, capped // unit * unit)

    def _put_small(self, dest_path: str, fp: BinaryIO, content_type: Optional[str]) -> Dict[str, Any]:
        """Upload *fp* to *dest_path* with a single PUT to ``/content``."""
        extra = {"Content-Type": content_type} if content_type else None
        r = self._request("PUT", self._action_url(dest_path, "content"), headers=extra, data=fp)
        r.raise_for_status()
        return r.json()

    def _put_large(self, dest_path: str, fp: BinaryIO, chunk_size: int = 8 * 1024 * 1024) -> Dict[str, Any]:
        """Upload the whole of *fp* to *dest_path* through an upload session.

        *chunk_size* is aligned with ``_aligned_chunk_size`` before use.
        An empty stream is delegated to ``_put_small``.

        Raises:
            requests.HTTPError: on a non-success response.
            OSError: if the stream ends before the measured size is read.
        """
        fp.seek(0, io.SEEK_END)
        total = fp.tell()
        fp.seek(0)
        if total == 0:
            # An upload session cannot carry a zero-byte range.
            return self._put_small(dest_path, fp, None)
        step = self._aligned_chunk_size(chunk_size)

        # 1) Create the upload session.
        r = self._request(
            "POST",
            self._action_url(dest_path, "createUploadSession"),
            json={"item": {"@microsoft.graph.conflictBehavior": "replace"}},
        )
        r.raise_for_status()
        upload_url = r.json()["uploadUrl"]

        # 2) PUT the content fragment by fragment. The upload URL is
        #    pre-authenticated, so no Authorization header is sent.
        start = 0
        rr = None
        while start < total:
            data = fp.read(min(step, total - start))
            if not data:
                raise OSError("unexpected end of stream during upload")
            end = start + len(data) - 1
            headers = {
                "Content-Length": str(len(data)),
                "Content-Range": f"bytes {start}-{end}/{total}",
            }
            rr = self._request("PUT", upload_url, auth=False, headers=headers, data=data)
            rr.raise_for_status()
            start = end + 1

        # 3) The final fragment answers 200/201 with the driveItem; a 202
        #    body only describes the ranges still expected.
        if rr.status_code in (200, 201):
            return rr.json()
        meta = self.stat_item(dest_path)
        return meta or {"name": pathlib.Path(dest_path).name}

    def write_item(
        self,
        dest_path: str,
        data: Union[bytes, BinaryIO, str],
        content_type: Optional[str] = None,
        ensure_parent: bool = True,
    ) -> Dict[str, Any]:
        """Upload *data* to *dest_path* and return the resulting driveItem.

        Args:
            dest_path: OneDrive path such as 'folder/subfolder/file.txt'.
            data: bytes, a seekable binary file-like object, or the path of
                an existing local file.
            content_type: MIME type; guessed from *dest_path* when omitted.
            ensure_parent: create missing parent folders first.

        Raises:
            ValueError: if *data* is none of the supported kinds.
            requests.HTTPError: on a non-success response.
        """
        if content_type is None:
            content_type = mimetypes.guess_type(dest_path)[0] or "application/octet-stream"

        # Validate and open the payload before touching the remote side, so
        # bad input cannot leave freshly created folders behind.
        must_close = False
        if isinstance(data, (bytes, bytearray)):
            fp = io.BytesIO(data)
        elif hasattr(data, "read"):
            fp = data  # type: ignore
        elif isinstance(data, str) and os.path.exists(data):
            fp = open(data, "rb")
            must_close = True
        else:
            raise ValueError("data must be bytes, file-like, or existing filepath")

        try:
            parent = self._normalize_path(dest_path).rpartition("/")[0]
            if ensure_parent and parent:
                self.create_folder(parent)

            # Measure the stream, then restore the caller's position.
            pos = fp.tell()
            fp.seek(0, io.SEEK_END)
            size = fp.tell()
            fp.seek(pos)

            if size <= self.SMALL_UPLOAD_LIMIT:
                return self._put_small(dest_path, fp, content_type)
            return self._put_large(dest_path, fp)
        finally:
            if must_close:
                fp.close()

    def read_item(self, path: str, as_text: bool = False, encoding: str = "utf-8") -> Union[bytes, str]:
        """Download the item at *path*; decode with *encoding* if *as_text*."""
        r = self._request("GET", self._action_url(path, "content"))
        r.raise_for_status()
        data = r.content
        return data.decode(encoding) if as_text else data

    def delete_item(self, path: str):
        """Delete the item at *path*; raises requests.HTTPError on failure."""
        r = self._request("DELETE", self._item_by_path_url(path))
        r.raise_for_status()

    def stat_item(self, path: str) -> Optional[Dict[str, Any]]:
        """Return the driveItem metadata for *path*, or None if it is missing."""
        r = self._request("GET", self._item_by_path_url(path))
        if r.status_code == 404:
            return None
        r.raise_for_status()
        return r.json()

    # -----------------------
    # Sharing links (rough counterpart of a signed URL)
    # -----------------------
    def generate_share_link(
        self,
        path: str,
        link_type: str = "view",     # "view" | "edit" | "embed"
        scope: str = "anonymous",    # "anonymous" | "organization"
        password: Optional[str] = None
    ) -> str:
        """Create a sharing URL for *path* with the Graph createLink action.

        Unlike a GCS signed URL, the lifetime of the link depends on tenant
        policy; whether a password is accepted is policy-dependent as well.

        Returns:
            The link URL, or '' if the response carries none.
        """
        body: Dict[str, Any] = {"type": link_type, "scope": scope}
        if password:
            body["password"] = password
        r = self._request("POST", self._action_url(path, "createLink"), json=body)
        r.raise_for_status()
        link = r.json().get("link", {})
        return link.get("webUrl") or link.get("url") or ""