From d7cfc32dcfd35398ab4565bc79852399b1d2013a Mon Sep 17 00:00:00 2001
From: "ry.yamafuji"
Date: Thu, 11 Sep 2025 21:26:01 +0900
Subject: [PATCH] Prepare all processing steps
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/app_status.py                             |  10 +-
 src/jobs/job_merge_diarization_transcripts.py | 216 ++++++++++++++++++
 src/jobs/job_transcribe_chk_modal.py          | 132 +++++------
 3 files changed, 277 insertions(+), 81 deletions(-)
 create mode 100644 src/jobs/job_merge_diarization_transcripts.py

diff --git a/src/app_status.py b/src/app_status.py
index 882098c..40ca7f4 100644
--- a/src/app_status.py
+++ b/src/app_status.py
@@ -115,6 +115,10 @@ class AppStatus(Singleton):
         transcript_dir = f"{self.output_base_dir}/{self.request_id}/transcripts"
         return transcript_dir
 
+    @property
+    def transcript_file(self) -> str:
+        """Path to the combined transcription result file"""
+        return f"{self.transcript_dir}/all.transcripts.json"
 
     @property
     def source_file(self)-> str:
@@ -135,4 +139,8 @@ class AppStatus(Singleton):
     def diarization_file(self)-> str:
         """Path to the speaker-diarization JSON file"""
         return f"{self.output_dir}/diarization.json"
