Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import torch | |
| from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline | |
| import time | |
| import os | |
| import numpy as np | |
| import soundfile as sf | |
| import librosa | |
| # --- Configuration --- | |
| # Device selection (GPU if available, else CPU) | |
| device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32 | |
| print(f"Using device: {device}") | |
| # STT Model (Use smaller model for lower latency) | |
| stt_model_id = "openai/whisper-tiny" # Or "openai/whisper-base". Avoid larger models for streaming. | |
| # Summarization Model | |
| summarizer_model_id = "sshleifer/distilbart-cnn-6-6" # Use a distilled/smaller model for speed | |
| # Summarization Interval (seconds) - How often to regenerate the summary | |
| SUMMARY_INTERVAL = 30.0 # Summarize every 30 seconds | |
| # --- Load Models --- | |
| # (Keep the model loading code exactly the same as before) | |
| print("Loading STT model...") | |
| stt_model = AutoModelForSpeechSeq2Seq.from_pretrained( | |
| stt_model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True | |
| ) | |
| stt_model.to(device) | |
| processor = AutoProcessor.from_pretrained(stt_model_id) | |
| stt_pipeline = pipeline( | |
| "automatic-speech-recognition", | |
| model=stt_model, | |
| tokenizer=processor.tokenizer, | |
| feature_extractor=processor.feature_extractor, | |
| max_new_tokens=128, | |
| chunk_length_s=30, | |
| batch_size=16, | |
| torch_dtype=torch_dtype, | |
| device=device, | |
| ) | |
| print("STT model loaded.") | |
| print("Loading Summarization pipeline...") | |
| summarizer = pipeline( | |
| "summarization", | |
| model=summarizer_model_id, | |
| device=device | |
| ) | |
| print("Summarization pipeline loaded.") | |
| # --- Helper Functions --- | |
| # (Keep the format_summary_as_bullets function exactly the same) | |
| def format_summary_as_bullets(summary_text): | |
| """Attempts to format a summary text block into bullet points.""" | |
| if not summary_text: | |
| return "" | |
| # Simple approach: split by sentences and add bullets. | |
| # More advanced NLP could be used here. | |
| sentences = summary_text.replace(". ", ".\n- ").split('\n') | |
| bullet_summary = "- " + "\n".join(sentences).strip() | |
| # Remove potential empty bullets | |
| bullet_summary = "\n".join([line for line in bullet_summary.split('\n') if line.strip() not in ['-', '']]) | |
| return bullet_summary | |
| # --- Processing Function for Streaming --- | |
| # (Keep the process_audio_stream function exactly the same) | |
| # This function ONLY processes audio, it doesn't interact with the webcam video | |
| def process_audio_stream( | |
| new_chunk_tuple, # Gradio streaming yields (sample_rate, numpy_data) | |
| accumulated_transcript_state, # gr.State holding the full text | |
| last_summary_time_state, # gr.State holding the timestamp of the last summary | |
| current_summary_state # gr.State holding the last generated summary | |
| ): | |
| if new_chunk_tuple is None: | |
| # Initial call or stream ended, return current state | |
| return accumulated_transcript_state, current_summary_state, accumulated_transcript_state, last_summary_time_state, current_summary_state | |
| sample_rate, audio_chunk = new_chunk_tuple | |
| if audio_chunk is None or sample_rate is None or audio_chunk.size == 0: | |
| # Handle potential empty chunks gracefully | |
| return accumulated_transcript_state, current_summary_state, accumulated_transcript_state, last_summary_time_state, current_summary_state | |
| print(f"Received chunk: {audio_chunk.shape}, Sample Rate: {sample_rate}, Duration: {len(audio_chunk)/sample_rate:.2f}s") | |
| # Ensure audio is float32 and mono, as Whisper expects | |
| if audio_chunk.dtype != np.float32: | |
| # Normalize assuming input is int16 | |
| # Adjust if your microphone provides different integer types | |
| audio_chunk = audio_chunk.astype(np.float32) / 32768.0 # Max value for int16 is 32767 | |
| # --- 1. Transcribe the new chunk --- | |
| new_text = "" | |
| try: | |
| result = stt_pipeline({"sampling_rate": sample_rate, "raw": audio_chunk.copy()}) | |
| new_text = result["text"].strip() if result["text"] else "" | |
| print(f"Transcription chunk: '{new_text}'") | |
| except Exception as e: | |
| print(f"Error during transcription chunk: {e}") | |
| new_text = f"[Transcription Error: {e}]" | |
| # --- 2. Update Accumulated Transcript --- | |
| if accumulated_transcript_state and not accumulated_transcript_state.endswith((" ", "\n")) and new_text: | |
| updated_transcript = accumulated_transcript_state + " " + new_text | |
| else: | |
| updated_transcript = accumulated_transcript_state + new_text | |
| # --- 3. Periodic Summarization --- | |
| current_time = time.time() | |
| new_summary = current_summary_state # Keep the old summary by default | |
| updated_last_summary_time = last_summary_time_state | |
| # Check transcript length to avoid summarizing tiny bits of text too early | |
| if updated_transcript and len(updated_transcript) > 50 and (current_time - last_summary_time_state > SUMMARY_INTERVAL): | |
| print(f"Summarizing transcript (length: {len(updated_transcript)})...") | |
| try: | |
| # Summarize the *entire* transcript up to this point | |
| summary_result = summarizer(updated_transcript, max_length=150, min_length=30, do_sample=False) | |
| if summary_result and isinstance(summary_result, list): | |
| raw_summary = summary_result[0]['summary_text'] | |
| new_summary = format_summary_as_bullets(raw_summary) | |
| updated_last_summary_time = current_time # Update time only on successful summary | |
| print("Summary updated.") | |
| else: | |
| print("Summarization did not produce expected output.") | |
| except Exception as e: | |
| print(f"Error during summarization: {e}") | |
| # Display error in summary box but keep the last known good summary in state | |
| # To avoid overwriting a potentially useful summary with just an error message | |
| # We return the error message for display, but not update summary_state with it | |
| error_display_summary = f"[Summarization Error]\n\nLast good summary:\n{current_summary_state}" | |
| return updated_transcript, error_display_summary, updated_transcript, last_summary_time_state, current_summary_state | |
| # --- 4. Return Updated State and Outputs --- | |
| return updated_transcript, new_summary, updated_transcript, updated_last_summary_time, new_summary | |
| # --- Gradio Interface --- | |
| print("Creating Gradio interface...") | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# Real-Time Meeting Notes with Webcam View") | |
| gr.Markdown("Speak into your microphone. Transcription appears below. Summary updates periodically.") | |
| # State variables to store data between stream calls | |
| transcript_state = gr.State("") # Holds the full transcript | |
| last_summary_time = gr.State(0.0) # Holds the time the summary was last generated | |
| summary_state = gr.State("") # Holds the current bullet point summary | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| # Input: Microphone stream | |
| audio_stream = gr.Audio(sources=["microphone"], streaming=True, label="Live Microphone Input", type="numpy") | |
| # NEW: Webcam Display | |
| # Use gr.Image which is simpler for just displaying webcam feed | |
| # live=True makes it update continuously | |
| webcam_view = gr.Image(sources=["webcam"], label="Your Webcam", streaming=True) # Use streaming=True for live view | |
| with gr.Column(scale=2): | |
| transcription_output = gr.Textbox(label="Full Transcription", lines=15, interactive=False) # Display only | |
| summary_output = gr.Textbox(label=f"Bullet Point Summary (Updates ~every {SUMMARY_INTERVAL}s)", lines=10, interactive=False) # Display only | |
| # Connect the streaming audio input to the processing function | |
| # Note: The webcam component runs independently in the browser, it doesn't feed data here | |
| audio_stream.stream( | |
| fn=process_audio_stream, | |
| inputs=[audio_stream, transcript_state, last_summary_time, summary_state], | |
| outputs=[transcription_output, summary_output, transcript_state, last_summary_time, summary_state], | |
| ) | |
| # Add a button to clear the state if needed | |
| def clear_state_values(): | |
| print("Clearing state.") | |
| return "", "", 0.0, "" # Clear transcript display, summary display, reset time state, clear summary state | |
| # Need separate function to clear states vs displays if they differ | |
| def clear_state(): | |
| return "", 0.0, "" # Clear transcript_state, last_summary_time, summary_state | |
| clear_button = gr.Button("Clear Transcript & Summary") | |
| # This button clears the display textboxes AND resets the internal states | |
| clear_button.click( | |
| fn=lambda: ("", "", "", 0.0, ""), # Return empty values for all outputs/states | |
| inputs=[], | |
| outputs=[transcription_output, summary_output, transcript_state, last_summary_time, summary_state] | |
| ) | |
| print("Launching Gradio interface...") | |
| demo.queue() # Enable queue for handling multiple requests/stream chunks | |
| demo.launch(debug=True, share=True) # share=True for Colab public link |