Compare commits
2 Commits
b58048fe5a
...
649015d016
Author | SHA1 | Date | |
---|---|---|---|
![]() |
649015d016 | ||
![]() |
2ffaf13ab8 |
35
docments/ondrive.md
Normal file
35
docments/ondrive.md
Normal file
@ -0,0 +1,35 @@
|
||||
# OneDraiveに連携する方法
|
||||
|
||||
MSAL(認証) + requests(HTTP)のみです。
|
||||
これでMicrosoft GraphのOneDrive APIを直接叩けます
|
||||
|
||||
## 事前準備(無料)
|
||||
|
||||
* アカウント
|
||||
- Microsoft アカウント(個人 OneDrive)
|
||||
- Microsoft Entra ID(旧Azure AD:OneDrive for Business)。
|
||||
- どちらでも Graph 経由で同じコードが使えます。
|
||||
* アプリ登録(Azure Portal)
|
||||
* https://azure.microsoft.com/ja-jp/get-started/azure-portal
|
||||
* Azure Portal → App registrations → New registration
|
||||
* サポートするアカウント種別は簡単のため “common” 相当(個人/組織どちらもOK)を選ぶと便利
|
||||
* リダイレクトURIは不要(後述のデバイスコードフローならクライアントIDだけでOK)
|
||||
* Authentication で Mobile and desktop flows(Public client) を有効化(Device Code Flow用)。
|
||||
* App registrationsで選択する
|
||||
* 左メニューのAuthentication
|
||||
* Mobile and desktop flows(Public client)を有効にする
|
||||
* API permissions に Microsoft Graph → Delegated で最低限
|
||||
* Files.ReadWrite(ユーザーのファイルを読み書き)
|
||||
* offline_access(リフレッシュ/長期トークン)
|
||||
* User.Read(基本プロフィール)
|
||||
|
||||
|
||||
|
||||
|
||||
## ライブラリをインストール
|
||||
|
||||
```sh
|
||||
pip install msal requests
|
||||
```
|
||||
|
||||
|
22
example/example_one_drive.py
Normal file
22
example/example_one_drive.py
Normal file
@ -0,0 +1,22 @@
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(".env")
|
||||
|
||||
from lib.custom_logger import get_logger
|
||||
logger = get_logger(level=10)
|
||||
|
||||
from providers.one_drive_provider import OneDriveProvider
|
||||
|
||||
def example_onedrive():
|
||||
logger.info("Starting OneDrive example")
|
||||
# case Supported account types= Personal Microsoft account users
|
||||
authority = "https://login.microsoftonline.com/consumers"
|
||||
token_cache_path=".onedrive_cache.json" # ★キャッシュファイル
|
||||
provider = OneDriveProvider(authority=authority, token_cache_path=token_cache_path)
|
||||
logger.info(f"provider {provider.client_id}")
|
||||
logger.info(f"Listed items: {provider.get_items()}")
|
||||
|
||||
example_onedrive()
|
@ -1,4 +1,14 @@
|
||||
matplotlib
|
||||
requests
|
||||
pyttsx3
|
||||
|
||||
|
||||
# firebase_provider
|
||||
firebase-admin>=7.1.0
|
||||
# google cloud storage
|
||||
google-cloud-storage
|
||||
# onedrive
|
||||
msal
|
||||
|
||||
# common
|
||||
python-dotenv
|
||||
|
@ -5,7 +5,6 @@ from datetime import timedelta
|
||||
import mimetypes
|
||||
|
||||
from google.cloud import storage
|
||||
|
||||
from google.oauth2 import service_account
|
||||
|
||||
from lib.custom_logger import get_logger
|
||||
@ -65,8 +64,8 @@ class GoogleCloudStorageProvider:
|
||||
return False
|
||||
|
||||
# オブジェクト操作
|
||||
def get_items(self, bucket: str, prefix: str | None = None) -> List[Dict[str, Any]]:
|
||||
items: List[storage.Blob] = self._client.list_blobs(bucket, prefix=prefix)
|
||||
def get_items(self, bucket: str, prefix: str | None = None, match_glob:str | None=None) -> List[Dict[str, Any]]:
|
||||
items: List[storage.Blob] = self._client.list_blobs(bucket, prefix=prefix,match_glob=match_glob)
|
||||
return [{"name": bl.name, "size": bl.size, "updated": bl.updated, "content_type": bl.content_type}
|
||||
for bl in items]
|
||||
|
||||
@ -95,6 +94,10 @@ class GoogleCloudStorageProvider:
|
||||
data = self._blob(bucket, object_name).download_as_bytes()
|
||||
return data.decode(encoding) if as_text else data
|
||||
|
||||
def delete_item(self, bucket: str, object_name: str):
|
||||
"""オブジェクトを削除する"""
|
||||
self._blob(bucket, object_name).delete()
|
||||
|
||||
def generate_signed_url(self, bucket: str, object_name: str, method: str = "GET",
|
||||
expires: timedelta = timedelta(hours=1)) -> str:
|
||||
return self._blob(bucket, object_name).generate_signed_url(expiration=expires, method=method)
|
324
src/providers/one_drive_provider.py
Normal file
324
src/providers/one_drive_provider.py
Normal file
@ -0,0 +1,324 @@
|
||||
import os
|
||||
import io
|
||||
import mimetypes
|
||||
from typing import Optional, List, Dict, Any, Union, BinaryIO
|
||||
import pathlib
|
||||
import requests
|
||||
import msal
|
||||
|
||||
|
||||
|
||||
from lib.custom_logger import get_logger
|
||||
logger = get_logger()
|
||||
|
||||
class OneDriveProvider:
|
||||
|
||||
GRAPH = "https://graph.microsoft.com/v1.0"
|
||||
DEFAULT_SCOPES = ["Files.ReadWrite", "User.Read"]
|
||||
DEFAULT_AUTHORITY = "https://login.microsoftonline.com/common"
|
||||
SMALL_UPLOAD_LIMIT = 4 * 1024 * 1024 # 4MB
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
client_id: Optional[str] = None,
|
||||
authority: Optional[str] = None,
|
||||
scopes: Optional[List[str]] = None,
|
||||
token_cache_path: Optional[str] = None,
|
||||
access_token: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
client_id: Azure Portal のアプリ(公開クライアント)の Client ID
|
||||
authority: 'https://login.microsoftonline.com/{tenant}' (未指定は 'common')
|
||||
scopes: 例 ["Files.ReadWrite", "User.Read", "offline_access"]
|
||||
token_cache_path: MSAL のシリアライズ済みトークンキャッシュ保存先(任意)
|
||||
access_token: 既に取得済みの Bearer トークンを直接使いたい場合(任意)
|
||||
"""
|
||||
self.client_id = client_id or os.getenv("MS_CLIENT_ID") or ""
|
||||
self.authority = authority or self.DEFAULT_AUTHORITY
|
||||
self.scopes = scopes or self.DEFAULT_SCOPES
|
||||
self.token_cache_path = token_cache_path
|
||||
self._token_cache = msal.SerializableTokenCache() if token_cache_path else None
|
||||
|
||||
if self._token_cache and os.path.exists(token_cache_path):
|
||||
try:
|
||||
self._token_cache.deserialize(open(token_cache_path, "r", encoding="utf-8").read())
|
||||
except Exception:
|
||||
logger.warning("Failed to load token cache. Continuing without cache.")
|
||||
|
||||
self._app = msal.PublicClientApplication(
|
||||
client_id=self.client_id,
|
||||
authority=self.authority,
|
||||
token_cache=self._token_cache,
|
||||
)
|
||||
|
||||
self._session = requests.Session()
|
||||
self._access_token = access_token # 直渡しトークンがあればそれを優先
|
||||
|
||||
# -----------------------
|
||||
# 認証・トークン管理
|
||||
# -----------------------
|
||||
def _save_cache(self):
|
||||
if self._token_cache and self.token_cache_path:
|
||||
with open(self.token_cache_path, "w", encoding="utf-8") as f:
|
||||
f.write(self._token_cache.serialize())
|
||||
|
||||
def ensure_token(self):
|
||||
"""有効な Access Token を確保(キャッシュ→デバイスコード)。"""
|
||||
if self._access_token:
|
||||
return self._access_token
|
||||
|
||||
# キャッシュからサイレント取得
|
||||
if self._token_cache:
|
||||
accounts = self._app.get_accounts()
|
||||
if accounts:
|
||||
result = self._app.acquire_token_silent(self.scopes, account=accounts[0])
|
||||
if result and "access_token" in result:
|
||||
self._access_token = result["access_token"]
|
||||
return self._access_token
|
||||
|
||||
# Device Code Flow
|
||||
flow = self._app.initiate_device_flow(scopes=self.scopes)
|
||||
if "user_code" not in flow:
|
||||
raise RuntimeError("Device flow の初期化に失敗しました")
|
||||
print(flow["message"]) # 表示の指示に従ってブラウザで認証
|
||||
|
||||
result = self._app.acquire_token_by_device_flow(flow)
|
||||
if "access_token" not in result:
|
||||
raise RuntimeError(f"トークン取得失敗: {result.get('error_description')}")
|
||||
self._access_token = result["access_token"]
|
||||
self._save_cache()
|
||||
return self._access_token
|
||||
|
||||
def _headers(self) -> Dict[str, str]:
|
||||
token = self.ensure_token()
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
# -----------------------
|
||||
# パスユーティリティ
|
||||
# -----------------------
|
||||
@staticmethod
|
||||
def _normalize_path(path: Optional[str]) -> str:
|
||||
"""
|
||||
Graph のパス表記: /me/drive/root:/foo/bar:/children のように使用。
|
||||
先頭に / を付けず、URL エンコードは requests 側に任せる前提で
|
||||
空白や日本語は安全のため quote することを推奨(今回は簡易化)。
|
||||
"""
|
||||
if not path or path.strip() in ["/", "."]:
|
||||
return ""
|
||||
# 先頭・末尾スラッシュを整理
|
||||
p = str(path).strip().strip("/")
|
||||
return p
|
||||
|
||||
def _item_by_path_url(self, path: str) -> str:
|
||||
norm = self._normalize_path(path)
|
||||
if norm:
|
||||
return f"{self.GRAPH}/me/drive/root:/{norm}"
|
||||
else:
|
||||
return f"{self.GRAPH}/me/drive/root"
|
||||
|
||||
# -----------------------
|
||||
# 一覧・存在・フォルダ
|
||||
# -----------------------
|
||||
def get_items(self, prefix: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
ルート or 指定フォルダ直下の一覧を返す。
|
||||
Returns: [{name, id, size, folder(bool), lastModifiedDateTime, content_type?...}]
|
||||
"""
|
||||
base = self._item_by_path_url(prefix)
|
||||
url = f"{base}:/children" if prefix else f"{base}/children"
|
||||
resp = self._session.get(url, headers=self._headers())
|
||||
resp.raise_for_status()
|
||||
out = []
|
||||
for it in resp.json().get("value", []):
|
||||
out.append({
|
||||
"name": it.get("name"),
|
||||
"id": it.get("id"),
|
||||
"size": it.get("size"),
|
||||
"folder": "folder" in it,
|
||||
"lastModifiedDateTime": it.get("lastModifiedDateTime"),
|
||||
"content_type": (it.get("file", {}) or {}).get("mimeType"),
|
||||
"webUrl": it.get("webUrl"),
|
||||
})
|
||||
return out
|
||||
|
||||
def is_exists_item(self, path: str) -> bool:
|
||||
url = self._item_by_path_url(path)
|
||||
# /content を付けない (メタデータ取得)
|
||||
url = f"{url}"
|
||||
resp = self._session.get(url, headers=self._headers())
|
||||
if resp.status_code == 200:
|
||||
return True
|
||||
if resp.status_code == 404:
|
||||
return False
|
||||
resp.raise_for_status()
|
||||
return False
|
||||
|
||||
def create_folder(self, folder_path: str) -> Dict[str, Any]:
|
||||
"""
|
||||
中間フォルダも順次作成(簡易実装)。
|
||||
"""
|
||||
parts = [p for p in self._normalize_path(folder_path).split("/") if p]
|
||||
cur = ""
|
||||
meta = None
|
||||
for p in parts:
|
||||
parent = cur
|
||||
cur = "/".join([x for x in [cur, p] if x])
|
||||
# 既存チェック
|
||||
if self.is_exists_item(cur):
|
||||
continue
|
||||
# 親の children へ POST
|
||||
payload = {"name": p, "folder": {}, "@microsoft.graph.conflictBehavior": "fail"}
|
||||
parent_children = f"{self._item_by_path_url(parent)}:/children" if parent else f"{self.GRAPH}/me/drive/root/children"
|
||||
r = self._session.post(parent_children, headers=self._headers(), json=payload)
|
||||
if r.status_code == 409: # 競合はスキップ
|
||||
continue
|
||||
r.raise_for_status()
|
||||
meta = r.json()
|
||||
# 最終パスのメタを返す
|
||||
if meta is None:
|
||||
r = self._session.get(self._item_by_path_url(folder_path), headers=self._headers())
|
||||
r.raise_for_status()
|
||||
meta = r.json()
|
||||
return meta
|
||||
|
||||
# -----------------------
|
||||
# アップロード・ダウンロード・削除
|
||||
# -----------------------
|
||||
def _put_small(self, dest_path: str, fp: BinaryIO, content_type: Optional[str]) -> Dict[str, Any]:
|
||||
url = f"{self._item_by_path_url(dest_path)}:/content"
|
||||
headers = self._headers()
|
||||
if content_type:
|
||||
headers["Content-Type"] = content_type
|
||||
r = self._session.put(url, headers=headers, data=fp)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def _put_large(self, dest_path: str, fp: BinaryIO, chunk_size: int = 8 * 1024 * 1024) -> Dict[str, Any]:
|
||||
# 1) アップロードセッションを作成
|
||||
create_url = f"{self._item_by_path_url(dest_path)}:/createUploadSession"
|
||||
r = self._session.post(create_url, headers=self._headers(), json={"item": {"@microsoft.graph.conflictBehavior": "replace"}})
|
||||
r.raise_for_status()
|
||||
session = r.json()
|
||||
upload_url = session["uploadUrl"]
|
||||
|
||||
# 2) チャンク分割で PUT
|
||||
total = fp.seek(0, io.SEEK_END)
|
||||
fp.seek(0)
|
||||
start = 0
|
||||
while start < total:
|
||||
end = min(start + chunk_size, total) - 1
|
||||
length = end - start + 1
|
||||
headers = {
|
||||
"Content-Length": str(length),
|
||||
"Content-Range": f"bytes {start}-{end}/{total}",
|
||||
}
|
||||
fp.seek(start)
|
||||
data = fp.read(length)
|
||||
rr = self._session.put(upload_url, headers=headers, data=data)
|
||||
if rr.status_code not in (200, 201, 202):
|
||||
rr.raise_for_status()
|
||||
start = end + 1
|
||||
|
||||
# 3) 最終レスポンス取得
|
||||
# 最後の PUT が 200/201 の場合は JSON に item 情報が含まれる
|
||||
if rr.headers.get("Content-Type", "").startswith("application/json"):
|
||||
return rr.json()
|
||||
# 念のためメタデータを再取得
|
||||
meta = self.stat_item(dest_path)
|
||||
return meta or {"name": pathlib.Path(dest_path).name}
|
||||
|
||||
def write_item(
|
||||
self,
|
||||
dest_path: str,
|
||||
data: Union[bytes, BinaryIO, str],
|
||||
content_type: Optional[str] = None,
|
||||
ensure_parent: bool = True,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
dest_path: 'folder/subfolder/file.txt' のような OneDrive 内パス
|
||||
data: バイト列 / ファイルライク / 既存ファイルパス
|
||||
"""
|
||||
# 親フォルダを必要なら作成
|
||||
parent = "/".join(self._normalize_path(dest_path).split("/")[:-1])
|
||||
if ensure_parent and parent:
|
||||
self.create_folder(parent)
|
||||
|
||||
# content-type
|
||||
if content_type is None:
|
||||
content_type = mimetypes.guess_type(dest_path)[0] or "application/octet-stream"
|
||||
|
||||
# データをファイルライクに統一
|
||||
must_close = False
|
||||
if isinstance(data, (bytes, bytearray)):
|
||||
fp = io.BytesIO(data)
|
||||
elif hasattr(data, "read"):
|
||||
fp = data # type: ignore
|
||||
elif isinstance(data, str) and os.path.exists(data):
|
||||
fp = open(data, "rb")
|
||||
must_close = True
|
||||
else:
|
||||
raise ValueError("data must be bytes, file-like, or existing filepath")
|
||||
|
||||
try:
|
||||
# サイズ判定
|
||||
pos = fp.tell()
|
||||
fp.seek(0, io.SEEK_END)
|
||||
size = fp.tell()
|
||||
fp.seek(pos)
|
||||
|
||||
if size <= self.SMALL_UPLOAD_LIMIT:
|
||||
return self._put_small(dest_path, fp, content_type)
|
||||
else:
|
||||
return self._put_large(dest_path, fp)
|
||||
finally:
|
||||
if must_close:
|
||||
fp.close()
|
||||
|
||||
def read_item(self, path: str, as_text: bool = False, encoding: str = "utf-8") -> Union[bytes, str]:
|
||||
"""
|
||||
/content GET でバイナリ取得
|
||||
"""
|
||||
url = f"{self._item_by_path_url(path)}:/content"
|
||||
r = self._session.get(url, headers=self._headers(), stream=True)
|
||||
r.raise_for_status()
|
||||
data = r.content
|
||||
return data.decode(encoding) if as_text else data
|
||||
|
||||
def delete_item(self, path: str):
|
||||
url = f"{self._item_by_path_url(path)}"
|
||||
r = self._session.delete(url, headers=self._headers())
|
||||
if r.status_code not in (204, 200):
|
||||
r.raise_for_status()
|
||||
|
||||
def stat_item(self, path: str) -> Optional[Dict[str, Any]]:
|
||||
url = f"{self._item_by_path_url(path)}"
|
||||
r = self._session.get(url, headers=self._headers())
|
||||
if r.status_code == 404:
|
||||
return None
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
# -----------------------
|
||||
# 共有リンク(擬似 Signed URL)
|
||||
# -----------------------
|
||||
def generate_share_link(
|
||||
self,
|
||||
path: str,
|
||||
link_type: str = "view", # "view" | "edit" | "embed"
|
||||
scope: str = "anonymous", # "anonymous" | "organization"
|
||||
password: Optional[str] = None # 任意
|
||||
) -> str:
|
||||
"""
|
||||
Graph の createLink を使って共有URLを生成。
|
||||
注意: GCS の signed URL と異なり、有効期限はテナント/ポリシー依存(APIで任意期限を直指定は不可の場合あり)。
|
||||
"""
|
||||
url = f"{self._item_by_path_url(path)}:/createLink"
|
||||
body: Dict[str, Any] = {"type": link_type, "scope": scope}
|
||||
if password:
|
||||
body["password"] = password # パスワード保護(ポリシーにより可否)
|
||||
r: requests.Response = self._session.post(url, headers=self._headers(), json=body)
|
||||
r.raise_for_status()
|
||||
data:dict = r.json()
|
||||
return data.get("link", {}).get("webUrl") or data.get("link", {}).get("url") or ""
|
||||
|
Loading…
x
Reference in New Issue
Block a user