47 lines
1.9 KiB
Python
47 lines
1.9 KiB
Python
import os
|
||
from jobs.job_base import JobBase
|
||
|
||
class JobPreDenoizingFmmpeg(JobBase):
    """Job that pre-processes an audio file by denoising it with ffmpeg."""

    def __init__(self):
        """Register the job under its class name and set its description."""
        super().__init__(name=self.__class__.__name__)
        self.description = "Pre Denoizing Fmmpeg Job"

    def _denoise_ffmpeg(self, src, dst):
        """Run the ffmpeg filter chain on ``src`` and write the result to ``dst``.

        The parent directory of ``dst`` is created when missing. ffmpeg
        encodes into a temporary sibling file that is moved onto ``dst`` only
        after ffmpeg exits successfully, so a failed or interrupted run never
        leaves a truncated file at ``dst``. This matters because ``execute``
        treats an existing output file as a finished step.

        Args:
            src: Path of the audio file to read.
            dst: Path of the denoised audio file to produce.

        Raises:
            subprocess.CalledProcessError: If ffmpeg exits with a non-zero
                status (``check=True``).
        """
        import pathlib
        import subprocess

        dst_path = pathlib.Path(dst)
        dst_path.parent.mkdir(parents=True, exist_ok=True)

        # Keep the original suffix last so ffmpeg still infers the output
        # container from the file extension.
        tmp_path = dst_path.with_name(
            f"{dst_path.stem}.partial{dst_path.suffix}"
        )

        cmd = [
            "ffmpeg", "-y", "-i", src,
            "-af", (
                "highpass=f=60,"         # cut low frequencies
                "lowpass=f=9000,"        # reduce high-frequency noise
                "afftdn=nr=12:nf=-25,"   # light FFT-based noise reduction
                "dynaudnorm=p=0.5"       # gentle loudness leveling (append :m=15:s=3 for a stronger effect)
            ),
            "-ar", "16000", "-ac", "1",  # convert to 16 kHz mono for STT
            str(tmp_path),
        ]
        try:
            subprocess.run(cmd, check=True)
            # Publish the result only after ffmpeg succeeded.
            os.replace(tmp_path, dst_path)
        finally:
            # After a successful replace the temp file is gone; on failure
            # remove whatever partial output ffmpeg left behind.
            if tmp_path.exists():
                tmp_path.unlink()

    def execute(self):
        """Denoise the current preprocess input file, unless already done.

        Reads and updates ``self.status``:

        * If ``preprocess_dir`` does not exist, ``preprocess_index`` is reset
          to 0 before the paths are read.
        * If ``preprocess_dir`` exists and ``preprocess_outputfile`` already
          exists, ffmpeg is skipped.
        * Otherwise ``preprocess_inputfile`` is denoised into
          ``preprocess_outputfile``.

        In both the skip path and the processing path, the output file
        becomes the new ``preprocess_inputfile`` and ``preprocess_index`` is
        incremented by one.
        """
        self.logger.info(f"{self.name} execute started")

        if not os.path.exists(self.status.preprocess_dir):
            # NOTE(review): presumably preprocess_outputfile is derived from
            # preprocess_index, hence the reset before reading it -- confirm
            # against the status class.
            self.status.preprocess_index = 0
        elif os.path.exists(self.status.preprocess_outputfile):
            # preprocess_dir exists and the output file already exists:
            # reuse it instead of running ffmpeg again.
            self.logger.info(f"Preprocess output file already exists: {self.status.preprocess_outputfile}")
            self.status.preprocess_inputfile = self.status.preprocess_outputfile
            self.status.preprocess_index += 1
            return

        input_file = self.status.preprocess_inputfile
        output_file = self.status.preprocess_outputfile
        self._denoise_ffmpeg(input_file, output_file)

        # The denoised file feeds the next preprocessing step.
        self.status.preprocess_inputfile = output_file
        self.status.preprocess_index += 1
        return
|