Speeding Up Long Audio Transcription Processing | Generated by AI

Home 2025.10

Why Is Processing So Long for 30-Min or 1-Hour Audio?

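The Google Cloud Speech-to-Text Batch API (v2) is designed for asynchronous, long-running operations, especially with the "long" model, which handles extended audio files (up to ~1 hour per file). It can feel “too long” because a batch job is queued and processed asynchronously, and the script waits for the whole file to finish before any transcript is available, so the wait grows with the length of the audio.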

If your audio is consistently 30+ minutes, the current setup isn’t ideal for quick turnaround—it’s better for offline/bulk processing.

How to Fix It: Reduce Processing Time

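To handle long audio faster, the key is splitting the file into smaller chunks (e.g., 5-15 minutes each). Each chunk becomes a shorter job that finishes sooner and reports its own progress, and chunks could be submitted in parallel if you extend the script to do so (the version below processes them one after another).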

Step 1: Split the Audio File

Use FFmpeg (free, command-line tool) to split files without re-encoding (fast and lossless). Install FFmpeg if needed (e.g., brew install ffmpeg on macOS, apt install ffmpeg on Linux).

Add a function to your script to split the input file. Here’s an updated version of your script with splitting integrated:

import os
import argparse
import subprocess
import tempfile
from google.cloud import storage
from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech
import sys
import time  # For polling

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))

from scripts.llm.openrouter_client import call_openrouter_api  # noqa: F401

# Upper bound on audio length in seconds: 20 * 60 * 60 = 72,000 s (20 hours).
# NOTE(review): not referenced anywhere in the visible script — confirm it is still needed.
MAX_AUDIO_LENGTH_SECS = 20 * 60 * 60
# Directory for transcription output (same value as the --output CLI default).
OUTPUT_DIRECTORY = "assets/transcriptions"
# Target chunk length used when splitting; the __main__ block reassigns it from --chunk_secs.
CHUNK_DURATION_SECS = 600  # 10 minutes per chunk; adjust as needed (e.g., 900 for 15 min)


def split_audio_file(input_file, chunk_duration_secs=None):
    """
    Split an audio file into fixed-length chunks using FFmpeg (stream copy, no re-encoding).

    Args:
        input_file: Path to input audio.
        chunk_duration_secs: Duration of each chunk in seconds. When None, the
            current value of the module-level CHUNK_DURATION_SECS is read at
            call time, so a runtime override (e.g. --chunk_secs) is honoured.

    Returns:
        Sorted list of chunk file paths inside a freshly created temporary
        directory, or an empty list if FFmpeg is missing or fails (the
        temporary directory is removed in that case). After a successful
        split the caller is responsible for deleting the directory.
    """
    import shutil

    # Resolve the default here rather than in the signature: a signature
    # default is frozen at definition time and would ignore later overrides.
    if chunk_duration_secs is None:
        chunk_duration_secs = CHUNK_DURATION_SECS

    filename = os.path.basename(input_file)
    name_without_ext, extension = os.path.splitext(filename)
    chunk_prefix = f"{name_without_ext}_chunk_"

    # Create temp dir for chunks
    temp_dir = tempfile.mkdtemp()

    # FFmpeg command (no re-encoding for speed). Global options such as -y
    # must precede the output path; options placed after it are trailing
    # options that FFmpeg may ignore.
    cmd = [
        "ffmpeg",
        "-y",  # Overwrite
        "-i", input_file,
        "-f", "segment",  # Output format
        "-segment_time", str(chunk_duration_secs),
        "-c", "copy",  # Copy streams without re-encoding
        "-map", "0",  # Map all streams
        os.path.join(temp_dir, f"{chunk_prefix}%03d{extension}"),  # Output pattern
    ]

    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except (OSError, subprocess.CalledProcessError) as e:
        # OSError covers a missing/non-executable ffmpeg binary;
        # CalledProcessError covers a non-zero exit status.
        print(f"FFmpeg error splitting {filename}: {e}")
        stderr = getattr(e, "stderr", None)
        if stderr:
            print(stderr.decode(errors="replace").strip())
        # Do not leak the (possibly partially filled) temp directory.
        shutil.rmtree(temp_dir, ignore_errors=True)
        return []

    # Zero-padded indices make lexical order equal to chronological order.
    chunk_files = sorted(
        os.path.join(temp_dir, entry)
        for entry in os.listdir(temp_dir)
        if entry.startswith(chunk_prefix) and entry.endswith(extension)
    )
    print(f"Split {filename} into {len(chunk_files)} chunks.")
    return chunk_files


def run_batch_recognize(audio_gcs_uri, output_gcs_folder, language_code="en-US", model="long"):
    """
    Transcribe one audio file in GCS with the Speech-to-Text v2 Batch API.

    Args:
        audio_gcs_uri: gs:// URI of the audio file to transcribe.
        output_gcs_folder: gs:// URI under which the API writes its JSON output.
        language_code: Language code passed to the recognizer.
        model: Recognition model name. Defaults to "long": the audio handled
            here is minutes long (whole files or multi-minute chunks), and
            this function cannot know from the global chunk setting whether
            the file was actually split, so the short-utterance model must
            not be selected implicitly.

    Returns:
        The BatchRecognize response returned by the finished operation.

    Raises:
        Whatever exception the operation finished with (re-raised while polling).
    """
    client = SpeechClient()

    filename = audio_gcs_uri.split('/')[-1]
    file_extension = filename.split('.')[-1].lower()

    # .ogg files get an explicit Opus decoding config; everything else relies
    # on auto-detection.
    # NOTE(review): 48 kHz mono is hard-coded for .ogg — confirm it matches the recordings.
    if file_extension == "ogg":
        decoding_kwargs = {
            "explicit_decoding_config": cloud_speech.ExplicitDecodingConfig(
                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.OGG_OPUS,
                sample_rate_hertz=48000,
                audio_channel_count=1,
            )
        }
    else:
        decoding_kwargs = {
            "auto_decoding_config": cloud_speech.AutoDetectDecodingConfig()
        }

    # Everything except the decoding config is identical for both formats.
    config = cloud_speech.RecognitionConfig(
        features=cloud_speech.RecognitionFeatures(
            enable_word_confidence=True,
            enable_word_time_offsets=True,
        ),
        model=model,
        language_codes=[language_code],
        **decoding_kwargs,
    )

    output_config = cloud_speech.RecognitionOutputConfig(
        gcs_output_config=cloud_speech.GcsOutputConfig(uri=output_gcs_folder),
    )

    files = [cloud_speech.BatchRecognizeFileMetadata(uri=audio_gcs_uri)]

    request = cloud_speech.BatchRecognizeRequest(
        recognizer="projects/graphite-ally-445108-k3/locations/global/recognizers/_",
        config=config,
        files=files,
        recognition_output_config=output_config,
    )

    print(f"Starting batch recognize for {filename}...")
    operation = client.batch_recognize(request=request)

    # Blocks until the operation is done and re-raises its error, if any.
    poll_operation_with_progress(operation, filename)

    # The operation has already finished at this point, so this only fetches
    # the result; the timeout is a safety net.
    response = operation.result(timeout=3 * CHUNK_DURATION_SECS)
    print(f"Completed transcription for {filename}. Response: {response}")
    return response


def poll_operation_with_progress(operation, filename, poll_interval_secs=30, timeout_secs=None):
    """
    Poll a long-running operation until it is done, printing progress.

    Args:
        operation: Long-running operation object exposing done(), metadata
            and exception().
        filename: Name used in the progress messages.
        poll_interval_secs: Seconds to sleep between checks.
        timeout_secs: Optional upper bound on the total wait in seconds;
            None waits indefinitely.

    Raises:
        TimeoutError: If timeout_secs elapses before the operation is done.
        Exception: Whatever exception the operation finished with.
    """
    started = time.monotonic()
    while not operation.done():
        if timeout_secs is not None and time.monotonic() - started >= timeout_secs:
            raise TimeoutError(
                f"Operation for {filename} did not finish within {timeout_secs}s"
            )

        # Metadata is best-effort: fetching it may fail or it may be empty
        # early on, and that must never abort the wait itself.
        try:
            metadata = operation.metadata
        except Exception:
            metadata = None

        # The v2 operation metadata reports progress as a percentage.
        percent = getattr(metadata, "progress_percent", None)
        if percent is None:
            print(f"Waiting for {filename}... (checking every {poll_interval_secs}s)")
        else:
            print(f"Progress for {filename}: {percent}% complete")

        time.sleep(poll_interval_secs)

    error = operation.exception()
    if error:
        raise error


def _probe_duration_secs(path):
    """Return the audio duration in seconds reported by ffprobe, or None if unavailable."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        probe = subprocess.run(cmd, check=True, capture_output=True, text=True)
        return float(probe.stdout.strip())
    except (OSError, subprocess.CalledProcessError, ValueError):
        return None


def _extract_transcript_text(data):
    """Join the first alternative's transcript of every result in a parsed output JSON."""
    pieces = []
    for result in data.get("results", []):
        # A result may carry no alternatives (key missing or empty list).
        alternatives = result.get("alternatives") or [{}]
        pieces.append(alternatives[0].get("transcript", ""))
    return " ".join(pieces)


def process_audio_file(input_file, output_dir):
    """
    Transcribe one .m4a/.ogg file and write the combined transcript as text.

    The file is split into chunks when it is longer than CHUNK_DURATION_SECS
    (file size is used as a fallback when the duration cannot be probed).
    Each chunk is uploaded to GCS, transcribed, and its JSON output is
    downloaded into output_dir and parsed.

    Args:
        input_file: Path to the local audio file.
        output_dir: Directory for the downloaded JSON and the final .txt file.

    Returns:
        None. The combined transcript is written to
        <output_dir>/<input name>.txt only when every chunk succeeded, so that
        a failed run is not mistaken for a finished one on the next invocation.
    """
    import json
    import shutil

    os.makedirs(output_dir, exist_ok=True)

    filename = os.path.basename(input_file)
    if not filename.endswith((".m4a", ".ogg")):
        print(f"Error: {filename} is not a supported audio file (.m4a or .ogg).")
        return

    base_name = os.path.splitext(filename)[0]
    output_filename = os.path.join(output_dir, f"{base_name}.txt")
    if os.path.exists(output_filename):
        print(f"Skipping {filename}: {output_filename} already exists.")
        return

    print(f"Processing: {filename}")

    # Determine language
    if filename.endswith(("-zh.m4a", "-zh.ogg")):
        language_code = "cmn-CN"
    else:
        language_code = "en-US"

    # Decide on splitting from the real duration; compressed voice recordings
    # can run for an hour and still be far below any size-based threshold.
    duration_secs = _probe_duration_secs(input_file)
    if duration_secs is not None:
        needs_split = duration_secs > CHUNK_DURATION_SECS
    else:
        needs_split = os.path.getsize(input_file) > 100 * 1024 * 1024  # Rough fallback

    chunk_files = [input_file]
    temp_dir = None
    if needs_split:
        print(f"File is large; splitting into {CHUNK_DURATION_SECS//60}-minute chunks.")
        # Pass the current setting explicitly so a runtime override is used.
        split_files = split_audio_file(input_file, CHUNK_DURATION_SECS)
        if split_files:
            chunk_files = split_files
            temp_dir = os.path.dirname(split_files[0])
        else:
            print("Splitting failed; processing as single file.")

    storage_client = storage.Client()
    bucket = storage_client.bucket("test2x")

    all_transcripts = []  # To combine later
    failed_chunks = []

    try:
        for chunk_idx, chunk_file in enumerate(chunk_files):
            chunk_filename = os.path.basename(chunk_file)
            chunk_name = f"{base_name}_chunk_{chunk_idx+1:03d}"

            # Construct GCS paths
            gcs_audio_uri = f"gs://test2x/audio-files/{chunk_filename}"
            gcs_output_uri = f"gs://test2x/transcripts/{chunk_name}"

            try:
                # Freshly split chunks are always uploaded: their names are
                # deterministic, so an existing blob may hold audio cut with
                # a different chunk length. Only the original file is reused.
                blob = bucket.blob(f"audio-files/{chunk_filename}")
                if chunk_file != input_file or not blob.exists():
                    blob.upload_from_filename(chunk_file)
                    print(f"Uploaded chunk {chunk_filename} to GCS.")
                else:
                    print(f"Chunk {chunk_filename} already in GCS.")

                # Transcribe
                run_batch_recognize(
                    audio_gcs_uri=gcs_audio_uri,
                    output_gcs_folder=gcs_output_uri,
                    language_code=language_code,
                )

                # Download and collect transcript
                # NOTE(review): JSON left under this prefix by earlier runs is
                # picked up as well — confirm whether it should be cleared first.
                chunk_transcript = ""
                for b in storage_client.list_blobs("test2x", prefix=f"transcripts/{chunk_name}"):
                    if not b.name.endswith(".json"):
                        continue
                    local_path = os.path.join(output_dir, os.path.basename(b.name))
                    b.download_to_filename(local_path)
                    with open(local_path, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    if "results" in data:
                        chunk_transcript += _extract_transcript_text(data) + "\n"
                    print(f"Downloaded and parsed {b.name}")

                all_transcripts.append(chunk_transcript)
            except Exception as e:
                # Keep going so the remaining chunks are still attempted, but
                # remember the failure so no partial transcript is written.
                print(f"Failed to process chunk {chunk_filename}: {e}")
                failed_chunks.append(chunk_filename)
    finally:
        # Clean up temp dir if splitting occurred, even on unexpected errors.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)

    if failed_chunks:
        print(
            f"Not writing {output_filename}: {len(failed_chunks)} of "
            f"{len(chunk_files)} chunk(s) failed ({', '.join(failed_chunks)}). "
            "Re-run to retry."
        )
        return

    # Combine all transcripts into one file
    combined_text = "\n\n--- Chunk Boundary ---\n\n".join(all_transcripts)
    with open(output_filename, "w", encoding="utf-8") as f:
        f.write(combined_text)
    print(f"Combined transcript saved to {output_filename}")

    print(f"Processing complete for {filename}.")


# Command-line entry point: parse the arguments, apply the chunk-length
# override, then transcribe the single input file.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Process a single Voice Memo (.m4a/.ogg) file to generate transcription."
    )
    parser.add_argument(
        "--input_file",
        type=str,
        required=True,
        help="Input path for the Voice Memo file.",
    )
    parser.add_argument(
        "--output",
        type=str,
        default=OUTPUT_DIRECTORY,
        help="Output directory for transcriptions.",
    )
    parser.add_argument(
        "--chunk_secs",
        type=int,
        default=CHUNK_DURATION_SECS,
        help="Chunk duration in seconds (for splitting long files).",
    )

    args = parser.parse_args()
    if args.chunk_secs <= 0:
        parser.error("--chunk_secs must be a positive number of seconds.")

    # This code runs at module level, so a plain assignment already rebinds
    # the module global. A `global` statement here would be a SyntaxError
    # because the name is assigned and used earlier in the same scope.
    CHUNK_DURATION_SECS = args.chunk_secs  # Allow override

    process_audio_file(
        input_file=args.input_file,
        output_dir=args.output,
    )

Key Changes and Benefits:

Other Optimizations:

Showing Progress: Polling the Operation

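The updated poll_operation_with_progress() function runs before operation.result(), so the script no longer waits silently. It checks operation.done() every 30 seconds, prints the operation’s metadata (or a “waiting” message) while the job is still running, and raises the operation’s exception if the job failed.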

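For more detailed progress (e.g., % complete), the Speech API exposes little beyond the operation metadata, but you can estimate overall progress yourself, for example by counting finished chunks against the total number of chunks.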

If splitting, you’ll see progress reported chunk by chunk as each one is uploaded and transcribed in turn, making it feel less “stuck.”

If this doesn’t resolve your issue (e.g., specific errors), share logs/output for more tweaks!


Back

x-ai/grok-4-fast:free

Donate