# Hugging Face Spaces page header captured with the source ("Spaces: Running"); not part of the program.
| import spaces | |
| import tempfile | |
| import gradio as gr | |
| import subprocess | |
| import os, stat | |
| import uuid | |
| from googletrans import Translator | |
| import edge_tts | |
| import asyncio | |
| import ffmpeg | |
| import json | |
| from scipy.signal import wiener | |
| import soundfile as sf | |
| from pydub import AudioSegment | |
| import numpy as np | |
| import librosa | |
| from zipfile import ZipFile | |
| import shlex | |
| import cv2 | |
| import torch | |
| import torchvision | |
| from tqdm import tqdm | |
| from numba import jit | |
| from huggingface_hub import HfApi | |
| import moviepy.editor as mp | |
# Hugging Face Hub client; HF_TOKEN is read from the environment and is None when unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
api = HfApi(token=HF_TOKEN)
repo_id = "artificialguybr/video-dubbing"

# Unpack "ffmpeg.zip" into the working directory. The context manager closes
# the archive handle as soon as extraction finishes instead of leaving it to
# the garbage collector.
with ZipFile("ffmpeg.zip") as ffmpeg_archive:
    ffmpeg_archive.extractall()

# Add the owner-execute bit to the extracted "ffmpeg" file, keeping its other mode bits.
st = os.stat('ffmpeg')
os.chmod('ffmpeg', st.st_mode | stat.S_IEXEC)
print("Starting the program...")
def generate_unique_filename(extension):
    """Return a random UUID4-based file name ending with ``extension``.

    ``extension`` is appended verbatim, so pass it with its leading dot
    (for example ``".wav"``).
    """
    return "{}{}".format(uuid.uuid4(), extension)
def cleanup_files(*files):
    """Delete every given path that exists, logging each removal.

    Falsy entries (``None``, empty string) and paths that do not exist are
    skipped without any message.
    """
    for path in files:
        if not path or not os.path.exists(path):
            continue
        os.remove(path)
        print(f"Removed file: {path}")
def check_for_faces(video_path):
    """Return True if a frontal face is detected in any frame of the video.

    Frames are read one by one from ``video_path``, converted to grayscale and
    scanned with OpenCV's ``haarcascade_frontalface_default.xml`` cascade.
    Scanning stops at the first frame that yields a detection.

    Args:
        video_path: Path of the video file to scan.

    Returns:
        True as soon as one frame contains at least one detection; False when
        the frames run out first (including when no frame can be read).
    """
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    cap = cv2.VideoCapture(video_path)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # No more frames (or the file could not be read).
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # Positional arguments: scaleFactor=1.1, minNeighbors=4.
            faces = face_cascade.detectMultiScale(gray, 1.1, 4)
            if len(faces) > 0:
                return True
        return False
    finally:
        # Release the capture handle on every exit path, including the early return.
        cap.release()
