加速长音频转录处理 | AI生成和翻译

Home 2025.09

为什么30分钟或1小时的音频处理时间这么长?

Google Cloud Speech-to-Text 批量 API (v2) 专为异步长时间运行的操作而设计,特别是使用 "long" 模型处理长音频文件(每个文件最多约1小时)。以下是它可能感觉”太慢”的原因:

如果你的音频持续超过30分钟,当前设置不适合快速周转——它更适合离线/批量处理。

如何修复:减少处理时间

要更快处理长音频,关键是将文件分割成更小的块(例如,每块5-15分钟)。这样可以:

步骤1:分割音频文件

使用 FFmpeg(免费命令行工具)分割文件而无需重新编码(快速且无损)。如果需要请安装 FFmpeg(例如,macOS 上 brew install ffmpeg,Linux 上 apt install ffmpeg)。

在你的脚本中添加一个函数来分割输入文件。以下是集成了分割功能的脚本更新版本:

import os
import argparse
import subprocess
import tempfile
from google.cloud import storage
from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech
import sys
import time  # 用于轮询

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))

from scripts.llm.openrouter_client import call_openrouter_api  # noqa: F401

MAX_AUDIO_LENGTH_SECS = 20 * 60 * 60
OUTPUT_DIRECTORY = "assets/transcriptions"
CHUNK_DURATION_SECS = 600  # 每块10分钟;根据需要调整(例如,900为15分钟)


def split_audio_file(input_file, chunk_duration_secs=CHUNK_DURATION_SECS):
    """
    使用 FFmpeg 将音频文件分割成更小的块。
    
    参数:
        input_file: 输入音频路径。
        chunk_duration_secs: 每块的持续时间(秒)。
    
    返回:
        块文件路径列表。
    """
    filename = os.path.basename(input_file)
    name_without_ext = os.path.splitext(filename)[0]
    dir_name = os.path.dirname(input_file)
    
    # 为块创建临时目录
    temp_dir = tempfile.mkdtemp()
    chunk_files = []
    
    # FFmpeg 命令(无需重新编码以提高速度)
    cmd = [
        "ffmpeg", "-i", input_file,
        "-f", "segment",  # 输出格式
        "-segment_time", str(chunk_duration_secs),
        "-c", "copy",  # 无需重新编码直接复制流
        "-map", "0",  # 映射所有流
        f"{temp_dir}/{name_without_ext}_chunk_%03d.{os.path.splitext(filename)[1][1:]}",  # 输出模式
        "-y"  # 覆盖
    ]
    
    try:
        subprocess.run(cmd, check=True, capture_output=True)
        # 查找生成的块
        for file in os.listdir(temp_dir):
            if file.startswith(f"{name_without_ext}_chunk_") and file.endswith(os.path.splitext(filename)[1]):
                chunk_files.append(os.path.join(temp_dir, file))
        chunk_files.sort()  # 按名称排序(例如,chunk_001, chunk_002)
        print(f"已将 {filename} 分割为 {len(chunk_files)} 个块。")
        return chunk_files
    except subprocess.CalledProcessError as e:
        print(f"FFmpeg 分割 {filename} 时出错:{e}")
        return []


def run_batch_recognize(audio_gcs_uri, output_gcs_folder, language_code="en-US"):
    """
    使用 Google Cloud Speech-to-Text 批量 API 转录音频文件。
    更新为在音频可能较短时(例如,分割后)使用更短的模型。
    """
    client = SpeechClient()

    filename = audio_gcs_uri.split('/')[-1]
    file_extension = filename.split('.')[-1].lower()

    # 对于块,使用 "short" 或 "default" 模型以提高速度(如果 <15 分钟)
    model = "short" if CHUNK_DURATION_SECS < 900 else "long"  # 根据块大小调整

    if file_extension == "ogg":
        decoding = cloud_speech.ExplicitDecodingConfig(
            encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.OGG_OPUS,
            sample_rate_hertz=48000,
            audio_channel_count=1
        )
        config = cloud_speech.RecognitionConfig(
            explicit_decoding_config=decoding,
            features=cloud_speech.RecognitionFeatures(
                enable_word_confidence=True,
                enable_word_time_offsets=True,
            ),
            model=model,
            language_codes=[language_code],
        )
    else:
        config = cloud_speech.RecognitionConfig(
            auto_decoding_config=cloud_speech.AutoDetectDecodingConfig(),
            features=cloud_speech.RecognitionFeatures(
                enable_word_confidence=True,
                enable_word_time_offsets=True,
            ),
            model=model,
            language_codes=[language_code],
        )

    output_config = cloud_speech.RecognitionOutputConfig(
        gcs_output_config=cloud_speech.GcsOutputConfig(uri=output_gcs_folder),
    )

    files = [cloud_speech.BatchRecognizeFileMetadata(uri=audio_gcs_uri)]

    request = cloud_speech.BatchRecognizeRequest(
        recognizer="projects/graphite-ally-445108-k3/locations/global/recognizers/_",
        config=config,
        files=files,
        recognition_output_config=output_config,
    )

    print(f"开始为 {filename} 进行批量识别...")
    operation = client.batch_recognize(request=request)
    
    # 轮询进度(详见下文)
    poll_operation_with_progress(operation, filename)
    
    response = operation.result(timeout=3 * CHUNK_DURATION_SECS)  # 每块更短的超时时间
    print(f"完成 {filename} 的转录。响应:{response}")
    return response


