Reconnaissance vocale en temps réel

Home PDF Audio

Ce code Python implémente une reconnaissance vocale en temps réel utilisant l’API Google Cloud Speech-to-Text et la bibliothèque PyAudio. Il capture l’audio depuis le microphone, le transmet en continu à l’API Speech-to-Text et affiche le texte transcrit. La classe MicrophoneStream gère l’entrée audio, et la fonction main configure le client de reconnaissance vocale et traite le flux audio.

import os
import argparse
import io
import sys
import time

from google.cloud import speech

import pyaudio
from six.moves import queue


# Paramètres d'enregistrement audio
RATE = 16000
CHUNK = int(RATE / 10)  # 100ms


class MicrophoneStream(object):
    """Ouvre un flux d'enregistrement en tant que générateur produisant les blocs audio."""
    def __init__(self, rate, chunk):
        self._rate = rate
        self._chunk = chunk

        # Créer une interface audio utilisant PyAudio
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            # L'API ne supporte actuellement que l'audio 1 canal (mono)
            # https://goo.gl/z726ff
            channels=1, rate=self._rate,
            input=True, frames_per_buffer=self._chunk,
            # Exécuter le flux audio de manière asynchrone pour remplir l'objet buffer.
            # Ceci est nécessaire pour que le buffer du périphérique d'entrée ne
            # déborde pas pendant que le thread appelant effectue des requêtes réseau, etc.
            stream_callback=self._fill_buffer,
        )
        self.closed = False
        self._buff = queue.Queue()

    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        """Collecte continuellement les données du flux audio, dans le buffer."""
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self, record_seconds):
        start_time = time.time()
        while not self.closed and time.time() - start_time < record_seconds:
            # Utiliser un get() bloquant pour s'assurer qu'il y a au moins un bloc de
            # données, et arrêter l'itération si le bloc est None, indiquant la
            # fin du flux audio.
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]

            # Maintenant, consommer toutes les autres données encore en buffer.
            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break

            yield b''.join(data)

    def close(self):
        self.closed = True
        # Signaler au générateur de terminer afin que la méthode
        # streaming recognize du client ne bloque pas la terminaison du processus.
        self._buff.put(None)
        self._audio_stream.close()
        self._audio_interface.terminate()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()


def main(record_seconds=10, language_code='fr-FR'):
    # Voir http://g.co/cloud/speech/docs/languages
    # pour une liste des langues prises en charge.
    # language_code = 'fr-FR'  # une balise de langue BCP-47

    client = speech.SpeechClient()
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code=language_code,
        model="latest_long",
    )

    streaming_config = speech.StreamingRecognitionConfig(
        config=config,
        interim_results=True)

    with MicrophoneStream(RATE, CHUNK) as stream:
        audio_generator = stream.generator(record_seconds)
        requests = (speech.StreamingRecognizeRequest(audio_content=content)
                    for content in audio_generator)

        responses = client.streaming_recognize(streaming_config, requests)

        # Maintenant, utiliser les réponses de transcription.
        transcript = ""
        for response in responses:
            print(response)
            # Une fois la transcription terminée, afficher le résultat.
            for result in response.results:
                if result.is_final:
                    alternative = result.alternatives[0]
                    transcript += alternative.transcript + " "
        print(u'Transcription : {}'.format(transcript))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Reconnaissance vocale en temps réel avec durée ajustable.")
    parser.add_argument('--duration', type=int, default=10, help="Durée de l'enregistrement en secondes.")
    parser.add_argument('--language_code', type=str, default='fr-FR', help="Code de langue pour la transcription.")
    args = parser.parse_args()
    print("Veuillez parler...")
    main(record_seconds=args.duration, language_code=args.language_code)


Back 2025.02.22 Donate