def transcribe_audio(file_path):
    """Transcribe an audio or video file with the ``insanely-fast-whisper`` CLI.

    If ``file_path`` ends with a known video extension, its audio track is
    first extracted to a temporary WAV file. The CLI writes its transcript to
    a temporary JSON file, which is parsed and then deleted. Both temporary
    files are removed whether or not transcription succeeds.

    Args:
        file_path: Path of the audio or video file to transcribe.

    Returns:
        The transcript's top-level ``"text"`` value if present, otherwise the
        ``"text"`` of every entry in ``"chunks"`` joined with single spaces.

    Raises:
        subprocess.CalledProcessError: If the CLI exits with a non-zero status.
        json.JSONDecodeError: If the transcript file is not valid JSON.
        Exception: Any error raised while extracting audio from a video is
            logged and re-raised unchanged.
    """
    print(f"Starting transcription of file: {file_path}")
    temp_audio = None
    output_file = generate_unique_filename(".json")
    try:
        if file_path.endswith(('.mp4', '.avi', '.mov', '.flv')):
            print("Video file detected. Extracting audio...")
            try:
                temp_audio = generate_unique_filename(".wav")
                video = mp.VideoFileClip(file_path)
                try:
                    video.audio.write_audiofile(temp_audio)
                finally:
                    # Close the clip so its reader resources are not left open.
                    video.close()
                file_path = temp_audio
            except Exception as e:
                print(f"Error extracting audio from video: {e}")
                raise

        print(f"Does the file exist? {os.path.exists(file_path)}")
        print(f"File size: {os.path.getsize(file_path) if os.path.exists(file_path) else 'N/A'} bytes")

        command = [
            "insanely-fast-whisper",
            "--file-name", file_path,
            "--device-id", "0",
            "--model-name", "openai/whisper-large-v3",
            "--task", "transcribe",
            "--timestamp", "chunk",
            "--transcript-path", output_file
        ]
        print(f"Executing command: {' '.join(command)}")
        try:
            completed = subprocess.run(command, check=True, capture_output=True, text=True)
            print(f"Standard output: {completed.stdout}")
            print(f"Error output: {completed.stderr}")
        except subprocess.CalledProcessError as e:
            print(f"Error running insanely-fast-whisper: {e}")
            print(f"Standard output: {e.stdout}")
            print(f"Error output: {e.stderr}")
            raise

        print(f"Reading transcription file: {output_file}")
        # Read the raw text once so it can be logged on a parse failure
        # without reopening the file. JSON is read explicitly as UTF-8 so
        # non-ASCII transcripts do not depend on the locale's default encoding.
        with open(output_file, "r", encoding="utf-8") as f:
            raw_transcript = f.read()
        try:
            transcription = json.loads(raw_transcript)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")
            print(f"File content: {raw_transcript}")
            raise

        if "text" in transcription:
            text = transcription["text"]
        else:
            text = " ".join([chunk["text"] for chunk in transcription.get("chunks", [])])
        print("Transcription completed.")
        return text
    finally:
        # Cleanup runs on success and on failure; cleanup_files skips None
        # and paths that were never created.
        cleanup_files(output_file, temp_audio)
async def text_to_speech(text, voice, output_file):
    """Synthesize ``text`` with the edge-tts ``voice`` and save the audio to ``output_file``."""
    await edge_tts.Communicate(text, voice).save(output_file)
# Maps the dropdown label to (translation language code, edge-tts voice name).
_LANGUAGE_MAPPING = {
    'English': ('en', 'en-US-EricNeural'),
    'Spanish': ('es', 'es-ES-AlvaroNeural'),
    'French': ('fr', 'fr-FR-HenriNeural'),
    'German': ('de', 'de-DE-ConradNeural'),
    'Italian': ('it', 'it-IT-DiegoNeural'),
    'Portuguese': ('pt', 'pt-PT-DuarteNeural'),
    'Polish': ('pl', 'pl-PL-MarekNeural'),
    'Turkish': ('tr', 'tr-TR-AhmetNeural'),
    'Russian': ('ru', 'ru-RU-DmitryNeural'),
    'Dutch': ('nl', 'nl-NL-MaartenNeural'),
    'Czech': ('cs', 'cs-CZ-AntoninNeural'),
    'Arabic': ('ar', 'ar-SA-HamedNeural'),
    'Chinese (Simplified)': ('zh-CN', 'zh-CN-YunxiNeural'),
    'Japanese': ('ja', 'ja-JP-KeitaNeural'),
    'Korean': ('ko', 'ko-KR-InJoonNeural'),
    'Hindi': ('hi', 'hi-IN-MadhurNeural'),
    'Swedish': ('sv', 'sv-SE-MattiasNeural'),
    'Danish': ('da', 'da-DK-JeppeNeural'),
    'Finnish': ('fi', 'fi-FI-HarriNeural'),
    'Greek': ('el', 'el-GR-NestorasNeural')
}

# Longest accepted clip, in seconds.
_MAX_VIDEO_SECONDS = 60

# Text looked for in Wav2Lip's stderr to recognise a "no face" failure.
_FACE_NOT_DETECTED = "Face not detected! Ensure the video contains a face in all the frames."


def _probe_duration(video_path):
    """Return the duration of ``video_path`` in seconds as reported by ``ffmpeg.probe``."""
    info = ffmpeg.probe(video_path)
    # Prefer the container-level duration: the first stream is not guaranteed
    # to be the video stream or to carry its own "duration" field.
    duration = info.get('format', {}).get('duration')
    if duration is None:
        duration = info['streams'][0]['duration']
    return float(duration)


