Prepare all of the processing steps

ry.yamafuji 2025-09-11 21:26:01 +09:00
parent 3ab068ea9f
commit d7cfc32dcf
3 changed files with 277 additions and 81 deletions

View File

@@ -115,6 +115,10 @@ class AppStatus(Singleton):
        transcript_dir = f"{self.output_base_dir}/{self.request_id}/transcripts"
        return transcript_dir

    @property
    def transcript_file(self) -> str:
        """Path to the transcription result file"""
        return f"{self.transcript_dir}/all.transcripts.json"

    @property
    def source_file(self) -> str:
@@ -136,3 +140,7 @@ class AppStatus(Singleton):
        """Path to the speaker diarization JSON file"""
        return f"{self.output_dir}/diarization.json"

    @property
    def merged_transcript_file(self) -> str:
        """Path to the merged transcript file"""
        return f"{self.output_dir}/merged_transcript.json"

View File

@@ -0,0 +1,216 @@
# jobs/job_merge_diarization_transcripts.py
import os
import json
from dataclasses import dataclass
from typing import List, Dict, Any, Optional

from jobs.job_base import JobBase


def _is_ascii_token(t: str) -> bool:
    return all(ord(c) < 128 for c in t) and t.strip() != ""


def _concat_token(prev: str, token: str) -> str:
    # Non-ASCII text such as Japanese is joined without spaces; ASCII (alphanumeric) tokens get a separating space
    if not prev:
        return token
    if _is_ascii_token(token) and _is_ascii_token(prev[-1]):
        return prev + " " + token
    return prev + token
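
# Illustrative only (not part of this commit): expected behaviour of _concat_token
# under the joining rule above, assuming word tokens arrive in reading order.
#   _concat_token("こんにちは", "世界")  -> "こんにちは世界"   (non-ASCII on both sides: no space)
#   _concat_token("hello", "world")       -> "hello world"        (ASCII on both sides: space)
#   _concat_token("答えは", "42")         -> "答えは42"           (previous char is non-ASCII: no space)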

@dataclass
class MergeConfig:
    # Join consecutive segments from the same speaker if the gap between them is at most this many seconds
    same_speaker_join_gap: float = 0.30
    # Which point of a word decides its segment membership: "mid" | "start" | "end"
    word_anchor: str = "mid"

class JobMergeDiarizationAndTranscripts(JobBase):
    """
    Job that cross-references the diarization result with the transcription result
    and produces a speaker-attributed transcript.
    """

    def __init__(
        self,
        transcripts_json_path: Optional[str] = None,
        diar_json_path: Optional[str] = None,
        out_json_path: Optional[str] = None,
        config: Optional[MergeConfig] = None
    ):
        super().__init__(name=self.__class__.__name__)
        self.description = "Merge diarization segments with STT words"
        self.transcripts_json_path = transcripts_json_path
        self.diar_json_path = diar_json_path
        self.out_json_path = out_json_path
        self.config = config or MergeConfig()

    # -------- core logic --------
    def _word_time_anchor(self, w: Dict[str, Any]) -> float:
        if self.config.word_anchor == "start":
            return float(w.get("start", 0.0))
        if self.config.word_anchor == "end":
            return float(w.get("end", 0.0))
        s = float(w.get("start", 0.0))
        e = float(w.get("end", s))
        return 0.5 * (s + e)
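
    # Illustrative only: with the default word_anchor="mid", a word spanning
    # start=12.40s .. end=12.80s is anchored at 0.5 * (12.40 + 12.80) = 12.60s,
    # and that single point decides which diarization segment it falls into.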

    def _assign_words_to_segments(
        self,
        words: List[Dict[str, Any]],
        segs: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        # Attach a word list to each segment
        assigned = []
        for seg in segs:
            seg["words"] = []
            assigned.append(seg)

        # Sort by time up front
        words_sorted = sorted(words, key=lambda w: self._word_time_anchor(w))
        segs_sorted = sorted(assigned, key=lambda s: (s["start"], s["end"]))

        si = 0
        nseg = len(segs_sorted)
        for w in words_sorted:
            t = self._word_time_anchor(w)
            # Advance from the current segment position to find where the word belongs
            while si < nseg and t > segs_sorted[si]["end"]:
                si += 1
            # The word may also belong to the previous segment, so check one segment back as well
            candidates = []
            if si < nseg:
                candidates.append(segs_sorted[si])
            if si - 1 >= 0:
                candidates.append(segs_sorted[si - 1])
            chosen: Optional[Dict[str, Any]] = None
            for seg in candidates:
                if seg["start"] <= t <= seg["end"]:
                    chosen = seg
                    break
            if chosen is not None:
                seg_words: List[Dict[str, Any]] = chosen["words"]
                seg_words.append(w)
            # Words that fit nowhere are skipped (noise outside VAD / false detections)

        # Build each segment's text from its words (no spaces for Japanese, spaces for alphanumerics)
        for seg in segs_sorted:
            text = ""
            for w in seg["words"]:
                token = str(w.get("word", "")).strip()
                if not token:
                    continue
                text = _concat_token(text, token)
            seg["text"] = text
        return segs_sorted
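
    # Illustrative only: the dict shapes this merge logic assumes, taken from how the
    # fields are accessed above (any extra keys are simply carried along):
    #   word    = {"word": "こんにちは", "start": 3.12, "end": 3.58}
    #   segment = {"speaker": "SPEAKER_00", "start": 2.90, "end": 6.40}
    # After assignment each segment additionally carries "words" (the matched word dicts)
    # and "text" (the joined string).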

    def _join_adjacent_same_speaker(
        self, segs: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        if not segs:
            return []
        gap = float(self.config.same_speaker_join_gap)
        out = []
        cur = dict(segs[0])
        cur["words"] = list(cur.get("words", []))  # copy
        for s in segs[1:]:
            if (
                s["speaker"] == cur["speaker"] and
                s["start"] - cur["end"] <= gap
            ):
                # Merge
                cur["end"] = max(cur["end"], s["end"])
                cur["words"].extend(s.get("words", []))
                # Rebuild the text
                text = ""
                cur_words: List[Dict[str, Any]] = cur.get("words", [])
                for w in cur_words:
                    token = str(w.get("word", "")).strip()
                    if token:
                        text = _concat_token(text, token)
                cur["text"] = text
            else:
                out.append(cur)
                cur = dict(s)
                cur["words"] = list(s.get("words", []))
        out.append(cur)
        return out

    def _build_turns(self, segs_joined: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Build speaker turns (contiguous blocks of speech by the same speaker).
        The joined segments are already treated as turn units here, so they are returned unchanged in the same structure."""
        # If needed, this could further paragraphize on longer silences or a line-break policy
        return segs_joined

    # -------- I/O wrappers --------
    def _load_json(self, path: str) -> Dict[str, Any]:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _save_json(self, path: str, data: Dict[str, Any]) -> None:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    # -------- public: pipeline entry --------
    def execute(self):
        self.logger.info(f"{self.name} execute started")

        # Resolve input paths (paths on status are used when no explicit path is given)
        transcripts_path = self.transcripts_json_path or self.status.transcript_file
        diar_path = self.diar_json_path or self.status.diarization_file
        if not transcripts_path or not os.path.exists(transcripts_path):
            raise FileNotFoundError(f"transcripts_json not found: {transcripts_path}")
        if not diar_path or not os.path.exists(diar_path):
            raise FileNotFoundError(f"diar_json not found: {diar_path}")

        tr = self._load_json(transcripts_path)
        dj = self._load_json(diar_path)

        # Assumes a single chunk; extend with a loop to handle multiple chunks
        t0: Dict[str, Any] = tr["transcripts"][0]
        chunk_id = t0.get("chunk_id")
        abs_start = float(t0.get("abs_start", 0.0))
        abs_end = float(t0.get("abs_end", 0.0))
        words = t0.get("words", [])
        segs = dj.get("segments", [])

        # Normalize the segments, just in case
        segs_norm = []
        for s in segs:
            segs_norm.append({
                "speaker": s["speaker"],
                # Diarization start/end are relative to the original audio; adjust here if abs_start is non-zero
                "start": float(s["start"]),
                "end": float(s["end"]),
            })

        # 1) Assign words to each speaker segment and generate the text
        segs_assigned = self._assign_words_to_segments(words, segs_norm)
        # 2) Join nearby segments from the same speaker (turn building)
        segs_joined = self._join_adjacent_same_speaker(segs_assigned)

        # 3) Assemble the output
        out = {
            "chunk_id": chunk_id,
            "abs_start": abs_start,
            "abs_end": abs_end,
            "sample_rate": dj.get("sample_rate"),
            "speakers": dj.get("speakers"),
            "num_speakers": dj.get("num_speakers"),
            "uri": dj.get("uri"),
            "vad": dj.get("vad"),
            "rttm": dj.get("rttm"),
            "segments_raw": segs_assigned,  # before joining (the original fine-grained segments)
            "turns": self._build_turns(segs_joined)  # after joining (easier-to-read units)
        }

        # Output path
        out_path = self.out_json_path or getattr(self.status, "merged_transcript_file", None)
        if not out_path:
            # Default destination (saved next to the transcripts JSON)
            base_dir = os.path.dirname(transcripts_path)
            out_path = os.path.join(base_dir, "merged_transcript_turns.json")

        self._save_json(out_path, out)
        self.logger.info(f"Merged diarization+transcripts -> {out_path}")
        return out
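
For reference, a minimal sketch of how this merge job might be run on its own, outside the pipeline. The paths and MergeConfig values below are hypothetical examples, and it assumes JobBase wires up the `status` and `logger` attributes used above:

    # Hypothetical standalone invocation (not part of this commit)
    from jobs.job_merge_diarization_transcripts import JobMergeDiarizationAndTranscripts, MergeConfig

    job = JobMergeDiarizationAndTranscripts(
        transcripts_json_path="output/req-001/transcripts/all.transcripts.json",  # hypothetical path
        diar_json_path="output/req-001/diarization.json",                         # hypothetical path
        out_json_path="output/req-001/merged_transcript.json",                    # hypothetical path
        config=MergeConfig(same_speaker_join_gap=0.5, word_anchor="mid"),
    )
    merged = job.execute()
    print(f"{len(merged['turns'])} speaker turns written to {job.out_json_path}")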

View File

@@ -1,39 +1,36 @@
import os
import json
import time
-import requests
-from typing import Any, Dict, List, Optional
+from typing import Dict, Any, Iterator, Optional, List
from pathlib import Path
+import modal
from jobs.job_base import JobBase


-class JobTranscribeChunkOpenAI(JobBase):
+class JobTranscribeChunkModal(JobBase):
    """
-    Job that transcribes the chunks one by one with OpenAI
-    - Input: chunks.manifest.json (created by JobChunkFiles)
-    - Output: per-chunk *.transcript.json files plus the merged all.transcripts.json
-    - Timestamps are shifted by abs_start to whole-recording time
+    Job that transcribes the chunks with faster-whisper on Modal
    """

    def __init__(self):
        super().__init__(name=self.__class__.__name__)
-        self.description = "Transcribe Chunks via OpenAI"
+        self.description = "Transcribe Chunks via Modal (faster-whisper)"

-        # Settings (can be overridden via environment variables)
-        # self.model = os.getenv("ASR_MODEL", "gpt-4o-transcribe")
-        self.model = os.getenv("ASR_MODEL", "whisper-1")
-        self.lang = os.getenv("ASR_LANG", "ja")
-        self.response_format = os.getenv("ASR_RESP_FMT", "json")
-        self.word_ts = os.getenv("ASR_WORD_TS", "1") in ("1", "true", "True")
-        self.req_timeout = int(os.getenv("ASR_TIMEOUT_SEC", "600"))
+        # Modal function to call (can be overridden via environment variables)
+        self.modal_app = os.getenv("MODAL_ASR_APP", "whisper-transcribe-fw")
+        self.modal_func = os.getenv("MODAL_ASR_FUNC", "transcribe_audio")
+
+        # ASR settings
+        self.model_name = os.getenv("ASR_MODEL", None)  # if unset, use the default on the Modal side
+        self.lang = os.getenv("ASR_LANG", "ja")
+
+        # Execution policy
+        self.req_timeout = int(os.getenv("ASR_TIMEOUT_SEC", "600"))
        self.sleep_between = float(os.getenv("ASR_SLEEP_SEC", "0.3"))
        self.max_retries = int(os.getenv("ASR_MAX_RETRIES", "2"))
        self.retry_backoff = float(os.getenv("ASR_RETRY_BACKOFF", "1.5"))  # multiplicative

-        # OpenAI
-        self.api_key = os.getenv("OPENAI_API_KEY")
-        self.api_url = os.getenv("OPENAI_TRANSCRIBE_URL", "https://api.openai.com/v1/audio/transcriptions")

+    # ---------- paths ----------
    def _manifest_path(self) -> Path:
        return Path(self.status.chunk_manifest)
@@ -41,74 +38,54 @@ class JobTranscribeChunkOpenAI(JobBase):
    def _out_dir(self) -> Path:
        return Path(self.status.transcript_dir)

-    def _transcribe_file(self, wav_path: str) -> Dict[str, Any]:
-        if not self.api_key:
-            raise RuntimeError("OPENAI_API_KEY is not set. Please set it as an environment variable.")
-        if "transcribe" in self.model and self.response_format == "json":
-            response_format = "verbose_json"
-        elif "whisper" in self.model and self.response_format == "json":
-            response_format = "verbose_json"
-        else:
-            response_format = self.response_format
-        # self.model = os.getenv("ASR_MODEL", "gpt-4o-transcribe")
-        self.model = os.getenv("ASR_MODEL", "whisper-1")
-        headers = {"Authorization": f"Bearer {self.api_key}"}
-        data = {
-            "model": self.model,
-            "response_format": response_format,
-            "language": self.lang,
-        }
-        # Ask for word-level timestamps if requested
-        if self.word_ts:
-            # OpenAI's form expects an array-style key name
-            data["timestamp_granularities[]"] = "word"
+    # ---------- Modal call ----------
+    def _transcribe_with_modal(self, wav_path: str) -> Dict[str, Any]:
+        """Call the Modal Function directly (passing bytes)"""
+        fn = modal.Function.from_name(self.modal_app, self.modal_func)
+        data = Path(wav_path).read_bytes()
+        kwargs = dict(filename=Path(wav_path).name, language=self.lang)
+        if self.model_name:
+            kwargs["model_name"] = self.model_name

        # Light retry
        wait = 1.0
        last_err: Optional[Exception] = None
        for attempt in range(self.max_retries + 1):
            try:
-                with open(wav_path, "rb") as f:
-                    files = {"file": (Path(wav_path).name, f, "audio/wav")}
-                    r = requests.post(self.api_url, headers=headers, data=data, files=files, timeout=self.req_timeout)
-                if r.status_code == 429 or 500 <= r.status_code < 600:
-                    raise requests.HTTPError(f"HTTP {r.status_code}: {r.text}")
-                r.raise_for_status()
-                return r.json()
+                # NOTE: Modal's .remote call returns synchronously
+                res = fn.remote(data, **kwargs)
+                # The return value is expected to be {"text", "segments", "words", ...}
+                return res
            except Exception as e:
                last_err = e
                if attempt < self.max_retries:
-                    self.logger.warning(f"ASR retry ({attempt+1}/{self.max_retries}) for {wav_path}: {e}")
+                    self.logger.warning(f"Modal retry ({attempt+1}/{self.max_retries}) for {wav_path}: {e}")
                    time.sleep(wait)
                    wait *= self.retry_backoff
                    continue
                break
-        # Retries exhausted
-        raise last_err if last_err else RuntimeError("Unknown ASR error")
+        raise last_err if last_err else RuntimeError("Unknown Modal ASR error")

+    # ---------- main execution ----------
    def execute(self):
        self.logger.info(f"{self.name} execute started")
-        if os.path.exists(self.status.transcript_dir):
-            # Already transcribed
-            self.logger.info(f"Transcription already done: {self.status.transcript_dir}")
-            return

        manifest_path = self._manifest_path()
        if not manifest_path.exists():
            raise FileNotFoundError(f"chunks manifest not found: {manifest_path}")

        out_dir = self._out_dir()
+        if out_dir.exists():
+            # Already transcribed
+            self.logger.info(f"Transcription already done: {out_dir}")
+            return
        out_dir.mkdir(parents=True, exist_ok=True)

        meta = json.loads(manifest_path.read_text(encoding="utf-8"))
        chunks: List[Dict[str, Any]] = meta.get("chunks") or []
        if not chunks:
            self.logger.warning("No chunks found in manifest. Skipping.")
            return
@@ -119,41 +96,36 @@ class JobTranscribeChunkOpenAI(JobBase):
            wav = ch["path"]
            per_chunk_out = out_dir / f"{Path(wav).stem}.transcript.json"

-            # Resume support: skip if already saved
+            # Resume: if already saved, skip the call and read the file back
            if per_chunk_out.exists():
                res: dict = json.loads(per_chunk_out.read_text(encoding="utf-8"))
            else:
-                self.logger.info(f"ASR: {wav}")
-                res: dict = self._transcribe_file(wav)
+                self.logger.info(f"ASR(modal): {wav}")
+                res: dict = self._transcribe_with_modal(wav)
                per_chunk_out.write_text(json.dumps(res, ensure_ascii=False, indent=2), encoding="utf-8")
                time.sleep(self.sleep_between)

-            # Shift chunk-local times to whole-recording times
-            offset = float(ch["abs_start"])
+            # Shift to whole-recording times by abs_start
+            offset = float(ch.get("abs_start", 0.0))
            words = res.get("words") or []
            for w in words:
-                if "start" in w:
-                    w["start"] = float(w["start"]) + offset
-                if "end" in w:
-                    w["end"] = float(w["end"]) + offset
+                if "start" in w: w["start"] = float(w["start"]) + offset
+                if "end" in w: w["end"] = float(w["end"]) + offset

            segments = res.get("segments") or []
            for s in segments:
-                if "start" in s:
-                    s["start"] = float(s["start"]) + offset
-                if "end" in s:
-                    s["end"] = float(s["end"]) + offset
+                if "start" in s: s["start"] = float(s["start"]) + offset
+                if "end" in s: s["end"] = float(s["end"]) + offset

            results.append({
                "chunk_id": ch["chunk_id"],
-                "abs_start": ch["abs_start"],
-                "abs_end": ch["abs_end"],
+                "abs_start": ch.get("abs_start", 0.0),
+                "abs_end": ch.get("abs_end", 0.0),
                "text": res.get("text", ""),
                "segments": segments,
                "words": words,
            })

-        # Save the merged output
+        # Save the merged output (same shape as the OpenAI version)
        all_out = out_dir / "all.transcripts.json"
        all_out.write_text(json.dumps({"transcripts": results}, ensure_ascii=False, indent=2), encoding="utf-8")
        self.logger.info(f"Transcription merged: {all_out}")