
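"""
Speech-to-text streaming client.

Captures microphone audio with FFmpeg, resamples it to 16 kHz mono 16-bit PCM,
streams the raw frames to a transcription server over a WebSocket, and displays
the transcriptions returned by the server in a small Tkinter GUI. The server is
expected to reply with JSON messages containing a "text" field.

Assumed external requirements (not provided by this file):
- FFmpeg available on the PATH
- the websocket-client package (imported as `websocket`)
- a local get_microphone module for selecting the input device
"""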
import threading
import subprocess
import platform
import time
import json
import websocket
from tkinter import Tk, Button, Label, Text, Scrollbar, END

# Import the microphone selection helper
from get_microphone import get_microphone
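# NOTE (assumption): get_microphone() is expected to return a dict describing the selected
# input device, e.g. {"name": "..."} on Windows/Linux or {"device_index": "0"} on macOS,
# matching the keys read by get_ffmpeg_command() below.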
# 1) Server configuration
SERVER_URL = "ws://takensofttesting.iptime.org:54127/v1/audio/transcriptions?language=ko"
# 2) Audio configuration
TARGET_RATE = 16000 # Resample to 16 kHz for the server
CHANNELS = 1 # Mono
FORMAT = 's16le' # 16-bit PCM little endian
# 3) FFmpeg configuration
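# FFmpeg reads from the OS capture device, resamples to TARGET_RATE / CHANNELS, and writes
# raw s16le PCM to stdout ("pipe:1"); the client forwards those bytes over the WebSocket.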
def get_ffmpeg_command(device_info):
    """
    Constructs the FFmpeg command based on the operating system and selected device.

    :param device_info: Dictionary containing device information from get_microphone()
    :return: List of FFmpeg command arguments
    """
    os_name = platform.system()
    if os_name == "Windows":
        # On Windows, FFmpeg captures audio through the 'dshow' (DirectShow) input.
        # device_info should contain the 'name' of the device as recognized by FFmpeg,
        # e.g. "Microphone (Realtek High Definition Audio)".
        device_name = device_info.get("name", "default")
        cmd = [
            "ffmpeg",
            "-f", "dshow",
            "-i", f"audio={device_name}",
            "-ar", str(TARGET_RATE),
            "-ac", str(CHANNELS),
            "-f", FORMAT,
            "pipe:1"
        ]
    elif os_name == "Darwin":
        # On macOS, FFmpeg captures audio through 'avfoundation'.
        # device_info should contain the audio 'device_index', e.g. "0" for the default input.
        device_index = device_info.get("device_index", "0")
        cmd = [
            "ffmpeg",
            "-f", "avfoundation",
            "-i", f":{device_index}",
            "-ar", str(TARGET_RATE),
            "-ac", str(CHANNELS),
            "-f", FORMAT,
            "pipe:1"
        ]
    elif os_name == "Linux":
        # On Linux, FFmpeg captures audio through 'alsa'.
        # device_info should contain the device 'name' as recognized by FFmpeg,
        # e.g. "default" or "hw:1,0".
        device_name = device_info.get("name", "default")
        cmd = [
            "ffmpeg",
            "-f", "alsa",
            "-i", device_name,
            "-ar", str(TARGET_RATE),
            "-ac", str(CHANNELS),
            "-f", FORMAT,
            "pipe:1"
        ]
    else:
        raise ValueError(f"Unsupported OS: {os_name}")
    return cmd
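# The exact device names/indices depend on the machine. If get_microphone() does not supply
# them, FFmpeg itself can list the available capture devices, for example:
#   Windows: ffmpeg -list_devices true -f dshow -i dummy
#   macOS:   ffmpeg -f avfoundation -list_devices true -i ""
#   Linux:   arecord -L   (ALSA device names usable with "-f alsa")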

class SpeechToTextClient:
    """
    A client that:
    - Uses FFmpeg to capture and process audio
    - Initializes a WebSocket connection
    - Streams raw 16-bit PCM over the WebSocket
    - Displays transcriptions from the server in the GUI
    """

    def __init__(self, gui):
        """
        :param gui: An instance of the SpeechToTextGUI class for UI callbacks
        """
        self.gui = gui
        self.ws = None
        self.ffmpeg_process = None
        self.streaming_thread = None
        self.running = False
        # Ask the user to pick a device
        mic_info = get_microphone()  # Should return a dict with the necessary device info
        self.device_info = mic_info
        # Prepare the FFmpeg command
        self.ffmpeg_cmd = get_ffmpeg_command(self.device_info)
    def start_recording(self):
        """Starts FFmpeg, initializes the WebSocket connection, and begins streaming audio."""
        if self.running:
            print("Already recording.")
            return
        self.running = True
        # 1) Start the FFmpeg subprocess
        try:
            self.ffmpeg_process = subprocess.Popen(
                self.ffmpeg_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,  # Suppress FFmpeg stderr; remove if debugging
                bufsize=10 ** 8
            )
            print("FFmpeg started.")
        except Exception as e:
            print(f"Failed to start FFmpeg: {e}")
            self.running = False
            return
        # 2) Initialize the WebSocket connection
        self.ws = websocket.WebSocketApp(
            SERVER_URL,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        # Run the WebSocket in a background thread
        ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        ws_thread.start()
        print("WebSocket connection initiated.")
        # 3) Start the audio streaming loop in a separate thread
        self.streaming_thread = threading.Thread(target=self.audio_stream, daemon=True)
        self.streaming_thread.start()
        self.gui.update_status("Recording started...")
    def stop_recording(self):
        """Stops audio streaming, terminates FFmpeg, and closes the WebSocket."""
        if not self.running:
            print("Not currently recording.")
            return
        self.running = False
        # 1) Terminate the FFmpeg subprocess
        if self.ffmpeg_process:
            self.ffmpeg_process.terminate()
            self.ffmpeg_process = None
            print("FFmpeg terminated.")
        # 2) Close the WebSocket connection
        if self.ws:
            self.ws.close()
            self.ws = None
            print("WebSocket connection closed.")
        self.gui.update_status("Recording stopped...")
    def audio_stream(self):
        """
        Continuously reads audio data from FFmpeg's stdout and sends it over the WebSocket.
        """
        # The WebSocket is opened asynchronously; wait briefly for it to connect before streaming.
        for _ in range(50):
            if not self.running:
                return
            if self.ws and self.ws.sock and self.ws.sock.connected:
                break
            time.sleep(0.1)
        try:
            while self.running and self.ffmpeg_process:
                # Read a chunk of raw PCM data
                data = self.ffmpeg_process.stdout.read(4096)  # Adjust chunk size as needed
                if not data:
                    print("No more data from FFmpeg.")
                    break
                # Send the audio frames over the WebSocket (binary)
                if self.ws and self.ws.sock and self.ws.sock.connected:
                    try:
                        self.ws.send(data, opcode=websocket.ABNF.OPCODE_BINARY)
                    except Exception as e:
                        print(f"Error sending data over WebSocket: {e}")
                        break
                else:
                    print("WebSocket is not connected.")
                    break
        except Exception as e:
            print(f"Error during audio streaming: {e}")
        finally:
            # Clean up FFmpeg and the WebSocket if the loop exits on its own
            # (stop_recording() is a no-op when it has already been called).
            self.stop_recording()
    # ---------------------
    # WebSocket Callbacks
    # ---------------------
    def on_message(self, ws, message):
        """Handle transcriptions (or other messages) from the server."""
        print("Received from server:", message)
        try:
            data = json.loads(message)
            transcription = data.get("text", "")
            if transcription:
                self.gui.display_transcription(transcription)
        except json.JSONDecodeError:
            print("Error: Received invalid JSON:", message)

    def on_error(self, ws, error):
        """Handle any WebSocket errors."""
        print("WebSocket Error:", error)

    def on_close(self, ws, close_status_code, close_msg):
        """Called when the WebSocket connection is closed."""
        print("WebSocket Closed")

class SpeechToTextGUI:
    """
    The GUI class for user interaction:
    - Start/Stop buttons
    - Status updates
    - Displays transcriptions
    - Ties everything together with SpeechToTextClient
    """

    def __init__(self):
        self.client = SpeechToTextClient(self)
        # Main window setup
        self.root = Tk()
        self.root.title("Speech-to-Text Client")
        # Status label
        self.status_label = Label(self.root, text="Click 'Start Recording' to begin.", anchor="w")
        self.status_label.pack(fill="x", padx=10, pady=5)
        # Text area for transcriptions
        self.text_display = Text(self.root, wrap="word", height=20)
        self.text_display.pack(fill="both", expand=True, padx=10, pady=5)
        # Scrollbar for the transcription area
        scrollbar = Scrollbar(self.text_display)
        scrollbar.pack(side="right", fill="y")
        self.text_display.config(yscrollcommand=scrollbar.set)
        scrollbar.config(command=self.text_display.yview)
        # Start/Stop buttons
        start_button = Button(
            self.root,
            text="Start Recording",
            command=self.client.start_recording,
            bg="green",
            fg="white"
        )
        start_button.pack(side="left", padx=10, pady=10)
        stop_button = Button(
            self.root,
            text="Stop Recording",
            command=self.client.stop_recording,
            bg="red",
            fg="white"
        )
        stop_button.pack(side="right", padx=10, pady=10)
        # Handle the window close event to ensure subprocesses are terminated
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)

    def update_status(self, message):
        """Updates the status label."""
        self.status_label.config(text=message)

    def display_transcription(self, transcription):
        """Appends transcriptions to the text box and scrolls to the end."""
        if transcription:
            self.text_display.insert(END, transcription + "\n")
            self.text_display.see(END)  # Auto-scroll

    def on_close(self):
        """Handle the window close event."""
        self.client.stop_recording()
        self.root.destroy()

    def run(self):
        """Start the Tkinter event loop."""
        self.root.mainloop()
if __name__ == "__main__":
gui = SpeechToTextGUI()
gui.run()