def _mux_audio(video_path, audio_path, output_path):
    """Copy the video stream of ``video_path`` and encode ``audio_path`` as its AAC audio track."""
    # "-y" overwrites a partial output left behind by a failed Wav2Lip run
    # instead of waiting for confirmation. The exit status is not checked
    # here; the caller verifies that the output file exists.
    subprocess.run([
        "ffmpeg", "-y",
        "-i", video_path,
        "-i", audio_path,
        "-c:v", "copy",
        "-c:a", "aac",
        "-strict", "experimental",
        "-map", "0:v:0",
        "-map", "1:a:0",
        output_path,
    ])


def process_video(radio, video, target_language, has_closeup_face):
    """Dub a short video into ``target_language`` and return the result for Gradio.

    Pipeline: rescale the input to 720p height, reject clips longer than one
    minute, extract and band-pass filter the audio, transcribe it, translate
    the transcript, synthesize speech with edge-tts, then either lip-sync with
    Wav2Lip (when ``has_closeup_face`` is set) or simply replace the audio
    track. If Wav2Lip fails for any reason the plain audio replacement is used
    instead. Intermediate files are deleted on success and on failure.

    Args:
        radio: Value of the Upload/Record selector; not used by the pipeline.
        video: Path of the uploaded or recorded video.
        target_language: A key of ``_LANGUAGE_MAPPING`` (the dropdown label).
        has_closeup_face: Whether to attempt Wav2Lip lip-syncing.

    Returns:
        A 2-tuple matching the Interface's two outputs:
        ``(output_video_path, "")`` on success, or
        ``(gr.update(value=None, visible=True), "Error: ...")`` on failure.
    """
    run_uuid = uuid.uuid4().hex[:6]
    resized_video = f"{run_uuid}_resized_video.mp4"
    raw_audio = f"{run_uuid}_output_audio.wav"
    filtered_audio = f"{run_uuid}_output_audio_final.wav"
    synth_audio = f"{run_uuid}_output_synth.wav"
    output_video_path = f"{run_uuid}_output_video.mp4"

    try:
        if target_language is None:
            raise ValueError("Please select a Target Language for Dubbing.")
        if target_language not in _LANGUAGE_MAPPING:
            raise ValueError(f"Unsupported target language: {target_language}")
        if not video:
            raise ValueError("Please upload or record a video.")
        target_language_code, voice = _LANGUAGE_MAPPING[target_language]

        ffmpeg.input(video).output(resized_video, vf='scale=-2:720').run()
        if not os.path.exists(resized_video):
            raise FileNotFoundError(f"Error: {resized_video} does not exist.")

        if _probe_duration(resized_video) > _MAX_VIDEO_SECONDS:
            raise ValueError("Video duration exceeds 1 minute. Please upload a shorter video.")

        ffmpeg.input(resized_video).output(raw_audio, acodec='pcm_s24le', ar=48000, map='a').run()
        subprocess.run(
            ["ffmpeg", "-y", "-i", raw_audio, "-af", "lowpass=3000,highpass=100", filtered_audio],
            capture_output=False, text=True, check=True,
        )

        print("Attempting to transcribe with Whisper...")
        try:
            whisper_text = transcribe_audio(filtered_audio)
            print(f"Transcription successful: {whisper_text}")
        except Exception as e:
            print(f"Error encountered during transcription: {str(e)}")
            raise
        if not whisper_text or not whisper_text.strip():
            # Fail with a clear message rather than sending empty text to the
            # translator and the speech synthesizer.
            raise ValueError("No speech was detected in the video.")

        translator = Translator()
        translated_text = translator.translate(whisper_text, dest=target_language_code).text
        print(translated_text)

        asyncio.run(text_to_speech(translated_text, voice, synth_audio))

        # Values passed to Wav2Lip's --pads (top, bottom, left, right) and --resize_factor.
        pad_top = 0
        pad_bottom = 15
        pad_left = 0
        pad_right = 0
        rescale_factor = 1

        lip_synced = False
        if has_closeup_face:
            wav2lip_cmd = [
                "python", "Wav2Lip/inference.py",
                "--checkpoint_path", "Wav2Lip/checkpoints/wav2lip_gan.pth",
                "--face", resized_video,
                "--audio", synth_audio,
                "--pads", str(pad_top), str(pad_bottom), str(pad_left), str(pad_right),
                "--resize_factor", str(rescale_factor),
                "--nosmooth",
                "--outfile", output_video_path,
            ]
            try:
                # stderr must be captured, otherwise CalledProcessError.stderr
                # is None and the "face not detected" check can never match.
                subprocess.run(wav2lip_cmd, check=True, capture_output=True, text=True)
                lip_synced = True
            except subprocess.CalledProcessError as e:
                print(f"Wav2Lip failed: {e}")
                print(f"Error output: {e.stderr}")
                if _FACE_NOT_DETECTED in (e.stderr or ""):
                    gr.Warning("Wav2lip didn't detect a face. Please try again with the option disabled.")

        if not lip_synced:
            # Either lip-sync was not requested or Wav2Lip failed: keep the
            # original frames and only swap in the synthesized audio.
            _mux_audio(resized_video, synth_audio, output_video_path)

        if not os.path.exists(output_video_path):
            raise FileNotFoundError(f"Error: {output_video_path} was not generated.")

        # The Interface declares two outputs (video, error text), so the
        # success path must return two values as well.
        return output_video_path, ""
    except Exception as e:
        print(f"Error in process_video: {str(e)}")
        # Do not leave a partial result behind when the run failed.
        cleanup_files(output_video_path)
        return gr.update(value=None, visible=True), f"Error: {str(e)}"
    finally:
        # Intermediate files are removed whether the run succeeded or not.
        cleanup_files(resized_video, raw_audio, filtered_audio, synth_audio)