def poll_operation_with_progress(operation, filename):
    """
    轮询长时间运行的操作并显示进度。
    """
    while not operation.done():
        # 获取操作元数据(如果可用;Speech API 提供基本状态)
        try:
            metadata = operation.metadata
            print(f"{filename} 的进度:状态={getattr(metadata, 'state', 'Unknown')}, "
                  f"已处理={getattr(metadata, 'progress_bytes', 'N/A')} 字节")
        except Exception:
            print(f"等待 {filename}...(每30秒检查一次)")
        
        time.sleep(30)  # 每30秒轮询一次
    if operation.exception():
        raise operation.exception()


def process_audio_file(input_file, output_dir):
    os.makedirs(output_dir, exist_ok=True)

    filename = os.path.basename(input_file)
    if not (filename.endswith(".m4a") or filename.endswith(".ogg")):
        print(f"错误:{filename} 不是支持的音频文件(.m4a 或 .ogg)。")
        return

    output_filename = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.txt")
    if os.path.exists(output_filename):
        print(f"跳过 {filename}{output_filename} 已存在。")
        return

    print(f"处理中:{filename}")

    # 确定语言
    if filename.endswith("-zh.m4a") or filename.endswith("-zh.ogg"):
        language_code = "cmn-CN"
    else:
        language_code = "en-US"

    # 如果文件较长则分割(启发式:>15 分钟,但你可以使用 ffprobe 探测持续时间)
    chunk_files = []
    if os.path.getsize(input_file) > 100 * 1024 * 1024:  # 粗略检查:>100MB 可能较长
        print(f"文件较大;分割为 {CHUNK_DURATION_SECS//60} 分钟的块。")
        chunk_files = split_audio_file(input_file)
        if not chunk_files:
            print("分割失败;作为单个文件处理。")
            chunk_files = [input_file]
    else:
        chunk_files = [input_file]

    storage_client = storage.Client()
    bucket = storage_client.bucket("test2x")

    all_transcripts = []  # 用于后续合并

    for chunk_idx, chunk_file in enumerate(chunk_files):
        chunk_filename = os.path.basename(chunk_file)
        base_name = os.path.splitext(filename)[0]
        chunk_name = f"{base_name}_chunk_{chunk_idx+1:03d}"
        
        # 构建 GCS 路径
        gcs_audio_uri = f"gs://test2x/audio-files/{chunk_filename}"
        gcs_output_uri = f"gs://test2x/transcripts/{chunk_name}"

        # 如果需要则上传块
        blob = bucket.blob(f"audio-files/{chunk_filename}")
        if not blob.exists():
            blob.upload_from_filename(chunk_file)
            print(f"已将块 {chunk_filename} 上传到 GCS。")
        else:
            print(f"块 {chunk_filename} 已在 GCS 中。")

        # 转录
        try:
            run_batch_recognize(
                audio_gcs_uri=gcs_audio_uri,
                output_gcs_folder=gcs_output_uri,
                language_code=language_code,
            )

            # 下载并收集转录稿
            blobs = storage_client.list_blobs("test2x", prefix=f"transcripts/{chunk_name}")
            chunk_transcript = ""
            for b in blobs:
                if b.name.endswith(".json"):
                    local_path = os.path.join(output_dir, f"{os.path.basename(b.name)}")
                    b.download_to_filename(local_path)
                    # 解析 JSON 提取文本(简化版;使用 json 模块进行完整解析)
                    import json
                    with open(local_path, 'r') as f:
                        data = json.load(f)
                        if 'results' in data:
                            chunk_text = ' '.join([result.get('alternatives', [{}])[0].get('transcript', '') for result in data['results']])
                            chunk_transcript += chunk_text + "\n"
                    print(f"已下载并解析 {b.name}")

            all_transcripts.append(chunk_transcript)

            # 如果分割了则清理临时块
            if chunk_file != input_file and os.path.exists(chunk_file):
                os.remove(chunk_file)

        except Exception as e:
            print(f"处理块 {chunk_filename} 失败:{e}")

    # 将所有转录稿合并到一个文件中
    combined_text = "\n\n--- 块边界 ---\n\n".join(all_transcripts)
    with open(output_filename, 'w') as f:
        f.write(combined_text)
    print(f"合并的转录稿已保存到 {output_filename}")

    # 如果进行了分割则清理临时目录
    if chunk_files != [input_file]:
        import shutil
        shutil.rmtree(os.path.dirname(chunk_files[0]) if chunk_files else None)

    print(f"完成 {filename} 的处理。")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="处理单个语音备忘录 (.m4a/.ogg) 文件以生成转录稿。"
    )
    parser.add_argument(
        "--input_file",
        type=str,
        required=True,
        help="语音备忘录文件的输入路径。",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="assets/transcriptions",
        help="转录稿的输出目录。",
    )
    parser.add_argument(
        "--chunk_secs",
        type=int,
        default=CHUNK_DURATION_SECS,
        help="块持续时间(秒)(用于分割长文件)。",
    )

    args = parser.parse_args()
    global CHUNK_DURATION_SECS  # 允许覆盖
    CHUNK_DURATION_SECS = args.chunk_secs

    process_audio_file(
        input_file=args.input_file,
        output_dir=args.output,
    )

关键更改和好处:

其他优化:

显示进度:轮询操作

更新的 poll_operation_with_progress() 函数替换了阻塞的 operation.result()。它:

对于更详细的进度(例如,完成百分比),Speech API 不提供细粒度指标,但你可以:

如果进行分割,你会看到来自多个块的交错进度,使其感觉不那么”卡住”。

如果这不能解决你的问题(例如,特定错误),请分享日志/输出以进行更多调整!


Back

x-ai/grok-4-fast:free

Donate