Deep-Live-Cam/modules/utilities.py

228 lines
6.6 KiB
Python
Raw Normal View History

2023-09-24 21:36:57 +08:00
import glob
import mimetypes
import os
import platform
import ssl
import subprocess
import cv2
import modules
2023-09-24 21:36:57 +08:00
from pathlib import Path
from typing import List, Any
from tqdm import tqdm
import modules.globals
2024-12-13 22:19:11 +08:00
TEMP_FILE = "temp.mp4"
TEMP_DIRECTORY = "temp"
2023-09-24 21:36:57 +08:00
# monkey patch ssl for mac
2024-12-13 22:19:11 +08:00
if platform.system().lower() == "darwin":
2023-09-24 21:36:57 +08:00
ssl._create_default_https_context = ssl._create_unverified_context
def run_ffmpeg(args: List[str]) -> bool:
"""Run an ffmpeg command with the given arguments."""
2024-12-13 22:19:11 +08:00
commands = [
"ffmpeg",
"-hide_banner",
"-hwaccel",
"auto",
"-loglevel",
modules.globals.log_level,
]
2023-09-24 21:36:57 +08:00
commands.extend(args)
try:
subprocess.run(commands, check=True)
2023-09-24 21:36:57 +08:00
return True
except Exception as e:
print(f"Error running ffmpeg: {e}")
return False
2023-09-24 21:36:57 +08:00
def detect_fps(target_path: str) -> float:
"""Detect the FPS of a video file using ffprobe."""
2024-12-13 22:19:11 +08:00
command = [
"ffprobe",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=r_frame_rate",
"-of",
"default=noprint_wrappers=1:nokey=1",
target_path,
]
2023-09-24 21:36:57 +08:00
try:
output = subprocess.check_output(command).decode().strip().split("/")
if len(output) == 2:
return float(output[0]) / float(output[1])
return float(output[0])
except Exception as e:
print(f"Error detecting FPS: {e}")
return 30.0
2023-09-24 21:36:57 +08:00
def extract_frames(target_path: str) -> None:
"""Extract frames from a video file to a temp directory."""
2023-09-24 21:36:57 +08:00
temp_directory_path = get_temp_directory_path(target_path)
2024-12-13 22:19:11 +08:00
run_ffmpeg(
[
"-i",
target_path,
"-pix_fmt",
"rgb24",
os.path.join(temp_directory_path, "%04d.png"),
]
)
2023-09-24 21:36:57 +08:00
def create_video(target_path: str, fps: float = 30.0) -> None:
"""Create a video from frames in the temp directory."""
2023-09-24 21:36:57 +08:00
temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path)
2024-12-13 22:19:11 +08:00
run_ffmpeg(
[
"-r",
str(fps),
"-i",
os.path.join(temp_directory_path, "%04d.png"),
"-c:v",
modules.globals.video_encoder,
"-crf",
str(modules.globals.video_quality),
"-pix_fmt",
"yuv420p",
"-vf",
"colorspace=bt709:iall=bt601-6-625:fast=1",
"-y",
temp_output_path,
]
)
2023-09-24 21:36:57 +08:00
def restore_audio(target_path: str, output_path: str) -> None:
"""Restore audio from the original video to the output video."""
2023-09-24 21:36:57 +08:00
temp_output_path = get_temp_output_path(target_path)
2024-12-13 22:19:11 +08:00
done = run_ffmpeg(
[
"-i",
temp_output_path,
"-i",
target_path,
"-c:v",
"copy",
"-map",
"0:v:0",
"-map",
"1:a:0",
"-y",
output_path,
]
)
2023-09-24 21:36:57 +08:00
if not done:
print(f"Failed to restore audio for {output_path}")
2023-09-24 21:36:57 +08:00
def get_temp_frame_paths(target_path: str) -> List[str]:
"""Get all temp frame file paths for a given target path."""
2023-09-24 21:36:57 +08:00
temp_directory_path = get_temp_directory_path(target_path)
try:
return sorted([
str(p) for p in Path(temp_directory_path).glob("*.png")
])
except Exception as e:
print(f"Error getting temp frame paths: {e}")
return []
2023-09-24 21:36:57 +08:00
def get_temp_directory_path(target_path: str) -> str:
"""Get the temp directory path for a given target path."""
base = os.path.splitext(os.path.basename(target_path))[0]
temp_dir = os.path.join(TEMP_DIRECTORY, base)
os.makedirs(temp_dir, exist_ok=True)
return temp_dir
2023-09-24 21:36:57 +08:00
def get_temp_output_path(target_path: str) -> str:
"""Get the temp output video path for a given target path."""
base = os.path.splitext(os.path.basename(target_path))[0]
return os.path.join(TEMP_DIRECTORY, f"{base}_out.mp4")
2023-09-24 21:36:57 +08:00
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any:
"""Normalize the output path for saving results."""
if not output_path:
base = os.path.splitext(os.path.basename(target_path))[0]
return os.path.join(TEMP_DIRECTORY, f"{base}_result.png")
2023-09-24 21:36:57 +08:00
return output_path
def create_temp(target_path: str) -> None:
"""Create a temp directory for a given target path."""
2023-09-24 21:36:57 +08:00
temp_directory_path = get_temp_directory_path(target_path)
os.makedirs(temp_directory_path, exist_ok=True)
2023-09-24 21:36:57 +08:00
def move_temp(target_path: str, output_path: str) -> None:
"""Move temp output to the final output path."""
2023-09-24 21:36:57 +08:00
temp_output_path = get_temp_output_path(target_path)
try:
os.rename(temp_output_path, output_path)
except Exception as e:
print(f"Error moving temp output: {e}")
2023-09-24 21:36:57 +08:00
def clean_temp(target_path: str) -> None:
"""Remove temp directory and files for a given target path."""
2023-09-24 21:36:57 +08:00
temp_directory_path = get_temp_directory_path(target_path)
try:
for p in Path(temp_directory_path).glob("*"):
p.unlink()
os.rmdir(temp_directory_path)
except Exception as e:
print(f"Error cleaning temp directory: {e}")
2023-09-24 21:36:57 +08:00
def has_image_extension(image_path: str) -> bool:
"""Check if a file has an image extension."""
return os.path.splitext(image_path)[1].lower() in [
".png", ".jpg", ".jpeg", ".gif", ".bmp"
]
2023-09-24 21:36:57 +08:00
def is_image(image_path: str) -> bool:
"""Check if a file is an image."""
return has_image_extension(image_path)
2023-09-24 21:36:57 +08:00
def is_video(video_path: str) -> bool:
"""Check if a file is a video."""
return os.path.splitext(video_path)[1].lower() in [
".mp4", ".mkv"
]
2023-09-24 21:36:57 +08:00
def conditional_download(download_directory_path: str, urls: List[str]) -> None:
"""Download files from URLs if they do not exist in the directory."""
import requests
2023-09-24 21:36:57 +08:00
for url in urls:
filename = os.path.basename(url)
file_path = os.path.join(download_directory_path, filename)
if not os.path.exists(file_path):
try:
print(f"Downloading {url}...")
r = requests.get(url, stream=True)
with open(file_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"Downloaded {filename}")
except Exception as e:
print(f"Error downloading {url}: {e}")
2023-09-24 21:36:57 +08:00
def resolve_relative_path(path: str) -> str:
"""Resolve a relative path to an absolute path."""
return os.path.abspath(path)