def swap(radio):
    """Return a Gradio update that sets ``source`` to match the radio value.

    "Upload" selects the ``"upload"`` source; any other value selects
    ``"webcam"``.
    """
    chosen_source = "upload" if radio == "Upload" else "webcam"
    return gr.update(source=chosen_source)
# The video and radio components are created before the Interface so the same
# objects can be referenced again when wiring extra events after rendering.
video = gr.Video()
radio = gr.Radio(["Upload", "Record"], value="Upload", show_label=False)

# Inputs are listed in the order of process_video's parameters:
# (radio, video, target_language, has_closeup_face).
iface = gr.Interface(
    fn=process_video,
    inputs=[
        radio,
        video,
        gr.Dropdown(choices=["English", "Spanish", "French", "German", "Italian", "Portuguese", "Polish", "Turkish", "Russian", "Dutch", "Czech", "Arabic", "Chinese (Simplified)", "Japanese", "Korean", "Hindi", "Swedish", "Danish", "Finnish", "Greek"], label="Target Language for Dubbing", value="Spanish"),
        gr.Checkbox(
            label="Video has a close-up face. Use Wav2lip.",
            value=False,
            info="Say if video have close-up face. For Wav2lip. Will not work if checked wrongly.")
    ],
    outputs=[gr.Video(), gr.Textbox(label="Error Message")],
    live=False,
    title="AI Video Dubbing",
    description="""This tool was developed by [@artificialguybr](https://twitter.com/artificialguybr) using entirely open-source tools. Special thanks to Hugging Face for the GPU support. Thanks [@yeswondwer](https://twitter.com/@yeswondwerr) for original code. Test the [Video Transcription and Translate](https://huggingface.co/spaces/artificialguybr/VIDEO-TRANSLATION-TRANSCRIPTION) space!""",
    # Gradio documents string values here ("auto", "manual", "never"); the
    # boolean form is deprecated. "never" keeps flagging disabled.
    allow_flagging="never"
)
# Render the Interface inside a Blocks context so an extra event handler and
# the usage notes can be attached to the same page.
with gr.Blocks() as demo:
    iface.render()
    # When the Upload/Record radio changes, swap() returns an update that is
    # applied to the video input component.
    radio.change(swap, inputs=[radio], outputs=video)
    gr.Markdown("""
    **Note:**
    - Video limit is 1 minute. It will dubbing all people using just one voice.
    - Generation may take up to 5 minutes.
    - The tool uses open-source models for all models. It's an alpha version.
    - Quality can be improved but would require more processing time per video. For scalability and hardware limitations, speed was chosen, not just quality.
    - If you need more than 1 minute, duplicate the Space and change the limit on app.py.
    - If you incorrectly mark the 'Video has a close-up face' checkbox, the dubbing may not work as expected.
    """)
print("Launching Gradio interface...")
# Enable the request queue before starting the server.
demo.queue()
demo.launch()