From 3ab068ea9f646668a6f2335ada5390d5f904e01d Mon Sep 17 00:00:00 2001
From: "ry.yamafuji"
Date: Thu, 11 Sep 2025 18:07:45 +0900
Subject: [PATCH] =?UTF-8?q?pipe=E5=87=A6=E7=90=86=E3=82=92=E7=94=9F?=
 =?UTF-8?q?=E6=88=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 requirements.txt                       |   9 +-
 src/app_status.py                      |  53 +++++++
 src/jobs/job_chunk_with_diarization.py | 195 +++++++++++++++++++++++++
 src/jobs/job_diarization.py            |  60 ++++++++
 src/jobs/job_enhancement_rnnoise.py    |  62 ++++++++
 src/jobs/job_pre_denoizing_fmmpeg.py   |  46 ++++++
 src/jobs/job_pre_normalize_fmmpeg.py   |  43 ++++++
 src/jobs/job_transcribe_chk_modal.py   | 159 ++++++++++++++++++++
 src/jobs/job_transcribe_chk_openai.py  | 159 ++++++++++++++++++++
 src/jobs/job_visualize_audio.py        |  12 +-
 src/main.py                            |   3 +
 src/pipeline/app_pipeline.py           |  25 +++-
 src/pipeline/pipeline_base.py          |   2 +
 13 files changed, 819 insertions(+), 9 deletions(-)
 create mode 100644 src/jobs/job_chunk_with_diarization.py
 create mode 100644 src/jobs/job_diarization.py
 create mode 100644 src/jobs/job_enhancement_rnnoise.py
 create mode 100644 src/jobs/job_pre_denoizing_fmmpeg.py
 create mode 100644 src/jobs/job_pre_normalize_fmmpeg.py
 create mode 100644 src/jobs/job_transcribe_chk_modal.py
 create mode 100644 src/jobs/job_transcribe_chk_openai.py

diff --git a/requirements.txt b/requirements.txt
index 6c531c2..b5c1fbe 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,8 @@
-librosalibrosa
\ No newline at end of file
+librosa
+pyrnnoise
+resampy
+soundfile
+numpy
+requests
+python-dotenv
+modal
\ No newline at end of file
diff --git a/src/app_status.py b/src/app_status.py
index 6010239..882098c 100644
--- a/src/app_status.py
+++ b/src/app_status.py
@@ -61,12 +61,59 @@ class AppStatus(Singleton):
         source_dir = f"{self.output_base_dir}/{self.request_id}/source"
         return source_dir
 
+    @property
+    def preprocess_dir(self)-> str:
+        """前処理ディレクトリのパス"""
+        preprocess_dir = f"{self.output_base_dir}/{self.request_id}/preprocess"
+        return preprocess_dir
+
+    @property
+    def preprocess_index(self)-> int:
+        """前処理のインデックス"""
+        return self.get_status('preprocess_index', default=0)
+
+    @preprocess_index.setter
+    def preprocess_index(self, value:int):
+        """前処理のインデックス"""
+        self.set_status('preprocess_index', value)
+        if value == 0:
+            self.set_status('preprocess_inputfile', self.unified_file)
+
+    @property
+    def preprocess_inputfile(self)-> str:
+        """前処理の入力ファイルのパス"""
+        return self.get_status('preprocess_inputfile')
+
+    @preprocess_inputfile.setter
+    def preprocess_inputfile(self, value:str):
+        """前処理の入力ファイルのパス"""
+        self.set_status('preprocess_inputfile', value)
+
+    @property
+    def preprocess_outputfile(self)-> str:
+        """前処理の出力ファイルのパス"""
+        index = self.preprocess_index
+        filename = f"preprocess_{index:02d}.wav"
+        output_file = f"{self.preprocess_dir}/{filename}"
+        return output_file
+
+
     @property
     def chunk_dir(self)-> str:
         """チャンクディレクトリのパス"""
         chunk_dir = f"{self.output_base_dir}/{self.request_id}/chunk"
         return chunk_dir
 
+    @property
+    def chunk_manifest(self)-> str:
+        """チャンクマニフェストのパス"""
+        return f"{self.chunk_dir}/chunks.manifest.json"
+
+    @property
+    def transcript_dir(self)-> str:
+        """文字起こし結果ディレクトリのパス"""
+        transcript_dir = f"{self.output_base_dir}/{self.request_id}/transcripts"
+        return transcript_dir
 
 
     @property
@@ -83,3 +130,9 @@
     def unified_file(self)-> str:
         """統一ファイルのパス"""
         return f"{self.output_dir}/unified.wav"
+
+    @property
+    def diarization_file(self)-> str:
+        """話者分離のJSONファイルのパス"""
+        return 
f"{self.output_dir}/diarization.json" + diff --git a/src/jobs/job_chunk_with_diarization.py b/src/jobs/job_chunk_with_diarization.py new file mode 100644 index 0000000..ea13805 --- /dev/null +++ b/src/jobs/job_chunk_with_diarization.py @@ -0,0 +1,195 @@ +# jobs/job_chunk_files.py +import os +import json +import subprocess +from pathlib import Path +from jobs.job_base import JobBase + + +class JobChunkWithDiarization(JobBase): + """音声ファイルをチャンクに分割するジョブ + * 7分ターゲット(CHUNK_TARGET_SEC=420)で粗く位置決め → ±5秒の範囲で無音にスナップ + * 0.4秒オーバーラップ(CHUNK_OVERLAP_SEC=0.4) + * 最短1分(CHUNK_MIN_LEN_SEC=60) + * chunks.manifest.jsonl / chunks.manifest.json を保存して、絶対時刻(abs_start/abs_end)と切り出し実時間(cut_start/cut_end) を持たせます。 + * 後段の ASR 結果(チャンク内時刻)に abs_start を足すだけで全体時刻に戻せます。 + + """ + + def __init__(self): + super().__init__(name=self.__class__.__name__) + self.description = "Chunk Audio Files Job" + + # デフォルト設定(環境変数で上書き可) + self.target_len = float(os.getenv("CHUNK_TARGET_SEC", "420")) # 7分 + self.overlap = float(os.getenv("CHUNK_OVERLAP_SEC", "0.4")) # 0.4s + self.snap_win = float(os.getenv("CHUNK_SNAP_WINDOW_SEC", "5.0")) # 境界±5sで無音にスナップ + self.min_sil = float(os.getenv("CHUNK_MIN_SILENCE_SEC", "0.25")) # 無音とみなす最小長 + self.min_len = float(os.getenv("CHUNK_MIN_LEN_SEC", "60")) # 最短1分 + self.max_len = float(os.getenv("CHUNK_MAX_LEN_SEC", "480")) # 最長8分(fallback) + + def _ffprobe_duration(self, src: str) -> float: + out = subprocess.check_output( + ["ffprobe", "-v", "error", "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", src], + text=True + ).strip() + return float(out) + + def _invert_vad_to_silences(self, vad: list[dict], total: float) -> list[tuple[float, float]]: + """VAD区間(発話)→ サイレンス区間に反転して [ (s,e), ... ] を返す""" + if not vad: + return [(0.0, total)] + vad_sorted = sorted([(float(v["start"]), float(v["end"])) for v in vad if v["end"] > v["start"]], key=lambda x: x[0]) + + silences = [] + t = 0.0 + for s, e in vad_sorted: + if s > t: + silences.append((t, s)) + t = max(t, e) + if t < total: + silences.append((t, total)) + + # 短すぎるサイレンスは無視 + silences = [(s, e) for (s, e) in silences if (e - s) >= self.min_sil] + return silences + + def _nearest_silence_boundary(self, target_t: float, silences: list[tuple[float, float]], window: float) -> float | None: + """target_t の±window 内にあるサイレンスの “中点” または端点にスナップ(中点優先)""" + cand = [] + lo, hi = target_t - window, target_t + window + for s, e in silences: + mid = 0.5 * (s + e) + for t in (mid, s, e): # mid優先で評価 + if lo <= t <= hi: + cand.append((abs(t - target_t), t)) + if not cand: + return None + cand.sort(key=lambda x: x[0]) + return cand[0][1] + + def _cut_wav(self, src: str, dst: str, start: float, end: float): + Path(dst).parent.mkdir(parents=True, exist_ok=True) + dur = max(0.01, end - start) + # ここで ASR 向けに 16k/mono PCM16 へ正規化出力 + cmd = [ + "ffmpeg", "-y", + "-i", src, + "-ss", f"{start:.3f}", + "-t", f"{dur:.3f}", + "-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le", + dst + ] + subprocess.run(cmd, check=True) + + def _build_chunks_with_vad_snap(self, src: str, out_dir: str, diar_json_path: str): + out = Path(out_dir) + out.mkdir(parents=True, exist_ok=True) + + total = self._ffprobe_duration(src) + + # ダイアリゼーション(VAD)読み込み + diar: dict = json.loads(Path(diar_json_path).read_text(encoding="utf-8")) + vad_list = diar.get("vad") or [] + silences = self._invert_vad_to_silences(vad_list, total) + + chunks = [] + manifest = [] + i = 0 + cur = 0.0 + + # 先頭チャンクの開始は0固定(頭に十分な無音があっても0開始にする) + while cur < total: + # 目標終端 + target_end = cur + self.target_len + + 
# 上限/下限ガード + hard_max = min(cur + self.max_len, total) + hard_min = min(cur + self.min_len, total) + + # スナップ対象の目標(total超えるなら total) + soft_target = min(target_end, total) + + # 近傍の無音にスナップ + snapped_end = self._nearest_silence_boundary(soft_target, silences, self.snap_win) + + # 適用条件: + # - スナップ後が hard_min 以上 + # - かつ hard_max 以下 + if snapped_end is not None: + end = max(hard_min, min(snapped_end, hard_max)) + else: + # 無音が見つからなければ、hard_max を超えない範囲で soft_target を採用 + end = max(hard_min, min(soft_target, hard_max)) + + # オーバーラップ付与(右にだけ与える設計。左は前チャンクが担保) + cut_start = max(0.0, cur - self.overlap) + cut_end = min(total, end + self.overlap) + + dst = out / f"{i:06d}.wav" + self._cut_wav(src, str(dst), cut_start, cut_end) + + chunks.append({ + "chunk_id": f"{i:06d}", + "abs_start": round(cur, 3), + "abs_end": round(end, 3), + "cut_start": round(cut_start, 3), + "cut_end": round(cut_end, 3), + "path": str(dst), + }) + manifest.append({ + "chunk_id": f"{i:06d}", + "abs_start": round(cur, 3), + "abs_end": round(end, 3), + "overlap_left": self.overlap if cur > 0.0 else 0.0, + "overlap_right": self.overlap if end < total else 0.0, + "path": str(dst), + }) + + i += 1 + cur = end # 次の開始は今回の終端 + + # マニフェスト保存(チャンクディレクトリ直下に) + (out / "chunks.manifest.jsonl").write_text( + "\n".join(json.dumps(m, ensure_ascii=False) for m in manifest), + encoding="utf-8" + ) + + # ついでに JSON でも + (out / "chunks.manifest.json").write_text( + json.dumps({"duration": total, "chunks": chunks}, ensure_ascii=False, indent=2), + encoding="utf-8" + ) + return {"duration": total, "chunks": chunks} + + def _has_existing_chunks(self, chunk_dir: str) -> bool: + p = Path(chunk_dir) + return p.exists() and any(p.glob("*.wav")) + + def execute(self): + self.logger.info(f"{self.name} execute started") + + # 既にチャンク済か軽く判定(ディレクトリ存在だけでなく *.wav があるか) + if self._has_existing_chunks(self.status.chunk_dir): + self.logger.info(f"Chunks already exist: {self.status.chunk_dir}") + return + + # 入力音声(標準化/前処理後のファイルを想定) + src = self.status.unified_file # ここはあなたのパイプラインに合わせて + diar_json = self.status.diarization_file # 直前のジョブが保存した JSON + dst_dir = self.status.chunk_dir + + if not src or not Path(src).exists(): + raise FileNotFoundError(f"Input audio not found: {src}") + if not diar_json or not Path(diar_json).exists(): + raise FileNotFoundError(f"Diarization JSON not found: {diar_json}") + + self.logger.info(f"Chunk target ~{self.target_len}s, overlap {self.overlap}s, snap ±{self.snap_win}s, min_sil {self.min_sil}s") + meta = self._build_chunks_with_vad_snap(src, dst_dir, diar_json) + + # 後工程用に状態に積む + self.status.set_status("chunks_manifest", str(Path(dst_dir) / "chunks.manifest.json")) + self.logger.info(f"Chunking done: {len(meta['chunks'])} files -> {dst_dir}") + + return \ No newline at end of file diff --git a/src/jobs/job_diarization.py b/src/jobs/job_diarization.py new file mode 100644 index 0000000..9d85848 --- /dev/null +++ b/src/jobs/job_diarization.py @@ -0,0 +1,60 @@ +import os +import json +import time +from pathlib import Path +import modal +from jobs.job_base import JobBase + +class JobDiarization(JobBase): + """話者分離を行うジョブ (pyannote.audio)""" + def __init__(self): + super().__init__(name=self.__class__.__name__) + self.description = "Diarization Job" + # 環境変数で上書き可能に + self.modal_app_name = os.getenv("MODAL_DIAR_APP", "speaker-diarization-pyannote") + self.modal_func_name = os.getenv("MODAL_DIAR_FUNC", "diarize_audio") + # リトライ設定(ネットワーク/一時エラー対策) + self.max_retries = int(os.getenv("MODAL_DIAR_RETRIES", "2")) + self.retry_wait = 
float(os.getenv("MODAL_DIAR_RETRY_WAIT", "2.0")) # 秒 + + def _call_modal(self, audio_bytes: bytes, filename: str) -> dict: + """Modal の diarize 関数を呼ぶ(軽いリトライ付き)""" + func = modal.Function.from_name(self.modal_app_name, self.modal_func_name) + last_exc = None + for attempt in range(self.max_retries + 1): + try: + return func.remote(audio_bytes, filename=filename) + except Exception as e: + last_exc = e + if attempt < self.max_retries: + time.sleep(self.retry_wait * (attempt + 1)) + else: + raise + # 通らないが型のため + raise last_exc + + + def execute(self): + self.logger.info(f"{self.name} execute started") + + # 出力先ファイルパス + diar_json_path = Path(self.status.diarization_file) + + if diar_json_path.exists(): + # すでに話者分離済み + self.logger.info(f"Diarization already done: {self.status.diarization_file}") + return + + audio_path = Path(self.status.preprocess_inputfile) + audio_bytes = audio_path.read_bytes() + + # Modalで話者分離を実行 + self.logger.info(f"Calling Modal: app={self.modal_app_name}, func={self.modal_func_name}, file={audio_path.name}") + res = self._call_modal(audio_bytes, filename=audio_path.name) + + # JSON 保存 + diar_json_path.parent.mkdir(parents=True, exist_ok=True) + diar_json_path.write_text(json.dumps(res, ensure_ascii=False, indent=2), encoding="utf-8") + + self.logger.info(f"Diarization result saved: {diar_json_path}") + return \ No newline at end of file diff --git a/src/jobs/job_enhancement_rnnoise.py b/src/jobs/job_enhancement_rnnoise.py new file mode 100644 index 0000000..3b3f4f8 --- /dev/null +++ b/src/jobs/job_enhancement_rnnoise.py @@ -0,0 +1,62 @@ +import os +import soundfile as sf +import numpy as np +from pathlib import Path +from pyrnnoise import RNNoise +from jobs.job_base import JobBase + + +class JobEnhancementRnnoise(JobBase): + """音声ファイルの音声強調(RNNoise)を行うジョブ""" + + def __init__(self): + super().__init__(name=self.__class__.__name__) + self.description = "Enhancement RNNoise Fmmpeg Job" + + def _enhance_rnnoise(self, src: str, dst: str): + # 入力の実サンプルレートを取得して RNNoise に渡す + info = sf.info(src) + rn = RNNoise(sample_rate=info.samplerate) + + tmp = str(Path(dst).with_suffix(".rnnoise.tmp.wav")) + try: + # ファイル→ファイルでRNNoiseを適用(内部で48k変換→戻しをやってくれる) + for _ in rn.denoise_wav(src, tmp): + pass + finally: + rn.reset() + del rn + + # 任意: -3 dBFS の軽いピーク正規化 + y, sr = sf.read(tmp, dtype="float32", always_2d=False) + peak = float(np.max(np.abs(y)) + 1e-12) + if peak > 0: + target_lin = 10 ** (-3 / 20) # ≈ 0.707 + y *= min(1.0, target_lin / peak) + + sf.write(dst, y, sr, subtype="PCM_16") + try: + os.remove(tmp) + except OSError: + pass + + def execute(self): + self.logger.info(f"{self.name} execute started") + + if not os.path.exists(self.status.preprocess_dir): + self.status.preprocess_index = 0 + elif os.path.exists(self.status.preprocess_outputfile): + # preprocess_dirが存在する場合かつ、 preprocess_outputfileが存在する場合 + self.logger.info(f"Preprocess output file already exists: {self.status.preprocess_outputfile}") + self.status.preprocess_inputfile = self.status.preprocess_outputfile + self.status.preprocess_index += 1 + return + + + input_file = self.status.preprocess_inputfile + output_file = self.status.preprocess_outputfile + self._enhance_rnnoise(input_file, output_file) + + self.status.preprocess_inputfile = output_file + self.status.preprocess_index += 1 + return diff --git a/src/jobs/job_pre_denoizing_fmmpeg.py b/src/jobs/job_pre_denoizing_fmmpeg.py new file mode 100644 index 0000000..0309396 --- /dev/null +++ b/src/jobs/job_pre_denoizing_fmmpeg.py @@ -0,0 +1,46 @@ +import os +from 
jobs.job_base import JobBase + +class JobPreDenoizingFmmpeg(JobBase): + + """音声ファイルの前処理(ノイズ除去)を行うジョブ""" + def __init__(self): + super().__init__(name=self.__class__.__name__) + self.description = "Pre Denoizing Fmmpeg Job" + + def _denoise_ffmpeg(self, src, dst): + import subprocess, pathlib + pathlib.Path(dst).parent.mkdir(parents=True, exist_ok=True) + cmd = [ + "ffmpeg", "-y", "-i", src, + "-af", ( + "highpass=f=60," # 低域カット + "lowpass=f=9000," # 高域ノイズ軽減 + "afftdn=nr=12:nf=-25," # 軽いノイズ除去(FFTベース) + "dynaudnorm=p=0.5" # 軽い音量均し(強めたい場合は:m=15:s=3) + ), + "-ar", "16000", "-ac", "1", # STT用に16kHz/モノラル変換 + dst + ] + subprocess.run(cmd, check=True) + + + def execute(self): + self.logger.info(f"{self.name} execute started") + + if not os.path.exists(self.status.preprocess_dir): + self.status.preprocess_index = 0 + elif os.path.exists(self.status.preprocess_outputfile): + # preprocess_dirが存在する場合かつ、 preprocess_outputfileが存在する場合 + self.logger.info(f"Preprocess output file already exists: {self.status.preprocess_outputfile}") + self.status.preprocess_inputfile = self.status.preprocess_outputfile + self.status.preprocess_index += 1 + return + + input_file = self.status.preprocess_inputfile + output_file = self.status.preprocess_outputfile + self._denoise_ffmpeg(input_file, output_file) + + self.status.preprocess_inputfile = output_file + self.status.preprocess_index += 1 + return diff --git a/src/jobs/job_pre_normalize_fmmpeg.py b/src/jobs/job_pre_normalize_fmmpeg.py new file mode 100644 index 0000000..eb87967 --- /dev/null +++ b/src/jobs/job_pre_normalize_fmmpeg.py @@ -0,0 +1,43 @@ +import os +from jobs.job_base import JobBase + +class JobPreNormalizingFmmpeg(JobBase): + """音声ファイルの前処理(ラウドネス正規化)を行うジョブ""" + + def __init__(self): + super().__init__(name=self.__class__.__name__) + self.description = "Pre Normalizing Fmmpeg Job" + + def _normalize_ffmpeg(self, src, dst): + import subprocess, pathlib + pathlib.Path(dst).parent.mkdir(parents=True, exist_ok=True) + # ffmpeg -i input.wav -af "loudnorm=I=-23:TP=-2:LRA=11" -ar 16000 -ac 1 output_norm.wav + cmd = [ + "ffmpeg", "-y", "-i", src, + "-af", "loudnorm=I=-23:TP=-2:LRA=11", + "-ar", "16000", "-ac", "1", dst + ] + subprocess.run(cmd, check=True) + + + def execute(self): + self.logger.info(f"{self.name} execute started") + + if not os.path.exists(self.status.preprocess_dir): + self.status.preprocess_index = 0 + elif os.path.exists(self.status.preprocess_outputfile): + # preprocess_dirが存在する場合かつ、 preprocess_outputfileが存在する場合 + self.logger.info(f"Preprocess output file already exists: {self.status.preprocess_outputfile}") + self.status.preprocess_inputfile = self.status.preprocess_outputfile + self.status.preprocess_index += 1 + return + + + + input_file = self.status.preprocess_inputfile + output_file = self.status.preprocess_outputfile + self._normalize_ffmpeg(input_file, output_file) + + self.status.preprocess_inputfile = output_file + self.status.preprocess_index += 1 + return diff --git a/src/jobs/job_transcribe_chk_modal.py b/src/jobs/job_transcribe_chk_modal.py new file mode 100644 index 0000000..8a69590 --- /dev/null +++ b/src/jobs/job_transcribe_chk_modal.py @@ -0,0 +1,159 @@ +import os +import json +import time +import requests +from typing import Any, Dict, List, Optional +from pathlib import Path +from jobs.job_base import JobBase + +class JobTranscribeChunkOpenAI(JobBase): + """ + チャンク群をOpenAIで逐次文字起こしするジョブ + + - 入力: chunks.manifest.json (JobChunkFilesで作成) + - 出力: チャンクごとの *.transcript.json と、統合 all.transcripts.json + - タイムスタンプを abs_start 
で全体時刻に補正 + """ + + def __init__(self): + super().__init__(name=self.__class__.__name__) + self.description = "Transcribe Chunks via OpenAI" + + # 設定(環境変数で上書き可能) + # self.model = os.getenv("ASR_MODEL", "gpt-4o-transcribe") + self.model = os.getenv("ASR_MODEL", "whisper-1") + self.lang = os.getenv("ASR_LANG", "ja") + self.response_format = os.getenv("ASR_RESP_FMT", "json") + self.word_ts = os.getenv("ASR_WORD_TS", "1") in ("1", "true", "True") + self.req_timeout = int(os.getenv("ASR_TIMEOUT_SEC", "600")) + self.sleep_between = float(os.getenv("ASR_SLEEP_SEC", "0.3")) + self.max_retries = int(os.getenv("ASR_MAX_RETRIES", "2")) + self.retry_backoff = float(os.getenv("ASR_RETRY_BACKOFF", "1.5")) # 乗算 + + # OpenAI + self.api_key = os.getenv("OPENAI_API_KEY") + self.api_url = os.getenv("OPENAI_TRANSCRIBE_URL", "https://api.openai.com/v1/audio/transcriptions") + + + def _manifest_path(self) -> Path: + return Path(self.status.chunk_manifest) + + def _out_dir(self) -> Path: + return Path(self.status.transcript_dir) + + + def _transcribe_file(self, wav_path: str) -> Dict[str, Any]: + if not self.api_key: + raise RuntimeError("OPENAI_API_KEY が未設定です。環境変数に設定してください。") + + if "transcribe" in self.model and self.response_format == "json": + response_format = "verbose_json" + elif "whisper" in self.model and self.response_format == "json": + response_format = "verbose_json" + else: + response_format = self.response_format + + # self.model = os.getenv("ASR_MODEL", "gpt-4o-transcribe") + self.model = os.getenv("ASR_MODEL", "whisper-1") + + headers = {"Authorization": f"Bearer {self.api_key}"} + data = { + "model": self.model, + "response_format": response_format, + "language": self.lang, + } + # 単語タイムスタンプが欲しければ指定 + if self.word_ts: + # OpenAIのフォームは配列っぽいキー名を要求 + data["timestamp_granularities[]"] = "word" + + # 軽いリトライ + wait = 1.0 + last_err: Optional[Exception] = None + for attempt in range(self.max_retries + 1): + try: + with open(wav_path, "rb") as f: + files = {"file": (Path(wav_path).name, f, "audio/wav")} + r = requests.post(self.api_url, headers=headers, data=data, files=files, timeout=self.req_timeout) + if r.status_code == 429 or 500 <= r.status_code < 600: + raise requests.HTTPError(f"HTTP {r.status_code}: {r.text}") + r.raise_for_status() + return r.json() + except Exception as e: + last_err = e + if attempt < self.max_retries: + self.logger.warning(f"ASR retry ({attempt+1}/{self.max_retries}) for {wav_path}: {e}") + time.sleep(wait) + wait *= self.retry_backoff + continue + break + # リトライ尽きた + raise last_err if last_err else RuntimeError("Unknown ASR error") + + + def execute(self): + self.logger.info(f"{self.name} execute started") + + if os.path.exists(self.status.transcript_dir): + # すでに変換済み + self.logger.info(f"Transcription already done: {self.status.transcript_dir}") + return + + manifest_path = self._manifest_path() + if not manifest_path.exists(): + raise FileNotFoundError(f"chunks manifest not found: {manifest_path}") + + out_dir = self._out_dir() + out_dir.mkdir(parents=True, exist_ok=True) + + meta = json.loads(manifest_path.read_text(encoding="utf-8")) + chunks: List[Dict[str, Any]] = meta.get("chunks") or [] + + if not chunks: + self.logger.warning("No chunks found in manifest. 
Skipping.") + return + + results: List[Dict[str, Any]] = [] + + for ch in chunks: + wav = ch["path"] + per_chunk_out = out_dir / f"{Path(wav).stem}.transcript.json" + + # レジューム対応:既に保存済みならスキップ + if per_chunk_out.exists(): + res: dict = json.loads(per_chunk_out.read_text(encoding="utf-8")) + else: + self.logger.info(f"ASR: {wav}") + res: dict = self._transcribe_file(wav) + per_chunk_out.write_text(json.dumps(res, ensure_ascii=False, indent=2), encoding="utf-8") + time.sleep(self.sleep_between) + + # チャンク内→全体時刻に補正 + offset = float(ch["abs_start"]) + words = res.get("words") or [] + for w in words: + if "start" in w: + w["start"] = float(w["start"]) + offset + if "end" in w: + w["end"] = float(w["end"]) + offset + segments = res.get("segments") or [] + for s in segments: + if "start" in s: + s["start"] = float(s["start"]) + offset + if "end" in s: + s["end"] = float(s["end"]) + offset + + results.append({ + "chunk_id": ch["chunk_id"], + "abs_start": ch["abs_start"], + "abs_end": ch["abs_end"], + "text": res.get("text", ""), + "segments": segments, + "words": words, + }) + + # 統合保存 + all_out = out_dir / "all.transcripts.json" + all_out.write_text(json.dumps({"transcripts": results}, ensure_ascii=False, indent=2), encoding="utf-8") + self.logger.info(f"Transcription merged: {all_out}") + diff --git a/src/jobs/job_transcribe_chk_openai.py b/src/jobs/job_transcribe_chk_openai.py new file mode 100644 index 0000000..8a69590 --- /dev/null +++ b/src/jobs/job_transcribe_chk_openai.py @@ -0,0 +1,159 @@ +import os +import json +import time +import requests +from typing import Any, Dict, List, Optional +from pathlib import Path +from jobs.job_base import JobBase + +class JobTranscribeChunkOpenAI(JobBase): + """ + チャンク群をOpenAIで逐次文字起こしするジョブ + + - 入力: chunks.manifest.json (JobChunkFilesで作成) + - 出力: チャンクごとの *.transcript.json と、統合 all.transcripts.json + - タイムスタンプを abs_start で全体時刻に補正 + """ + + def __init__(self): + super().__init__(name=self.__class__.__name__) + self.description = "Transcribe Chunks via OpenAI" + + # 設定(環境変数で上書き可能) + # self.model = os.getenv("ASR_MODEL", "gpt-4o-transcribe") + self.model = os.getenv("ASR_MODEL", "whisper-1") + self.lang = os.getenv("ASR_LANG", "ja") + self.response_format = os.getenv("ASR_RESP_FMT", "json") + self.word_ts = os.getenv("ASR_WORD_TS", "1") in ("1", "true", "True") + self.req_timeout = int(os.getenv("ASR_TIMEOUT_SEC", "600")) + self.sleep_between = float(os.getenv("ASR_SLEEP_SEC", "0.3")) + self.max_retries = int(os.getenv("ASR_MAX_RETRIES", "2")) + self.retry_backoff = float(os.getenv("ASR_RETRY_BACKOFF", "1.5")) # 乗算 + + # OpenAI + self.api_key = os.getenv("OPENAI_API_KEY") + self.api_url = os.getenv("OPENAI_TRANSCRIBE_URL", "https://api.openai.com/v1/audio/transcriptions") + + + def _manifest_path(self) -> Path: + return Path(self.status.chunk_manifest) + + def _out_dir(self) -> Path: + return Path(self.status.transcript_dir) + + + def _transcribe_file(self, wav_path: str) -> Dict[str, Any]: + if not self.api_key: + raise RuntimeError("OPENAI_API_KEY が未設定です。環境変数に設定してください。") + + if "transcribe" in self.model and self.response_format == "json": + response_format = "verbose_json" + elif "whisper" in self.model and self.response_format == "json": + response_format = "verbose_json" + else: + response_format = self.response_format + + # self.model = os.getenv("ASR_MODEL", "gpt-4o-transcribe") + self.model = os.getenv("ASR_MODEL", "whisper-1") + + headers = {"Authorization": f"Bearer {self.api_key}"} + data = { + "model": self.model, + "response_format": 
response_format, + "language": self.lang, + } + # 単語タイムスタンプが欲しければ指定 + if self.word_ts: + # OpenAIのフォームは配列っぽいキー名を要求 + data["timestamp_granularities[]"] = "word" + + # 軽いリトライ + wait = 1.0 + last_err: Optional[Exception] = None + for attempt in range(self.max_retries + 1): + try: + with open(wav_path, "rb") as f: + files = {"file": (Path(wav_path).name, f, "audio/wav")} + r = requests.post(self.api_url, headers=headers, data=data, files=files, timeout=self.req_timeout) + if r.status_code == 429 or 500 <= r.status_code < 600: + raise requests.HTTPError(f"HTTP {r.status_code}: {r.text}") + r.raise_for_status() + return r.json() + except Exception as e: + last_err = e + if attempt < self.max_retries: + self.logger.warning(f"ASR retry ({attempt+1}/{self.max_retries}) for {wav_path}: {e}") + time.sleep(wait) + wait *= self.retry_backoff + continue + break + # リトライ尽きた + raise last_err if last_err else RuntimeError("Unknown ASR error") + + + def execute(self): + self.logger.info(f"{self.name} execute started") + + if os.path.exists(self.status.transcript_dir): + # すでに変換済み + self.logger.info(f"Transcription already done: {self.status.transcript_dir}") + return + + manifest_path = self._manifest_path() + if not manifest_path.exists(): + raise FileNotFoundError(f"chunks manifest not found: {manifest_path}") + + out_dir = self._out_dir() + out_dir.mkdir(parents=True, exist_ok=True) + + meta = json.loads(manifest_path.read_text(encoding="utf-8")) + chunks: List[Dict[str, Any]] = meta.get("chunks") or [] + + if not chunks: + self.logger.warning("No chunks found in manifest. Skipping.") + return + + results: List[Dict[str, Any]] = [] + + for ch in chunks: + wav = ch["path"] + per_chunk_out = out_dir / f"{Path(wav).stem}.transcript.json" + + # レジューム対応:既に保存済みならスキップ + if per_chunk_out.exists(): + res: dict = json.loads(per_chunk_out.read_text(encoding="utf-8")) + else: + self.logger.info(f"ASR: {wav}") + res: dict = self._transcribe_file(wav) + per_chunk_out.write_text(json.dumps(res, ensure_ascii=False, indent=2), encoding="utf-8") + time.sleep(self.sleep_between) + + # チャンク内→全体時刻に補正 + offset = float(ch["abs_start"]) + words = res.get("words") or [] + for w in words: + if "start" in w: + w["start"] = float(w["start"]) + offset + if "end" in w: + w["end"] = float(w["end"]) + offset + segments = res.get("segments") or [] + for s in segments: + if "start" in s: + s["start"] = float(s["start"]) + offset + if "end" in s: + s["end"] = float(s["end"]) + offset + + results.append({ + "chunk_id": ch["chunk_id"], + "abs_start": ch["abs_start"], + "abs_end": ch["abs_end"], + "text": res.get("text", ""), + "segments": segments, + "words": words, + }) + + # 統合保存 + all_out = out_dir / "all.transcripts.json" + all_out.write_text(json.dumps({"transcripts": results}, ensure_ascii=False, indent=2), encoding="utf-8") + self.logger.info(f"Transcription merged: {all_out}") + diff --git a/src/jobs/job_visualize_audio.py b/src/jobs/job_visualize_audio.py index 74d57fb..8ba1195 100644 --- a/src/jobs/job_visualize_audio.py +++ b/src/jobs/job_visualize_audio.py @@ -15,9 +15,11 @@ class JobVisualizeAudio(JobBase): """ 音声の波形とスペクトログラムを可視化するジョブ (CPU) """ - def __init__(self): + def __init__(self,input_file=None,dir_name="unified"): super().__init__(name=self.__class__.__name__) self.description = "Visualize Audio (waveform & spectrogram)" + self.dir_name = dir_name + self.input_file = input_file if input_file else self.status.unified_file def get_visualization(self, audio_path: str, out_dir: str, n_fft: int = 1024, hop_length: int = 
256): @@ -72,11 +74,11 @@ class JobVisualizeAudio(JobBase): def execute(self): self.logger.info(f"{self.name} execute started") - if os.path.exists(f"{self.status.output_dir}/unified"): + if os.path.exists(f"{self.status.output_dir}/{self.dir_name}"): # すでに可視化済み - self.logger.info(f"Visualization already done: {self.status.output_dir}/unified") + self.logger.info(f"Visualization already done: {self.status.output_dir}/{self.dir_name}") return - audio_path = self.status.unified_file - self.get_visualization(audio_path, f"{self.status.output_dir}/unified") + audio_path = self.input_file + self.get_visualization(audio_path, f"{self.status.output_dir}/{self.dir_name}") return diff --git a/src/main.py b/src/main.py index 8168699..e4b37ac 100644 --- a/src/main.py +++ b/src/main.py @@ -6,6 +6,9 @@ from lib.custom_logger import get_logger from app_status import AppStatus from app import app_start +from dotenv import load_dotenv +load_dotenv() # .envファイルの内容を環境変数に読み + def main(): parser = argparse.ArgumentParser(description="Speech to Text Pipeline") parser.add_argument("filepath", type=str, help="Path to the audio file") diff --git a/src/pipeline/app_pipeline.py b/src/pipeline/app_pipeline.py index 806fbf0..80b3260 100644 --- a/src/pipeline/app_pipeline.py +++ b/src/pipeline/app_pipeline.py @@ -3,7 +3,13 @@ from jobs.job_get_request_id import JobGetRequestId from jobs.job_set_soruce_file import JobSetSourceFile from jobs.job_standardize_format import JobStandardizeFormat from jobs.job_visualize_audio import JobVisualizeAudio -from jobs.job_chunk_files import JobChunkFiles +# from jobs.job_chunk_files import JobChunkFiles +from jobs.job_chunk_with_diarization import JobChunkWithDiarization +from jobs.job_pre_denoizing_fmmpeg import JobPreDenoizingFmmpeg +from jobs.job_pre_normalize_fmmpeg import JobPreNormalizingFmmpeg +from jobs.job_enhancement_rnnoise import JobEnhancementRnnoise +from jobs.job_diarization import JobDiarization +from jobs.job_transcribe_chk_openai import JobTranscribeChunkOpenAI class AppPipeline(PipelineBase): """アプリケーションのパイプライン""" @@ -13,7 +19,20 @@ class AppPipeline(PipelineBase): self.add_job(JobGetRequestId()) self.add_job(JobSetSourceFile()) self.add_job(JobStandardizeFormat()) - self.add_job(JobChunkFiles()) - self + self.add_job(JobVisualizeAudio()) + # self.add_job(JobChunkFiles()) + self.add_job(JobPreDenoizingFmmpeg()) # 前処理 定常ノイズ除去(弱) + self.add_job(JobPreNormalizingFmmpeg()) # 前処理 ラウドネス正規化 + # self.add_job(JobEnhancementRnnoise()) # 音声強調 RNNoise(弱) + # 必須: ピーク正規化(-1 dBFS) + # 推奨: ラウドネス正規化(-23 LUFS, TruePeak -2 dBFS) + # 任意: ピークリミッタ(会議参加者が大声を出す環境なら) + self.add_job(JobVisualizeAudio( + input_file=self.status.preprocess_inputfile, + dir_name="ready" + )) + self.add_job(JobDiarization()) # 話者分離 + self.add_job(JobChunkWithDiarization()) # チャンク分割(話者分離結果考慮) + self.add_job(JobTranscribeChunkOpenAI()) # チャンクごとにOpenAIで文字起こし diff --git a/src/pipeline/pipeline_base.py b/src/pipeline/pipeline_base.py index 72f9267..2670e9c 100644 --- a/src/pipeline/pipeline_base.py +++ b/src/pipeline/pipeline_base.py @@ -1,5 +1,6 @@ from typing import List from jobs.job_base import JobBase +from app_status import AppStatus from lib.custom_logger import get_logger logger = get_logger() @@ -8,6 +9,7 @@ class PipelineBase: def __init__(self): self.jobs:List[JobBase] = [] self.logger = get_logger() + self.status = AppStatus() def add_job(self, job: JobBase): self.jobs.append(job)
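A minimal consumer sketch, not part of this commit, showing how the artifacts produced by the new pipeline fit together. It assumes the layout defined by the AppStatus properties above: chunk/chunks.manifest.json written by JobChunkWithDiarization and transcripts/all.transcripts.json written by JobTranscribeChunkOpenAI under <output_base_dir>/<request_id>/, with "output/REQUEST_ID" as a placeholder path. Because JobTranscribeChunkOpenAI already shifts segment and word times by each chunk's abs_start, segments can be read back directly in whole-recording time. Note that in this patch only the chunker consumes diarization.json (its top-level "vad" list of {start, end} entries), so speaker labels are not yet merged into the transcripts.

# merge_preview.py : illustrative sketch only, not included in this commit.
# Assumes: <output_base_dir>/<request_id>/chunk/chunks.manifest.json
#          <output_base_dir>/<request_id>/transcripts/all.transcripts.json
import json
from pathlib import Path

base = Path("output/REQUEST_ID")  # placeholder for <output_base_dir>/<request_id>

manifest = json.loads((base / "chunk" / "chunks.manifest.json").read_text(encoding="utf-8"))
merged = json.loads((base / "transcripts" / "all.transcripts.json").read_text(encoding="utf-8"))

print(f"duration: {manifest['duration']:.1f}s, chunks: {len(manifest['chunks'])}")

for t in merged["transcripts"]:
    # Segment times are already absolute: abs_start was added per chunk by the ASR job.
    for seg in t.get("segments", []):
        start = float(seg.get("start", 0.0))
        end = float(seg.get("end", 0.0))
        text = (seg.get("text") or "").strip()
        print(f"[{start:8.2f} - {end:8.2f}] {text}")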