-
+
+    @property
+    def merged_transcript_file(self) -> str:
+        """Path to the merged, speaker-attributed transcript file"""
+        return f"{self.output_dir}/merged_transcript.json"
diff --git a/src/jobs/job_merge_diarization_transcripts.py b/src/jobs/job_merge_diarization_transcripts.py
new file mode 100644
index 0000000..f16dc92
--- /dev/null
+++ b/src/jobs/job_merge_diarization_transcripts.py
@@ -0,0 +1,216 @@
+# jobs/job_merge_diarization_transcripts.py
+import os
+import json
+from dataclasses import dataclass
+from typing import List, Dict, Any, Optional
+from jobs.job_base import JobBase
+
+
+def _is_ascii_token(t: str) -> bool:
+    return all(ord(c) < 128 for c in t) and t.strip() != ""
+
+
+def _concat_token(prev: str, token: str) -> str:
+    # Non-ASCII text such as Japanese is joined without spaces; ASCII (alphanumeric) tokens get a separating space
+    if not prev:
+        return token
+    if _is_ascii_token(token) and _is_ascii_token(prev[-1]):
+        return prev + " " + token
+    return prev + token
+
+
+@dataclass
+class MergeConfig:
+    # Join segments of the same speaker when the gap between them is at most this many seconds
+    same_speaker_join_gap: float = 0.30
+    # Which point of a word decides which segment it belongs to: "mid" | "start" | "end"
+    word_anchor: str = "mid"
+
+
+class JobMergeDiarizationAndTranscripts(JobBase):
+    """
+    Job that matches the diarization result against the transcription result and produces a speaker-attributed transcript
+    """
+    def __init__(
+        self,
+        transcripts_json_path: Optional[str] = None,
+        diar_json_path: Optional[str] = None,
+        out_json_path: Optional[str] = None,
+        config: Optional[MergeConfig] = None
+    ):
+        super().__init__(name=self.__class__.__name__)
+        self.description = "Merge diarization segments with STT words"
+        self.transcripts_json_path = transcripts_json_path
+        self.diar_json_path = diar_json_path
+        self.out_json_path = out_json_path
+        self.config = config or MergeConfig()
+
+    # -------- core logic --------
+    def _word_time_anchor(self, w: Dict[str, Any]) -> float:
+        if self.config.word_anchor == "start":
+            return float(w.get("start", 0.0))
+        if self.config.word_anchor == "end":
+            return float(w.get("end", 0.0))
+        s = float(w.get("start", 0.0))
+        e = float(w.get("end", s))
+        return 0.5 * (s + e)
+
+    def _assign_words_to_segments(
+        self,
+        words: List[Dict[str, Any]],
+        segs: List[Dict[str, Any]]
+    ) -> List[Dict[str, Any]]:
+        # Give every segment an empty word list to fill
+        assigned = []
+        for seg in segs:
+            seg["words"] = []
+            assigned.append(seg)
+
+        # Sort both sides by time up front
+        words_sorted = sorted(words, key=lambda w: self._word_time_anchor(w))
+        segs_sorted = sorted(assigned, key=lambda s: (s["start"], s["end"]))
+
+        si = 0
+        nseg = len(segs_sorted)
+        for w in words_sorted:
+            t = self._word_time_anchor(w)
+            # Advance from the current segment index to find where this word belongs
+            while si < nseg and t > segs_sorted[si]["end"]:
+                si += 1
+            # The word may also fall into the previous segment, so check that one as well
+            candidates = []
+            if si < nseg:
+                candidates.append(segs_sorted[si])
+            if si - 1 >= 0:
+                candidates.append(segs_sorted[si - 1])
+
+            chosen = None
+            for seg in candidates:
+                if seg["start"] <= t <= seg["end"]:
+                    chosen = seg
+                    break
+            if chosen is not None:
+                seg_words: List[Dict[str, Any]] = chosen["words"]
+                seg_words.append(w)
+            # Words that fit nowhere are skipped (noise outside VAD / false detections)
+
+        # Build each segment's text from its words (no spaces for Japanese, spaces between ASCII tokens)
+        for seg in segs_sorted:
+            text = ""
+            for w in seg["words"]:
+                token = str(w.get("word", "")).strip()
+                if not token:
+                    continue
+                text = _concat_token(text, token)
+            seg["text"] = text
+
+        return segs_sorted
+
+    def _join_adjacent_same_speaker(
+        self, segs: List[Dict[str, Any]]
+    ) -> List[Dict[str, Any]]:
+        if not segs:
+            return []
+        gap = float(self.config.same_speaker_join_gap)
+        out = []
+        cur = dict(segs[0])
+        cur["words"] = list(cur.get("words", []))  # copy
+
+        for s in segs[1:]:
+            if (
+                s["speaker"] == cur["speaker"] and
+                s["start"] - cur["end"] <= gap
+            ):
+                # Merge into the current segment
+                cur["end"] = max(cur["end"], s["end"])
+                cur["words"].extend(s.get("words", []))
+                # Rebuild the text
+                text = ""
+                words: List[Dict[str, Any]] = cur.get("words", [])
+                for w in words:
+                    token = str(w.get("word", "")).strip()
+                    if token:
+                        text = _concat_token(text, token)
+                cur["text"] = text
+            else:
+                out.append(cur)
+                cur = dict(s)
+                cur["words"] = list(s.get("words", []))
+        out.append(cur)
+        return out
+
+    def _build_turns(self, segs_joined: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """Build speaker turns (consecutive blocks by one speaker). The joined segments are already treated as turn units, so they are returned with the same structure."""
+        # If needed, these could further be split into paragraphs on long silences or by a line-break policy
+        return segs_joined
+
+    # -------- I/O wrappers --------
+    def _load_json(self, path: str) -> Dict[str, Any]:
+        with open(path, "r", encoding="utf-8") as f:
+            return json.load(f)
+
+    def _save_json(self, path: str, data: Dict[str, Any]) -> None:
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+        with open(path, "w", encoding="utf-8") as f:
+            json.dump(data, f, ensure_ascii=False, indent=2)
+
+    # -------- public: pipeline entry --------
+    def execute(self):
+        self.logger.info(f"{self.name} execute started")
+
+        # Resolve input paths (explicit constructor args win, otherwise fall back to the status object)
+        transcripts_path = self.transcripts_json_path or self.status.transcript_file
+        diar_path = self.diar_json_path or self.status.diarization_file
+        if not transcripts_path or not os.path.exists(transcripts_path):
+            raise FileNotFoundError(f"transcripts_json not found: {transcripts_path}")
+        if not diar_path or not os.path.exists(diar_path):
+            raise FileNotFoundError(f"diar_json not found: {diar_path}")
+
+        tr = self._load_json(transcripts_path)
+        dj = self._load_json(diar_path)
+
+        # Assumes a single chunk (extend this into a loop to handle multiple chunks)
+        t0: Dict[str, Any] = tr["transcripts"][0]
+        chunk_id = t0.get("chunk_id")
+        abs_start = float(t0.get("abs_start", 0.0))
+        abs_end = float(t0.get("abs_end", 0.0))
+        words = t0.get("words", [])
+        segs = dj.get("segments", [])
+
+        # Normalize, just in case
+        segs_norm = []
+        for s in segs:
+            segs_norm.append({
+                "speaker": s["speaker"],
+                # diarization start/end are relative to the original audio; adjust here if abs_start is non-zero
+                "start": float(s["start"]),
+                "end": float(s["end"]),
+            })
+
+        # 1) Assign words to each speaker segment and build its text
+        segs_assigned = self._assign_words_to_segments(words, segs_norm)
+
+        # 2) Join nearby segments of the same speaker (turn building)
+        segs_joined = self._join_adjacent_same_speaker(segs_assigned)
+
+        # 3) Assemble the output
+        out = {
+            "chunk_id": chunk_id,
+            "abs_start": abs_start,
+            "abs_end": abs_end,
+            "sample_rate": dj.get("sample_rate"),
+            "speakers": dj.get("speakers"),
+            "num_speakers": dj.get("num_speakers"),
+            "uri": dj.get("uri"),
+            "vad": dj.get("vad"),
+            "rttm": dj.get("rttm"),
+            "segments_raw": segs_assigned,            # before joining (the original fine-grained segments)
+            "turns": self._build_turns(segs_joined)   # after joining (easier-to-read units)
+        }
+
+        # Output path
+        out_path = self.out_json_path or getattr(self.status, "merged_transcript_file", None)
+        if not out_path:
+            # Default destination (saved next to the transcripts JSON)
+            base_dir = os.path.dirname(transcripts_path)
+            out_path = os.path.join(base_dir, "merged_transcript_turns.json")
+
+        self._save_json(out_path, out)
+        self.logger.info(f"Merged diarization+transcripts -> {out_path}")
+        return out
diff --git a/src/jobs/job_transcribe_chk_modal.py b/src/jobs/job_transcribe_chk_modal.py
index 8a69590..694ec6b 100644
--- a/src/jobs/job_transcribe_chk_modal.py
+++ b/src/jobs/job_transcribe_chk_modal.py
@@ -1,39 +1,36 @@
 import os
 import json
 import time
-import requests
-from typing import Any, Dict, List, Optional
+from typing import Dict, Any, Iterator, Optional, List
 from pathlib import Path
+import modal
 from jobs.job_base import JobBase
 
-class JobTranscribeChunkOpenAI(JobBase):
+class JobTranscribeChunkModal(JobBase):
     """
-    Job that transcribes the chunks one by one with OpenAI
-
-    - Input: chunks.manifest.json (created by JobChunkFiles)
-    - Output: one *.transcript.json per chunk plus a combined all.transcripts.json
-    - Timestamps are shifted to absolute time using abs_start
+    Job that transcribes the chunks with faster-whisper running on Modal
     """
     def __init__(self):
         super().__init__(name=self.__class__.__name__)
-        self.description = "Transcribe Chunks via OpenAI"
+        self.description = "Transcribe Chunks via Modal (faster-whisper)"
 
-        # Settings (can be overridden with environment variables)
-        # self.model = os.getenv("ASR_MODEL", "gpt-4o-transcribe")
-        self.model = os.getenv("ASR_MODEL", "whisper-1")
-        self.lang = os.getenv("ASR_LANG", "ja")
-        self.response_format = os.getenv("ASR_RESP_FMT", "json")
-        self.word_ts = os.getenv("ASR_WORD_TS", "1") in ("1", "true", "True")
-        self.req_timeout = int(os.getenv("ASR_TIMEOUT_SEC", "600"))
+        # Modal function to call (can be overridden with environment variables)
+        self.modal_app = os.getenv("MODAL_ASR_APP", "whisper-transcribe-fw")
+        self.modal_func = os.getenv("MODAL_ASR_FUNC", "transcribe_audio")
+
+        # ASR settings
+        self.model_name = os.getenv("ASR_MODEL", None)  # if empty, use the Modal-side default
+        self.lang = os.getenv("ASR_LANG", "ja")
+
+        # Execution policy
+        self.req_timeout = int(os.getenv("ASR_TIMEOUT_SEC", "600"))
         self.sleep_between = float(os.getenv("ASR_SLEEP_SEC", "0.3"))
-        self.max_retries = int(os.getenv("ASR_MAX_RETRIES", "2"))
+        self.max_retries = int(os.getenv("ASR_MAX_RETRIES", "2"))
         self.retry_backoff = float(os.getenv("ASR_RETRY_BACKOFF", "1.5"))  # multiplicative factor
 
-        # OpenAI
-        self.api_key = os.getenv("OPENAI_API_KEY")
-        self.api_url = os.getenv("OPENAI_TRANSCRIBE_URL", "https://api.openai.com/v1/audio/transcriptions")
+
+    # ---------- Paths ----------
     def _manifest_path(self) -> Path:
         return Path(self.status.chunk_manifest)
 
@@ -41,74 +38,54 @@ class JobTranscribeChunkOpenAI(JobBase):
     def _out_dir(self) -> Path:
         return Path(self.status.transcript_dir)
 
+    # ---------- Modal call ----------
-    def _transcribe_file(self, wav_path: str) -> Dict[str, Any]:
-        if not self.api_key:
-            raise RuntimeError("OPENAI_API_KEY is not set. Set it as an environment variable.")
+    def _transcribe_with_modal(self, wav_path: str) -> Dict[str, Any]:
+        """Call the Modal Function directly (passing raw bytes)"""
+        fn = modal.Function.from_name(self.modal_app, self.modal_func)
 
-        if "transcribe" in self.model and self.response_format == "json":
-            response_format = "verbose_json"
"whisper" in self.model and self.response_format == "json": - response_format = "verbose_json" - else: - response_format = self.response_format - - # self.model = os.getenv("ASR_MODEL", "gpt-4o-transcribe") - self.model = os.getenv("ASR_MODEL", "whisper-1") - - headers = {"Authorization": f"Bearer {self.api_key}"} - data = { - "model": self.model, - "response_format": response_format, - "language": self.lang, - } - # 単語タイムスタンプが欲しければ指定 - if self.word_ts: - # OpenAIのフォームは配列っぽいキー名を要求 - data["timestamp_granularities[]"] = "word" + data = Path(wav_path).read_bytes() + kwargs = dict(filename=Path(wav_path).name, language=self.lang) + if self.model_name: + kwargs["model_name"] = self.model_name # 軽いリトライ wait = 1.0 last_err: Optional[Exception] = None for attempt in range(self.max_retries + 1): try: - with open(wav_path, "rb") as f: - files = {"file": (Path(wav_path).name, f, "audio/wav")} - r = requests.post(self.api_url, headers=headers, data=data, files=files, timeout=self.req_timeout) - if r.status_code == 429 or 500 <= r.status_code < 600: - raise requests.HTTPError(f"HTTP {r.status_code}: {r.text}") - r.raise_for_status() - return r.json() + # NOTE: Modal の .remote は同期戻り + res = fn.remote(data, **kwargs) + # 返り値は {"text","segments","words",...} を想定 + return res except Exception as e: last_err = e if attempt < self.max_retries: - self.logger.warning(f"ASR retry ({attempt+1}/{self.max_retries}) for {wav_path}: {e}") + self.logger.warning(f"Modal retry ({attempt+1}/{self.max_retries}) for {wav_path}: {e}") time.sleep(wait) wait *= self.retry_backoff continue break - # リトライ尽きた - raise last_err if last_err else RuntimeError("Unknown ASR error") + raise last_err if last_err else RuntimeError("Unknown Modal ASR error") + # ---------- 実行本体 ---------- def execute(self): self.logger.info(f"{self.name} execute started") - if os.path.exists(self.status.transcript_dir): - # すでに変換済み - self.logger.info(f"Transcription already done: {self.status.transcript_dir}") - return - manifest_path = self._manifest_path() if not manifest_path.exists(): raise FileNotFoundError(f"chunks manifest not found: {manifest_path}") out_dir = self._out_dir() + if out_dir.exists(): + # すでに変換済み + self.logger.info(f"Transcription already done: {out_dir}") + return out_dir.mkdir(parents=True, exist_ok=True) meta = json.loads(manifest_path.read_text(encoding="utf-8")) chunks: List[Dict[str, Any]] = meta.get("chunks") or [] - if not chunks: self.logger.warning("No chunks found in manifest. 
Skipping.") return @@ -119,41 +96,36 @@ class JobTranscribeChunkOpenAI(JobBase): wav = ch["path"] per_chunk_out = out_dir / f"{Path(wav).stem}.transcript.json" - # レジューム対応:既に保存済みならスキップ + # レジューム:保存済みならスキップして読み出し if per_chunk_out.exists(): res: dict = json.loads(per_chunk_out.read_text(encoding="utf-8")) else: - self.logger.info(f"ASR: {wav}") - res: dict = self._transcribe_file(wav) + self.logger.info(f"ASR(modal): {wav}") + res: dict = self._transcribe_with_modal(wav) per_chunk_out.write_text(json.dumps(res, ensure_ascii=False, indent=2), encoding="utf-8") time.sleep(self.sleep_between) - # チャンク内→全体時刻に補正 - offset = float(ch["abs_start"]) + # abs_start 分だけ全体時刻に補正 + offset = float(ch.get("abs_start", 0.0)) words = res.get("words") or [] for w in words: - if "start" in w: - w["start"] = float(w["start"]) + offset - if "end" in w: - w["end"] = float(w["end"]) + offset + if "start" in w: w["start"] = float(w["start"]) + offset + if "end" in w: w["end"] = float(w["end"]) + offset segments = res.get("segments") or [] for s in segments: - if "start" in s: - s["start"] = float(s["start"]) + offset - if "end" in s: - s["end"] = float(s["end"]) + offset + if "start" in s: s["start"] = float(s["start"]) + offset + if "end" in s: s["end"] = float(s["end"]) + offset results.append({ "chunk_id": ch["chunk_id"], - "abs_start": ch["abs_start"], - "abs_end": ch["abs_end"], - "text": res.get("text", ""), - "segments": segments, - "words": words, + "abs_start": ch.get("abs_start", 0.0), + "abs_end": ch.get("abs_end", 0.0), + "text": res.get("text", ""), + "segments": segments, + "words": words, }) - # 統合保存 + # 統合保存(OpenAI版と同じ形) all_out = out_dir / "all.transcripts.json" all_out.write_text(json.dumps({"transcripts": results}, ensure_ascii=False, indent=2), encoding="utf-8") - self.logger.info(f"Transcription merged: {all_out}") - + self.logger.info(f"Transcription merged: {all_out}") \ No newline at end of file