Compare commits

...

2 Commits

Author SHA1 Message Date
ry.yamafuji
649015d016 ライブラリ追加 2025-09-10 04:45:25 +09:00
ry.yamafuji
2ffaf13ab8 OneDrive処理追加 2025-09-10 04:45:09 +09:00
5 changed files with 398 additions and 4 deletions

35
docments/ondrive.md Normal file
View File

@ -0,0 +1,35 @@
# OneDriveに連携する方法
MSAL(認証) + requests(HTTP)のみです。
これでMicrosoft GraphのOneDrive APIを直接叩けます
## 事前準備(無料)
* アカウント
- Microsoft アカウント(個人 OneDrive)
- Microsoft Entra ID(旧 Azure AD、OneDrive for Business)。
- どちらでも Graph 経由で同じコードが使えます。
* アプリ登録(Azure Portal)
* https://azure.microsoft.com/ja-jp/get-started/azure-portal
* Azure Portal → App registrations → New registration
* サポートするアカウント種別は簡単のため “common” 相当(個人/組織どちらもOK)を選ぶと便利
* リダイレクトURIは不要(後述のデバイスコードフローならクライアントIDだけでOK)
* Authentication で Mobile and desktop flows(Public client) を有効化(Device Code Flow用)。
* App registrationsで選択する
* 左メニューのAuthentication
* Mobile and desktop flows(Public client)を有効にする
* API permissions に Microsoft Graph → Delegated で最低限
* Files.ReadWrite(ユーザーのファイルを読み書き)
* offline_access(リフレッシュ/長期トークン)
* User.Read(基本プロフィール)
## ライブラリをインストール
```sh
pip install msal requests
```

View File

@ -0,0 +1,22 @@
import sys
import os
# Put the sibling "src" directory on the import path; the `lib` and
# `providers` imports below rely on this having run first.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
from dotenv import load_dotenv
# Load settings from a local .env file before the provider is constructed
# (OneDriveProvider falls back to the MS_CLIENT_ID environment variable).
load_dotenv(".env")
from lib.custom_logger import get_logger
# level=10 — presumably logging.DEBUG; verify against get_logger.
logger = get_logger(level=10)
from providers.one_drive_provider import OneDriveProvider
def example_onedrive():
    """Build a OneDriveProvider for a personal account and log the root listing."""
    logger.info("Starting OneDrive example")
    # The "consumers" authority corresponds to app registrations whose
    # supported account type is "Personal Microsoft account users".
    provider = OneDriveProvider(
        authority="https://login.microsoftonline.com/consumers",
        token_cache_path=".onedrive_cache.json",  # token cache file
    )
    logger.info(f"provider {provider.client_id}")
    listed = provider.get_items()
    logger.info(f"Listed items: {listed}")


example_onedrive()

View File

@ -1,4 +1,14 @@
matplotlib
requests
pyttsx3
# firebase_provider
firebase-admin>=7.1.0
# google cloud storage
google-cloud-storage
# onedrive
msal
# common
python-dotenv

View File

@ -5,7 +5,6 @@ from datetime import timedelta
import mimetypes
from google.cloud import storage
from google.oauth2 import service_account
from lib.custom_logger import get_logger
@ -65,8 +64,8 @@ class GoogleCloudStorageProvider:
return False
# オブジェクト操作
def get_items(self, bucket: str, prefix: str | None = None) -> List[Dict[str, Any]]:
items: List[storage.Blob] = self._client.list_blobs(bucket, prefix=prefix)
def get_items(self, bucket: str, prefix: str | None = None, match_glob:str | None=None) -> List[Dict[str, Any]]:
    """List objects in ``bucket`` as plain dictionaries.

    Args:
        bucket: Bucket name passed to ``list_blobs``.
        prefix: Optional name prefix forwarded to ``list_blobs``.
        match_glob: Optional glob pattern forwarded to ``list_blobs``.

    Returns:
        One dict per listed blob with the keys ``name``, ``size``,
        ``updated`` and ``content_type``.
    """
    listing = self._client.list_blobs(bucket, prefix=prefix, match_glob=match_glob)
    rows: List[Dict[str, Any]] = []
    for blob in listing:
        rows.append({
            "name": blob.name,
            "size": blob.size,
            "updated": blob.updated,
            "content_type": blob.content_type,
        })
    return rows
@ -95,6 +94,10 @@ class GoogleCloudStorageProvider:
data = self._blob(bucket, object_name).download_as_bytes()
return data.decode(encoding) if as_text else data
def delete_item(self, bucket: str, object_name: str):
    """Delete the object ``object_name`` from ``bucket``."""
    # self._blob is defined outside this view; presumably it returns the
    # blob handle for (bucket, object_name).
    target = self._blob(bucket, object_name)
    target.delete()
def generate_signed_url(self, bucket: str, object_name: str, method: str = "GET",
                        expires: timedelta = timedelta(hours=1)) -> str:
    """Return a signed URL for ``object_name`` in ``bucket``.

    Args:
        bucket: Bucket name.
        object_name: Object name inside the bucket.
        method: HTTP method the URL is signed for (default ``"GET"``).
        expires: Passed as ``expiration`` to the blob's signer (default 1 hour).
    """
    blob = self._blob(bucket, object_name)
    signed = blob.generate_signed_url(expiration=expires, method=method)
    return signed

View File

@ -0,0 +1,324 @@
import os
import io
import mimetypes
from typing import Optional, List, Dict, Any, Union, BinaryIO
import pathlib
import requests
import msal
from lib.custom_logger import get_logger
logger = get_logger()
class OneDriveProvider:
    """Client for a user's OneDrive built on Microsoft Graph, MSAL and requests.

    Items are addressed by their path relative to the signed-in user's drive
    root (``/me/drive/root``). A token is obtained through MSAL (silent lookup
    first, device code flow as fallback) unless a bearer token is passed in.
    """

    GRAPH = "https://graph.microsoft.com/v1.0"
    DEFAULT_SCOPES = ["Files.ReadWrite", "User.Read"]
    DEFAULT_AUTHORITY = "https://login.microsoftonline.com/common"
    SMALL_UPLOAD_LIMIT = 4 * 1024 * 1024  # 4MB
    # Upload-session fragments must be a multiple of 320 KiB (Graph requirement).
    UPLOAD_CHUNK_UNIT = 320 * 1024

    def __init__(
        self,
        client_id: Optional[str] = None,
        authority: Optional[str] = None,
        scopes: Optional[List[str]] = None,
        token_cache_path: Optional[str] = None,
        access_token: Optional[str] = None,
    ):
        """
        Args:
            client_id: Client ID of the (public client) app registered in the
                Azure Portal. Falls back to the ``MS_CLIENT_ID`` environment
                variable, then to an empty string.
            authority: ``https://login.microsoftonline.com/{tenant}``
                (defaults to the ``common`` authority).
            scopes: Delegated Graph scopes, e.g.
                ``["Files.ReadWrite", "User.Read"]``.
                NOTE(review): MSAL Python appears to reject reserved scopes
                such as ``offline_access`` when passed explicitly — confirm
                before adding it here.
            token_cache_path: Optional file used to persist the serialized
                MSAL token cache between runs.
            access_token: Optional bearer token obtained elsewhere; when given
                it is used as-is and MSAL is never asked for a token.
        """
        self.client_id = client_id or os.getenv("MS_CLIENT_ID") or ""
        self.authority = authority or self.DEFAULT_AUTHORITY
        # Copy so that mutating instance scopes never touches the class default.
        self.scopes = list(scopes) if scopes else list(self.DEFAULT_SCOPES)
        self.token_cache_path = token_cache_path

        self._token_cache = msal.SerializableTokenCache() if token_cache_path else None
        if self._token_cache is not None and os.path.exists(token_cache_path):
            try:
                # Context manager so the handle is closed even if parsing fails.
                with open(token_cache_path, "r", encoding="utf-8") as f:
                    self._token_cache.deserialize(f.read())
            except Exception:
                # Deliberately best-effort: a corrupt cache only forces re-login.
                logger.warning("Failed to load token cache. Continuing without cache.")

        self._app = msal.PublicClientApplication(
            client_id=self.client_id,
            authority=self.authority,
            token_cache=self._token_cache,
        )
        self._session = requests.Session()
        # A directly supplied token takes precedence over anything MSAL returns.
        self._static_token = access_token
        self._access_token = access_token

    # -----------------------
    # Authentication / token management
    # -----------------------
    def _save_cache(self):
        """Write the MSAL token cache to ``token_cache_path`` if it changed."""
        if self._token_cache is None or not self.token_cache_path:
            return
        # Skip the disk write when MSAL reports nothing new to persist.
        if not getattr(self._token_cache, "has_state_changed", True):
            return
        with open(self.token_cache_path, "w", encoding="utf-8") as f:
            f.write(self._token_cache.serialize())

    def ensure_token(self):
        """Return a usable access token (supplied token -> silent -> device code).

        MSAL is consulted on every call instead of memoizing the first token,
        so an expired access token can be renewed from the cache.

        Raises:
            RuntimeError: If the device flow cannot be started or completed.
        """
        if self._static_token:
            return self._static_token

        # Silent acquisition from MSAL's cache (in-memory or file-backed).
        accounts = self._app.get_accounts()
        if accounts:
            result = self._app.acquire_token_silent(self.scopes, account=accounts[0])
            if result and "access_token" in result:
                self._access_token = result["access_token"]
                self._save_cache()  # persists a refreshed token, if any
                return self._access_token

        # Device Code Flow
        flow = self._app.initiate_device_flow(scopes=self.scopes)
        if "user_code" not in flow:
            raise RuntimeError("Device flow の初期化に失敗しました")
        print(flow["message"])  # instructs the user to authenticate in a browser
        result = self._app.acquire_token_by_device_flow(flow)
        if "access_token" not in result:
            raise RuntimeError(f"トークン取得失敗: {result.get('error_description')}")
        self._access_token = result["access_token"]
        self._save_cache()
        return self._access_token

    def _headers(self) -> Dict[str, str]:
        """Return the Authorization header for a Graph request."""
        token = self.ensure_token()
        return {"Authorization": f"Bearer {token}"}

    # -----------------------
    # Path utilities
    # -----------------------
    @staticmethod
    def _normalize_path(path: Optional[str]) -> str:
        """Return ``path`` without surrounding whitespace or slashes.

        ``None``, ``""``, ``"/"`` and ``"."`` all denote the drive root and
        normalize to the empty string.
        """
        if not path:
            return ""
        cleaned = str(path).strip()
        if cleaned in ("/", "."):
            return ""
        return cleaned.strip("/")

    def _item_by_path_url(self, path: str) -> str:
        """Return the Graph URL of the item at ``path`` (the root if empty).

        The path is percent-encoded so that characters such as ``#``, ``?``
        and ``%`` in names are not parsed as URL syntax.
        """
        from urllib.parse import quote  # local import: keeps this block self-contained

        norm = self._normalize_path(path)
        if not norm:
            return f"{self.GRAPH}/me/drive/root"
        return f"{self.GRAPH}/me/drive/root:/{quote(norm, safe='/')}"

    def _item_action_url(self, path: Optional[str], action: str) -> str:
        """Return the URL of ``action`` (e.g. ``children``) on the item at ``path``.

        Path-addressed items need the ``:/`` separator before the action,
        whereas the root item takes a plain ``/``.
        """
        base = self._item_by_path_url(path)
        separator = ":/" if self._normalize_path(path) else "/"
        return f"{base}{separator}{action}"

    @classmethod
    def _aligned_chunk_size(cls, chunk_size: int) -> int:
        """Round ``chunk_size`` down to a multiple of 320 KiB (at least one unit)."""
        unit = cls.UPLOAD_CHUNK_UNIT
        return max(unit, (chunk_size // unit) * unit)

    # -----------------------
    # Listing / existence / folders
    # -----------------------
    def get_items(self, prefix: Optional[str] = None) -> List[Dict[str, Any]]:
        """List the direct children of the root or of the folder ``prefix``.

        All result pages are followed via ``@odata.nextLink``.

        Returns:
            Dicts with the keys ``name``, ``id``, ``size``, ``folder`` (bool),
            ``lastModifiedDateTime``, ``content_type`` and ``webUrl``.
        """
        url = self._item_action_url(prefix, "children")
        out: List[Dict[str, Any]] = []
        while url:
            resp = self._session.get(url, headers=self._headers())
            resp.raise_for_status()
            page = resp.json()
            for it in page.get("value", []):
                out.append({
                    "name": it.get("name"),
                    "id": it.get("id"),
                    "size": it.get("size"),
                    "folder": "folder" in it,
                    "lastModifiedDateTime": it.get("lastModifiedDateTime"),
                    "content_type": (it.get("file") or {}).get("mimeType"),
                    "webUrl": it.get("webUrl"),
                })
            # Absent on the last page, which ends the loop.
            url = page.get("@odata.nextLink")
        return out

    def is_exists_item(self, path: str) -> bool:
        """Return True if the item exists, False on 404; raise on other HTTP errors."""
        # Metadata request (no /content suffix).
        resp = self._session.get(self._item_by_path_url(path), headers=self._headers())
        if resp.status_code == 200:
            return True
        if resp.status_code == 404:
            return False
        resp.raise_for_status()
        return False

    def create_folder(self, folder_path: str) -> Dict[str, Any]:
        """Create ``folder_path`` including missing intermediate folders.

        Returns:
            Metadata of the final folder, fetched separately when the folder
            already existed or a creation conflict (409) was skipped.
        """
        parts = [p for p in self._normalize_path(folder_path).split("/") if p]
        cur = ""
        meta = None
        for name in parts:
            parent = cur
            cur = f"{parent}/{name}" if parent else name
            if self.is_exists_item(cur):
                meta = None  # metadata of an earlier segment is not the answer
                continue
            payload = {"name": name, "folder": {}, "@microsoft.graph.conflictBehavior": "fail"}
            r = self._session.post(
                self._item_action_url(parent, "children"),
                headers=self._headers(),
                json=payload,
            )
            if r.status_code == 409:  # created concurrently: treat as existing
                meta = None
                continue
            r.raise_for_status()
            meta = r.json()
        if meta is None:
            r = self._session.get(self._item_by_path_url(folder_path), headers=self._headers())
            r.raise_for_status()
            meta = r.json()
        return meta

    # -----------------------
    # Upload / download / delete
    # -----------------------
    def _put_small(self, dest_path: str, fp: BinaryIO, content_type: Optional[str]) -> Dict[str, Any]:
        """Upload ``fp`` with a single PUT to the item's ``content`` endpoint."""
        url = self._item_action_url(dest_path, "content")
        headers = self._headers()
        if content_type:
            headers["Content-Type"] = content_type
        r = self._session.put(url, headers=headers, data=fp)
        r.raise_for_status()
        return r.json()

    def _put_large(self, dest_path: str, fp: BinaryIO, chunk_size: int = 8 * 1024 * 1024) -> Dict[str, Any]:
        """Upload ``fp`` in fragments through a Graph upload session.

        ``chunk_size`` is rounded down to a multiple of 320 KiB, which the
        upload-session API requires for every fragment but the last.
        The stream is read from its beginning.
        """
        # 1) Create the upload session.
        create_url = self._item_action_url(dest_path, "createUploadSession")
        r = self._session.post(
            create_url,
            headers=self._headers(),
            json={"item": {"@microsoft.graph.conflictBehavior": "replace"}},
        )
        r.raise_for_status()
        upload_url = r.json()["uploadUrl"]

        # 2) PUT the content fragment by fragment.
        chunk_size = self._aligned_chunk_size(chunk_size)
        fp.seek(0, io.SEEK_END)
        total = fp.tell()
        fp.seek(0)
        start = 0
        rr = None
        while start < total:
            end = min(start + chunk_size, total) - 1
            length = end - start + 1
            headers = {
                "Content-Length": str(length),
                "Content-Range": f"bytes {start}-{end}/{total}",
            }
            fp.seek(start)
            data = fp.read(length)
            rr = self._session.put(upload_url, headers=headers, data=data)
            if rr.status_code not in (200, 201, 202):
                rr.raise_for_status()
            start = end + 1

        # 3) The final PUT (200/201) carries the item metadata as JSON.
        if rr is not None and rr.headers.get("Content-Type", "").startswith("application/json"):
            return rr.json()
        # Otherwise fetch the metadata explicitly.
        meta = self.stat_item(dest_path)
        return meta or {"name": pathlib.Path(dest_path).name}

    def write_item(
        self,
        dest_path: str,
        data: Union[bytes, BinaryIO, str],
        content_type: Optional[str] = None,
        ensure_parent: bool = True,
    ) -> Dict[str, Any]:
        """Upload ``data`` to ``dest_path`` and return the item metadata.

        Args:
            dest_path: Path inside OneDrive such as ``folder/subfolder/file.txt``.
            data: Bytes, a seekable binary file-like object, or the path of an
                existing local file (``str`` or ``os.PathLike``).
            content_type: MIME type; guessed from ``dest_path`` when omitted.
            ensure_parent: Create the parent folder chain first when True.

        Raises:
            ValueError: If ``data`` is none of the supported kinds.
        """
        parent = self._normalize_path(dest_path).rpartition("/")[0]
        if ensure_parent and parent:
            self.create_folder(parent)

        if content_type is None:
            content_type = mimetypes.guess_type(dest_path)[0] or "application/octet-stream"

        # Normalize the input to a file-like object.
        must_close = False
        if isinstance(data, (bytes, bytearray)):
            fp = io.BytesIO(data)
        elif hasattr(data, "read"):
            fp = data  # type: ignore
        elif isinstance(data, (str, os.PathLike)) and os.path.exists(data):
            fp = open(data, "rb")
            must_close = True  # only close handles opened here
        else:
            raise ValueError("data must be bytes, file-like, or existing filepath")

        try:
            # Measure the stream, then restore the caller's position.
            pos = fp.tell()
            fp.seek(0, io.SEEK_END)
            size = fp.tell()
            fp.seek(pos)
            if size <= self.SMALL_UPLOAD_LIMIT:
                return self._put_small(dest_path, fp, content_type)
            return self._put_large(dest_path, fp)
        finally:
            if must_close:
                fp.close()

    def read_item(self, path: str, as_text: bool = False, encoding: str = "utf-8") -> Union[bytes, str]:
        """Download the item's content; decode with ``encoding`` when ``as_text``."""
        url = self._item_action_url(path, "content")
        r = self._session.get(url, headers=self._headers())
        r.raise_for_status()
        data = r.content
        return data.decode(encoding) if as_text else data

    def delete_item(self, path: str):
        """Delete the item at ``path``; raise on HTTP error statuses."""
        r = self._session.delete(self._item_by_path_url(path), headers=self._headers())
        if r.status_code not in (204, 200):
            r.raise_for_status()

    def stat_item(self, path: str) -> Optional[Dict[str, Any]]:
        """Return the item's metadata, or None when the server answers 404."""
        r = self._session.get(self._item_by_path_url(path), headers=self._headers())
        if r.status_code == 404:
            return None
        r.raise_for_status()
        return r.json()

    # -----------------------
    # Sharing links (rough counterpart of a signed URL)
    # -----------------------
    def generate_share_link(
        self,
        path: str,
        link_type: str = "view",        # "view" | "edit" | "embed"
        scope: str = "anonymous",       # "anonymous" | "organization"
        password: Optional[str] = None  # optional
    ) -> str:
        """Create a sharing link through Graph ``createLink`` and return its URL.

        Unlike a GCS signed URL, the link lifetime depends on tenant policy
        and may not be settable through this call.

        Returns:
            ``link.webUrl`` (or ``link.url``) from the response, else ``""``.
        """
        url = self._item_action_url(path, "createLink")
        body: Dict[str, Any] = {"type": link_type, "scope": scope}
        if password:
            body["password"] = password  # password protection, subject to policy
        r = self._session.post(url, headers=self._headers(), json=body)
        r.raise_for_status()
        link = r.json().get("link", {})
        return link.get("webUrl") or link.get("url") or ""