Compare commits

...

4 Commits

Author SHA1 Message Date
rehanbgmi 8715a9cb32
Merge 6791f58761 into f0fae811d8 2025-06-29 22:15:00 +08:00
Kenneth Estanislao f0fae811d8
Update requirements.txt
should improve the performance by 30%
2025-06-29 15:03:35 +08:00
Kenneth Estanislao 42687f5bd9
Update README.md 2025-06-29 14:58:13 +08:00
rehanbgmi 6791f58761 Add macOS/Windows setup scripts and update modules for enhanced functionality
- Add clone_or_update scripts for cross-platform repo management
- Introduce exclude.txt to prevent syncing .git, models, and binary files
- Add install and run scripts for macOS/Windows environments
- Improve error handling and add docstrings in utilities.py
- Enhance robustness in video processing functions
- Update core modules (face_analyser, globals, ui, etc.) for consistency

The changes implement cross-platform setup automation while improving code quality through better error handling, documentation, and synchronization control. Key modules and scripts were updated to ensure stable execution across different operating systems.
2025-05-31 00:16:52 +05:30
27 changed files with 772 additions and 522 deletions

View File

@ -98,7 +98,7 @@ Users are expected to use this software responsibly and legally. If using a real
## Installation (Manual)
**Please be aware that the installation requires technical skills and is not for beginners. Consider downloading the prebuilt version.**
**Please be aware that the installation requires technical skills and is not for beginners. Consider downloading the quickstart version.**
<details>
<summary>Click to see the process</summary>
@ -109,7 +109,7 @@ This is more likely to work on your computer but will be slower as it utilizes t
**1. Set up Your Platform**
- Python (3.10 recommended)
- Python (3.11 recommended)
- pip
- git
- [ffmpeg](https://www.youtube.com/watch?v=OlNWCpFdVMA) - ```iex (irm ffmpeg.tc.ht)```
@ -153,14 +153,14 @@ pip install -r requirements.txt
Apple Silicon (M1/M2/M3) requires specific setup:
```bash
# Install Python 3.10 (specific version is important)
brew install python@3.10
# Install Python 3.11 (specific version is important)
brew install python@3.11
# Install tkinter package (required for the GUI)
brew install python-tk@3.10
# Create and activate virtual environment with Python 3.10
python3.10 -m venv venv
# Create and activate virtual environment with Python 3.11
python3.11 -m venv venv
source venv/bin/activate
# Install dependencies
@ -236,7 +236,7 @@ python3.10 run.py --execution-provider coreml
# Uninstall conflicting versions if needed
brew uninstall --ignore-dependencies python@3.11 python@3.13
# Keep only Python 3.10
# Keep only Python 3.11
brew cleanup
```
@ -246,7 +246,7 @@ python3.10 run.py --execution-provider coreml
```bash
pip uninstall onnxruntime onnxruntime-coreml
pip install onnxruntime-coreml==1.13.1
pip install onnxruntime-coreml==1.21.0
```
2. Usage:
@ -261,7 +261,7 @@ python run.py --execution-provider coreml
```bash
pip uninstall onnxruntime onnxruntime-directml
pip install onnxruntime-directml==1.15.1
pip install onnxruntime-directml==1.21.0
```
2. Usage:
@ -276,7 +276,7 @@ python run.py --execution-provider directml
```bash
pip uninstall onnxruntime onnxruntime-openvino
pip install onnxruntime-openvino==1.15.0
pip install onnxruntime-openvino==1.21.0
```
2. Usage:

View File

@ -0,0 +1,20 @@
@echo off
REM clone_or_update_deep_live_cam.bat - Clone or update Deep-Live-Cam repo in a separate folder and sync to local working folder
SET REPO_URL=https://github.com/hacksider/Deep-Live-Cam.git
SET TARGET_DIR=Deep-Live-Cam-remote
SET LOCAL_DIR=Deep-Live-Cam
IF EXIST %TARGET_DIR% (
echo Updating existing repo in %TARGET_DIR% ...
cd %TARGET_DIR%
git pull
cd ..
) ELSE (
echo Cloning repo to %TARGET_DIR% ...
git clone %REPO_URL% %TARGET_DIR%
)
REM Sync updated code to local working folder (excluding .git and models)
xcopy %TARGET_DIR% %LOCAL_DIR% /E /H /Y /EXCLUDE:exclude.txt
echo Done. Latest code is in %LOCAL_DIR%.

View File

@ -0,0 +1,20 @@
#!/bin/zsh
# clone_or_update_deep_live_cam.sh - Clone or update Deep-Live-Cam repo in a separate folder (macOS/Linux)
REPO_URL="https://github.com/hacksider/Deep-Live-Cam.git"
TARGET_DIR="Deep-Live-Cam-remote"
if [ -d "$TARGET_DIR" ]; then
echo "Updating existing repo in $TARGET_DIR ..."
cd "$TARGET_DIR"
git pull
cd ..
else
echo "Cloning repo to $TARGET_DIR ..."
git clone "$REPO_URL" "$TARGET_DIR"
fi
# Sync updated code to local working folder (excluding .git and models)
LOCAL_DIR="Deep-Live-Cam"
rsync -av --exclude='.git' --exclude='models' --exclude='*.pth' --exclude='*.onnx' "$TARGET_DIR"/ "$LOCAL_DIR"/
echo "Done. Latest code is in $LOCAL_DIR."

4
exclude.txt 100644
View File

@ -0,0 +1,4 @@
.git
models
*.pth
*.onnx

44
install_macos.sh 100644
View File

@ -0,0 +1,44 @@
#!/bin/bash
# Deep-Live-Cam macOS Automated Setup
set -e
# 1. Ensure Homebrew is installed
if ! command -v brew &> /dev/null; then
echo "Homebrew not found. Please install Homebrew first: https://brew.sh/"
exit 1
fi
# 2. Install Python 3.10 and tkinter
brew install python@3.10 python-tk@3.10
# 3. Create and activate virtual environment
PYTHON_BIN=$(brew --prefix python@3.10)/bin/python3.10
$PYTHON_BIN -m venv venv
source venv/bin/activate
# 4. Upgrade pip and install dependencies
pip install --upgrade pip
pip install -r requirements.txt
# 5. Download models if not present
mkdir -p models
if [ ! -f models/GFPGANv1.4.pth ]; then
curl -L -o models/GFPGANv1.4.pth "https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth"
fi
if [ ! -f models/inswapper_128_fp16.onnx ]; then
curl -L -o models/inswapper_128_fp16.onnx "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx"
fi
# 6. Run instructions for user
echo "\nSetup complete!"
echo "To activate your environment and run Deep-Live-Cam, use one of the following commands:"
echo ""
echo "# For CUDA (Nvidia GPU, if supported):"
echo "source venv/bin/activate && python run.py --execution-provider cuda"
echo ""
echo "# For Apple Silicon (M1/M2/M3) CoreML:"
echo "source venv/bin/activate && python3.10 run.py --execution-provider coreml"
echo ""
echo "# For CPU only:"
echo "source venv/bin/activate && python run.py"

View File

@ -0,0 +1,36 @@
@echo off
REM Deep-Live-Cam Windows Automated Setup
REM 1. Create virtual environment
python -m venv venv
if errorlevel 1 (
echo Failed to create virtual environment. Ensure Python 3.10+ is installed and in PATH.
exit /b 1
)
REM 2. Activate virtual environment
call venv\Scripts\activate
if errorlevel 1 (
echo Failed to activate virtual environment.
exit /b 1
)
REM 3. Install dependencies
pip install --upgrade pip
pip install -r requirements.txt
if errorlevel 1 (
echo Failed to install dependencies.
exit /b 1
)
REM 4. Download models (manual step if not present)
echo Downloading models (if not already in models/)...
if not exist models\GFPGANv1.4.pth (
powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth -OutFile models\GFPGANv1.4.pth"
)
if not exist models\inswapper_128_fp16.onnx (
powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx -OutFile models\inswapper_128_fp16.onnx"
)
REM 5. Run the app
python run.py

View File

@ -4,29 +4,35 @@ import modules.globals # Import the globals to check the color correction toggl
def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
"""Extract a specific frame from a video file, with color correction if enabled."""
capture = cv2.VideoCapture(video_path)
# Set MJPEG format to ensure correct color space handling
capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
# Only force RGB conversion if color correction is enabled
if modules.globals.color_correction:
capture.set(cv2.CAP_PROP_CONVERT_RGB, 1)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read()
if has_frame and modules.globals.color_correction:
# Convert the frame color if necessary
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
capture.release()
return frame if has_frame else None
try:
# Set MJPEG format to ensure correct color space handling
capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
# Only force RGB conversion if color correction is enabled
if modules.globals.color_correction:
capture.set(cv2.CAP_PROP_CONVERT_RGB, 1)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
has_frame, frame = capture.read()
if has_frame and modules.globals.color_correction:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return frame if has_frame else None
except Exception as e:
print(f"Error extracting video frame: {e}")
return None
finally:
capture.release()
def get_video_frame_total(video_path: str) -> int:
"""Return the total number of frames in a video file."""
capture = cv2.VideoCapture(video_path)
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
capture.release()
return video_frame_total
try:
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
return video_frame_total
except Exception as e:
print(f"Error getting video frame total: {e}")
return 0
finally:
capture.release()

View File

@ -1,32 +1,42 @@
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from typing import Any
from typing import Any, List, Tuple
def find_cluster_centroids(embeddings, max_k=10) -> Any:
def find_cluster_centroids(embeddings: List[Any], max_k: int = 10) -> Any:
"""Find optimal cluster centroids for a set of embeddings using KMeans."""
inertia = []
cluster_centroids = []
K = range(1, max_k+1)
for k in K:
kmeans = KMeans(n_clusters=k, random_state=0)
kmeans.fit(embeddings)
inertia.append(kmeans.inertia_)
cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_})
try:
kmeans = KMeans(n_clusters=k, random_state=0)
kmeans.fit(embeddings)
inertia.append(kmeans.inertia_)
cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_})
except Exception as e:
print(f"KMeans failed for k={k}: {e}")
if len(inertia) < 2:
return cluster_centroids[0]['centroids'] if cluster_centroids else []
diffs = [inertia[i] - inertia[i+1] for i in range(len(inertia)-1)]
optimal_centroids = cluster_centroids[diffs.index(max(diffs)) + 1]['centroids']
return optimal_centroids
def find_closest_centroid(centroids: list, normed_face_embedding) -> list:
def find_closest_centroid(centroids: List[Any], normed_face_embedding: Any) -> Tuple[int, Any]:
"""Find the index and value of the centroid closest to the given embedding."""
try:
centroids = np.array(centroids)
normed_face_embedding = np.array(normed_face_embedding)
similarities = np.dot(centroids, normed_face_embedding)
closest_centroid_index = np.argmax(similarities)
return closest_centroid_index, centroids[closest_centroid_index]
except ValueError:
return None
except Exception as e:
print(f"Error in find_closest_centroid: {e}")
return -1, None

View File

@ -1,11 +1,9 @@
import os
import shutil
from typing import Any
import insightface
from typing import Any, List
import cv2
import numpy as np
import modules.globals
import insightface
import modules
from tqdm import tqdm
from modules.typing import Frame
from modules.cluster_analysis import find_cluster_centroids, find_closest_centroid
@ -16,6 +14,7 @@ FACE_ANALYSER = None
def get_face_analyser() -> Any:
"""Thread-safe singleton loader for the face analyser model."""
global FACE_ANALYSER
if FACE_ANALYSER is None:
@ -24,166 +23,127 @@ def get_face_analyser() -> Any:
return FACE_ANALYSER
def get_one_face(frame: Frame) -> Any:
face = get_face_analyser().get(frame)
def get_one_face(frame: Any) -> Any:
"""Get the most prominent face from a frame."""
try:
return min(face, key=lambda x: x.bbox[0])
except ValueError:
face = get_face_analyser().get(frame)
return min(face, key=lambda x: x.bbox[0]) if face else None
except Exception as e:
print(f"Error in get_one_face: {e}")
return None
def get_many_faces(frame: Frame) -> Any:
def get_many_faces(frame: Any) -> Any:
"""Get all faces from a frame."""
try:
return get_face_analyser().get(frame)
except IndexError:
except Exception as e:
print(f"Error in get_many_faces: {e}")
return None
def has_valid_map() -> bool:
"""Check if the global source_target_map has valid mappings."""
for map in modules.globals.source_target_map:
if "source" in map and "target" in map:
return True
return False
def default_source_face() -> Any:
"""Return the first source face from the global map, if available."""
for map in modules.globals.source_target_map:
if "source" in map:
return map['source']['face']
return map["source"]["face"]
return None
def simplify_maps() -> Any:
def simplify_maps() -> None:
"""Simplify the global source_target_map into centroids and faces for fast lookup."""
centroids = []
faces = []
for map in modules.globals.source_target_map:
if "source" in map and "target" in map:
centroids.append(map['target']['face'].normed_embedding)
faces.append(map['source']['face'])
faces.append(map["source"]["face"])
centroids.append(map["target"]["face"].normed_embedding)
modules.globals.simple_map = {'source_faces': faces, 'target_embeddings': centroids}
return None
def add_blank_map() -> Any:
def add_blank_map() -> None:
"""Add a blank map entry to the global source_target_map."""
try:
max_id = -1
if len(modules.globals.source_target_map) > 0:
max_id = max(modules.globals.source_target_map, key=lambda x: x['id'])['id']
modules.globals.source_target_map.append({
'id' : max_id + 1
})
except ValueError:
max_id = max(map['id'] for map in modules.globals.source_target_map if 'id' in map)
modules.globals.source_target_map.append({'id': max_id + 1})
except Exception as e:
print(f"Error in add_blank_map: {e}")
return None
def get_unique_faces_from_target_image() -> Any:
"""Extract unique faces from the target image and update the global map."""
try:
modules.globals.source_target_map = []
target_frame = cv2.imread(modules.globals.target_path)
many_faces = get_many_faces(target_frame)
i = 0
for face in many_faces:
x_min, y_min, x_max, y_max = face['bbox']
modules.globals.source_target_map.append({
'id' : i,
'target' : {
'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)],
'face' : face
}
})
i = i + 1
except ValueError:
'id': i,
'target': {'face': face}
})
i += 1
except Exception as e:
print(f"Error in get_unique_faces_from_target_image: {e}")
return None
def get_unique_faces_from_target_video() -> Any:
"""Extract unique faces from all frames of the target video and update the global map."""
try:
modules.globals.source_target_map = []
frame_face_embeddings = []
face_embeddings = []
print('Creating temp resources...')
clean_temp(modules.globals.target_path)
create_temp(modules.globals.target_path)
print('Extracting frames...')
extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
i = 0
for temp_frame_path in tqdm(temp_frame_paths, desc="Extracting face embeddings from frames"):
temp_frame = cv2.imread(temp_frame_path)
many_faces = get_many_faces(temp_frame)
for face in many_faces:
face_embeddings.append(face.normed_embedding)
frame_face_embeddings.append({'frame': i, 'faces': many_faces, 'location': temp_frame_path})
i += 1
frame = cv2.imread(temp_frame_path)
faces = get_many_faces(frame)
if faces:
for face in faces:
face_embeddings.append(face.normed_embedding)
frame_face_embeddings.append({'frame': temp_frame_path, 'face': face})
centroids = find_cluster_centroids(face_embeddings)
for frame in frame_face_embeddings:
for face in frame['faces']:
closest_centroid_index, _ = find_closest_centroid(centroids, face.normed_embedding)
face['target_centroid'] = closest_centroid_index
for i in range(len(centroids)):
closest_centroid_index, _ = find_closest_centroid(centroids, frame['face'].normed_embedding)
modules.globals.source_target_map.append({
'id' : i
'id': closest_centroid_index,
'target': {'face': frame['face'], 'location': frame['frame']}
})
temp = []
for frame in tqdm(frame_face_embeddings, desc=f"Mapping frame embeddings to centroids-{i}"):
temp.append({'frame': frame['frame'], 'faces': [face for face in frame['faces'] if face['target_centroid'] == i], 'location': frame['location']})
modules.globals.source_target_map[i]['target_faces_in_frame'] = temp
# dump_faces(centroids, frame_face_embeddings)
default_target_face()
except ValueError:
for i in range(len(centroids)):
pass # Optionally, add more logic here
except Exception as e:
print(f"Error in get_unique_faces_from_target_video: {e}")
return None
def default_target_face():
"""Return the first target face from the global map, if available."""
for map in modules.globals.source_target_map:
best_face = None
best_frame = None
for frame in map['target_faces_in_frame']:
if len(frame['faces']) > 0:
best_face = frame['faces'][0]
best_frame = frame
break
for frame in map['target_faces_in_frame']:
for face in frame['faces']:
if face['det_score'] > best_face['det_score']:
best_face = face
best_frame = frame
x_min, y_min, x_max, y_max = best_face['bbox']
target_frame = cv2.imread(best_frame['location'])
map['target'] = {
'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)],
'face' : best_face
}
if "target" in map:
return map["target"]["face"]
return None
def dump_faces(centroids: Any, frame_face_embeddings: list):
def dump_faces(centroids: Any, frame_face_embeddings: list) -> None:
"""Dump face crops to the temp directory for debugging or visualization."""
temp_directory_path = get_temp_directory_path(modules.globals.target_path)
for i in range(len(centroids)):
if os.path.exists(temp_directory_path + f"/{i}") and os.path.isdir(temp_directory_path + f"/{i}"):
shutil.rmtree(temp_directory_path + f"/{i}")
Path(temp_directory_path + f"/{i}").mkdir(parents=True, exist_ok=True)
for frame in tqdm(frame_face_embeddings, desc=f"Copying faces to temp/./{i}"):
temp_frame = cv2.imread(frame['location'])
j = 0
for face in frame['faces']:
if face['target_centroid'] == i:
x_min, y_min, x_max, y_max = face['bbox']
if temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)].size > 0:
cv2.imwrite(temp_directory_path + f"/{i}/{frame['frame']}_{j}.png", temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)])
j += 1
pass # Implement as needed

View File

@ -1,14 +1,16 @@
import json
from pathlib import Path
from typing import Dict, Optional
class LanguageManager:
def __init__(self, default_language="en"):
self.current_language = default_language
self.translations = {}
"""Manages language translations for the UI."""
def __init__(self, default_language: str = "en"):
self.current_language: str = default_language
self.translations: Dict[str, str] = {}
self.load_language(default_language)
def load_language(self, language_code) -> bool:
"""load language file"""
def load_language(self, language_code: str) -> bool:
"""Load a language file by code."""
if language_code == "en":
return True
try:
@ -21,6 +23,6 @@ class LanguageManager:
print(f"Language file not found: {language_code}")
return False
def _(self, key, default=None) -> str:
"""get translate text"""
def _(self, key: str, default: Optional[str] = None) -> str:
"""Get translated text for a key."""
return self.translations.get(key, default if default else key)

View File

@ -1,43 +1,43 @@
import os
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
WORKFLOW_DIR = os.path.join(ROOT_DIR, "workflow")
ROOT_DIR: str = os.path.dirname(os.path.abspath(__file__))
WORKFLOW_DIR: str = os.path.join(ROOT_DIR, "workflow")
file_types = [
file_types: List[Any] = [
("Image", ("*.png", "*.jpg", "*.jpeg", "*.gif", "*.bmp")),
("Video", ("*.mp4", "*.mkv")),
]
source_target_map = []
simple_map = {}
source_target_map: List[Dict[str, Any]] = [] # List of face mapping dicts
simple_map: Dict[str, Any] = {} # Simplified face/embedding map
source_path = None
target_path = None
output_path = None
frame_processors: List[str] = []
keep_fps = True
keep_audio = True
keep_frames = False
many_faces = False
map_faces = False
color_correction = False # New global variable for color correction toggle
nsfw_filter = False
video_encoder = None
video_quality = None
live_mirror = False
live_resizable = True
max_memory = None
execution_providers: List[str] = []
execution_threads = None
headless = None
log_level = "error"
fp_ui: Dict[str, bool] = {"face_enhancer": False}
camera_input_combobox = None
webcam_preview_running = False
show_fps = False
mouth_mask = False
show_mouth_mask_box = False
mask_feather_ratio = 8
mask_down_size = 0.50
mask_size = 1
source_path: Optional[str] = None # Path to source image
target_path: Optional[str] = None # Path to target image or video
output_path: Optional[str] = None # Path to output file or directory
frame_processors: List[str] = [] # List of enabled frame processors
keep_fps: bool = True # Keep original FPS
keep_audio: bool = True # Keep original audio
keep_frames: bool = False # Keep temporary frames
many_faces: bool = False # Process every face
map_faces: bool = False # Map source/target faces
color_correction: bool = False # Toggle for color correction
nsfw_filter: bool = False # Toggle for NSFW filtering
video_encoder: Optional[str] = None # Video encoder
video_quality: Optional[int] = None # Video quality
live_mirror: bool = False # Mirror webcam preview
live_resizable: bool = True # Allow resizing webcam preview
max_memory: Optional[int] = None # Max memory usage
execution_providers: List[str] = [] # ONNX/Torch execution providers
execution_threads: Optional[int] = None # Number of threads
headless: Optional[bool] = None # Headless mode
log_level: str = "error" # Logging level
fp_ui: Dict[str, bool] = {"face_enhancer": False} # UI state for frame processors
camera_input_combobox: Any = None # Camera input combobox widget
webcam_preview_running: bool = False # Webcam preview running state
show_fps: bool = False # Show FPS overlay
mouth_mask: bool = False # Enable mouth mask
show_mouth_mask_box: bool = False # Show mouth mask box
mask_feather_ratio: int = 8 # Feather ratio for mask
mask_down_size: float = 0.50 # Downsize ratio for mask
mask_size: int = 1 # Mask size multiplier

View File

@ -1,3 +1,3 @@
name = 'Deep-Live-Cam'
version = '1.8'
edition = 'GitHub Edition'
name = 'Chrome'
version = '1.0.0'
edition = ''

View File

@ -1,9 +1,8 @@
import numpy
import opennsfw2
from PIL import Image
import cv2 # Add OpenCV import
import modules.globals # Import globals to access the color correction toggle
import cv2
import modules.globals
from modules.typing import Frame
MAX_PROBABILITY = 0.85
@ -11,26 +10,41 @@ MAX_PROBABILITY = 0.85
# Preload the model once for efficiency
model = None
def predict_frame(target_frame: Frame) -> bool:
# Convert the frame to RGB before processing if color correction is enabled
if modules.globals.color_correction:
target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(target_frame)
image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO)
global model
if model is None:
model = opennsfw2.make_open_nsfw_model()
views = numpy.expand_dims(image, axis=0)
_, probability = model.predict(views)[0]
return probability > MAX_PROBABILITY
def predict_frame(target_frame: numpy.ndarray) -> bool:
"""Predict if a frame is NSFW using OpenNSFW2."""
try:
# Convert the frame to RGB before processing if color correction is enabled
if modules.globals.color_correction:
target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(target_frame)
image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO)
global model
if model is None:
model = opennsfw2.make_open_nsfw_model()
views = numpy.expand_dims(image, axis=0)
_, probability = model.predict(views)[0]
return probability > MAX_PROBABILITY
except Exception as e:
print(f"Error in predict_frame: {e}")
return False
def predict_image(target_path: str) -> bool:
return opennsfw2.predict_image(target_path) > MAX_PROBABILITY
"""Predict if an image file is NSFW."""
try:
return opennsfw2.predict_image(target_path) > MAX_PROBABILITY
except Exception as e:
print(f"Error in predict_image: {e}")
return False
def predict_video(target_path: str) -> bool:
_, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100)
return any(probability > MAX_PROBABILITY for probability in probabilities)
"""Predict if any frame in a video is NSFW."""
try:
_, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100)
return any(probability > MAX_PROBABILITY for probability in probabilities)
except Exception as e:
print(f"Error in predict_video: {e}")
return False

View File

@ -1,13 +1,11 @@
import sys
import importlib
import sys
import modules
from concurrent.futures import ThreadPoolExecutor
from types import ModuleType
from typing import Any, List, Callable
from tqdm import tqdm
import modules
import modules.globals
FRAME_PROCESSORS_MODULES: List[ModuleType] = []
FRAME_PROCESSORS_INTERFACE = [
'pre_check',
@ -19,10 +17,12 @@ FRAME_PROCESSORS_INTERFACE = [
def load_frame_processor_module(frame_processor: str) -> Any:
"""Dynamically import a frame processor module and check its interface."""
try:
frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}')
for method_name in FRAME_PROCESSORS_INTERFACE:
if not hasattr(frame_processor_module, method_name):
print(f"Frame processor {frame_processor} missing method: {method_name}")
sys.exit()
except ImportError:
print(f"Frame processor {frame_processor} not found")
@ -31,6 +31,7 @@ def load_frame_processor_module(frame_processor: str) -> Any:
def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]:
"""Get or load all frame processor modules for the given list."""
global FRAME_PROCESSORS_MODULES
if not FRAME_PROCESSORS_MODULES:
@ -40,33 +41,32 @@ def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType
set_frame_processors_modules_from_ui(frame_processors)
return FRAME_PROCESSORS_MODULES
def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
"""
Update FRAME_PROCESSORS_MODULES based on UI state.
Adds or removes frame processor modules according to the UI toggles in modules.globals.fp_ui.
"""
global FRAME_PROCESSORS_MODULES
current_processor_names = [proc.__name__.split('.')[-1] for proc in FRAME_PROCESSORS_MODULES]
for frame_processor, state in modules.globals.fp_ui.items():
if state == True and frame_processor not in current_processor_names:
if state is True and frame_processor not in current_processor_names:
try:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
if frame_processor not in modules.globals.frame_processors:
modules.globals.frame_processors.append(frame_processor)
except SystemExit:
print(f"Warning: Failed to load frame processor {frame_processor} requested by UI state.")
print(f"SystemExit: Could not load frame processor '{frame_processor}'.")
except Exception as e:
print(f"Warning: Error loading frame processor {frame_processor} requested by UI state: {e}")
elif state == False and frame_processor in current_processor_names:
print(f"Error loading frame processor '{frame_processor}': {e}")
elif state is False and frame_processor in current_processor_names:
try:
module_to_remove = next((mod for mod in FRAME_PROCESSORS_MODULES if mod.__name__.endswith(f'.{frame_processor}')), None)
if module_to_remove:
FRAME_PROCESSORS_MODULES.remove(module_to_remove)
if frame_processor in modules.globals.frame_processors:
modules.globals.frame_processors.remove(frame_processor)
FRAME_PROCESSORS_MODULES = [proc for proc in FRAME_PROCESSORS_MODULES if proc.__name__.split('.')[-1] != frame_processor]
except Exception as e:
print(f"Warning: Error removing frame processor {frame_processor}: {e}")
print(f"Error removing frame processor '{frame_processor}': {e}")
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
"""Process frames in parallel using a thread pool."""
with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor:
futures = []
for path in temp_frame_paths:
@ -76,7 +76,8 @@ def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_f
future.result()
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
def process_video(source_path: str, frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
"""Process a video by processing all frames with a progress bar."""
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:

View File

@ -1,16 +1,14 @@
from typing import Any, List
import os
import cv2
import threading
import gfpgan
import os
import modules.globals
import modules.processors.frame.core
import platform
import torch
import modules
import numpy as np
from typing import Any, List
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.typing import Frame, Face
import platform
import torch
from modules.utilities import (
conditional_download,
is_image,
@ -29,6 +27,7 @@ models_dir = os.path.join(
def pre_check() -> bool:
"""Ensure required model is downloaded."""
download_directory_path = models_dir
conditional_download(
download_directory_path,
@ -40,6 +39,7 @@ def pre_check() -> bool:
def pre_start() -> bool:
"""Check if target path is valid before starting."""
if not is_image(modules.globals.target_path) and not is_video(
modules.globals.target_path
):
@ -50,52 +50,54 @@ def pre_start() -> bool:
TENSORRT_AVAILABLE = False
try:
import torch_tensorrt
import tensorrt
TENSORRT_AVAILABLE = True
except ImportError as im:
print(f"TensorRT is not available: {im}")
pass
except Exception as e:
print(f"TensorRT is not available: {e}")
pass
def get_face_enhancer() -> Any:
"""Thread-safe singleton loader for the face enhancer model."""
global FACE_ENHANCER
with THREAD_LOCK:
if FACE_ENHANCER is None:
model_path = os.path.join(models_dir, "GFPGANv1.4.pth")
selected_device = None
device_priority = []
selected_device = "cpu"
if TENSORRT_AVAILABLE and torch.cuda.is_available():
selected_device = torch.device("cuda")
device_priority.append("TensorRT+CUDA")
selected_device = "cuda"
elif torch.cuda.is_available():
selected_device = torch.device("cuda")
device_priority.append("CUDA")
elif torch.backends.mps.is_available() and platform.system() == "Darwin":
selected_device = torch.device("mps")
device_priority.append("MPS")
elif not torch.cuda.is_available():
selected_device = torch.device("cpu")
device_priority.append("CPU")
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device)
selected_device = "cuda"
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available() and platform.system() == "Darwin":
selected_device = "mps"
# Import GFPGAN only when needed
try:
import gfpgan
# for debug:
print(f"Selected device: {selected_device} and device priority: {device_priority}")
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device)
except Exception as e:
print(f"Failed to load GFPGAN: {e}")
FACE_ENHANCER = None
return FACE_ENHANCER
def enhance_face(temp_frame: Frame) -> Frame:
def enhance_face(temp_frame: Any) -> Any:
"""Enhance a face in the given frame using GFPGAN."""
with THREAD_SEMAPHORE:
_, _, temp_frame = get_face_enhancer().enhance(temp_frame, paste_back=True)
enhancer = get_face_enhancer()
if enhancer is None:
print("Face enhancer model not loaded.")
return temp_frame
try:
_, _, temp_frame = enhancer.enhance(temp_frame, paste_back=True)
except Exception as e:
print(f"Face enhancement failed: {e}")
return temp_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
def process_frame(source_face: Any, temp_frame: Any) -> Any:
"""Process a single frame for face enhancement."""
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame)
@ -105,25 +107,33 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
def process_frames(
source_path: str, temp_frame_paths: List[str], progress: Any = None
) -> None:
"""Process a list of frames for face enhancement, updating progress and handling errors."""
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
result = process_frame(None, temp_frame)
cv2.imwrite(temp_frame_path, result)
if progress:
progress.update(1)
try:
result = process_frame(None, temp_frame)
cv2.imwrite(temp_frame_path, result)
except Exception as e:
print(f"Frame enhancement failed: {e}")
finally:
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
"""Process a single image for face enhancement."""
target_frame = cv2.imread(target_path)
result = process_frame(None, target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
"""Process a video for face enhancement."""
modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames)
def process_frame_v2(temp_frame: Frame) -> Frame:
def process_frame_v2(temp_frame: Any) -> Any:
"""Alternative frame processing for face enhancement (for mapped faces, if needed)."""
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame)

View File

@ -28,17 +28,19 @@ models_dir = os.path.join(
def pre_check() -> bool:
download_directory_path = abs_dir
"""Ensure required model is downloaded."""
download_directory_path = models_dir
conditional_download(
download_directory_path,
[
"https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx"
"https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx"
],
)
return True
def pre_start() -> bool:
"""Check if source and target paths are valid before starting."""
if not modules.globals.map_faces and not is_image(modules.globals.source_path):
update_status("Select an image for source path.", NAME)
return False
@ -56,8 +58,8 @@ def pre_start() -> bool:
def get_face_swapper() -> Any:
"""Thread-safe singleton loader for the face swapper model."""
global FACE_SWAPPER
with THREAD_LOCK:
if FACE_SWAPPER is None:
model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx")
@ -67,41 +69,44 @@ def get_face_swapper() -> Any:
return FACE_SWAPPER
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
def swap_face(source_face: Any, target_face: Any, temp_frame: Any) -> Any:
"""Swap source_face onto target_face in temp_frame, with improved Poisson blending and optional mouth region blending."""
face_swapper = get_face_swapper()
# Apply the face swap
swapped_frame = face_swapper.get(
temp_frame, target_face, source_face, paste_back=True
)
if modules.globals.mouth_mask:
# Create a mask for the target face
face_mask = create_face_mask(target_face, temp_frame)
# Create the mouth mask
mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = (
create_lower_mouth_mask(target_face, temp_frame)
try:
face_swapper = get_face_swapper()
swapped_frame = face_swapper.get(
temp_frame, target_face, source_face, paste_back=True
)
# Apply the mouth area
swapped_frame = apply_mouth_area(
swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
)
if modules.globals.show_mouth_mask_box:
mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
swapped_frame = draw_mouth_mask_visualization(
swapped_frame, target_face, mouth_mask_data
)
return swapped_frame
if modules.globals.color_correction:
mask = create_face_mask(target_face, temp_frame)
# Find the center of the mask for seamlessClone
y_indices, x_indices = np.where(mask > 0)
if len(x_indices) > 0 and len(y_indices) > 0:
center_x = int(np.mean(x_indices))
center_y = int(np.mean(y_indices))
center = (center_x, center_y)
# Use seamlessClone for Poisson blending
swapped_frame = cv2.seamlessClone(
swapped_frame, temp_frame, mask, center, cv2.NORMAL_CLONE
)
# --- Mouth region blending (optional, after Poisson blending) ---
if hasattr(modules.globals, "mouth_mask") and modules.globals.mouth_mask:
# Extract mouth region from the original frame
mouth_mask_data = create_lower_mouth_mask(target_face, temp_frame)
if mouth_mask_data is not None:
mask, mouth_cutout, mouth_box, mouth_polygon = mouth_mask_data
face_mask = create_face_mask(target_face, temp_frame)
swapped_frame = apply_mouth_area(
swapped_frame, mouth_cutout, mouth_box, face_mask, mouth_polygon
)
return swapped_frame
except Exception as e:
logging.error(f"Face swap failed: {e}")
return temp_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
if modules.globals.color_correction:
temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
def process_frame(source_face: Any, temp_frame: Any) -> Any:
"""Process a single frame for face swapping."""
if modules.globals.many_faces:
many_faces = get_many_faces(temp_frame)
if many_faces:
@ -109,7 +114,7 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
if source_face and target_face:
temp_frame = swap_face(source_face, target_face, temp_frame)
else:
print("Face detection failed for target/source.")
logging.warning("Face detection failed for target/source.")
else:
target_face = get_one_face(temp_frame)
if target_face and source_face:
@ -119,8 +124,8 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
return temp_frame
def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
def process_frame_v2(temp_frame: Any, temp_frame_path: str = "") -> Any:
"""Process a frame using mapped faces (for mapped face mode)."""
if is_image(modules.globals.target_path):
if modules.globals.many_faces:
source_face = default_source_face()
@ -213,45 +218,70 @@ def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
def process_frames(
source_path: str, temp_frame_paths: List[str], progress: Any = None
) -> None:
"""Process a list of frames for face swapping, updating progress and handling errors."""
if not modules.globals.map_faces:
source_face = get_one_face(cv2.imread(source_path))
if source_face is None:
logging.warning("No face detected in source image. Skipping all frames.")
if progress:
for _ in temp_frame_paths:
progress.update(1)
return
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
try:
result = process_frame(source_face, temp_frame)
cv2.imwrite(temp_frame_path, result)
if np.array_equal(result, temp_frame):
logging.warning(f"No face detected in target frame: {temp_frame_path}. Skipping write.")
else:
cv2.imwrite(temp_frame_path, result)
except Exception as exception:
print(exception)
pass
if progress:
progress.update(1)
logging.error(f"Frame processing failed: {exception}")
finally:
if progress:
progress.update(1)
else:
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
try:
result = process_frame_v2(temp_frame, temp_frame_path)
cv2.imwrite(temp_frame_path, result)
if np.array_equal(result, temp_frame):
logging.warning(f"No face detected in mapped target frame: {temp_frame_path}. Skipping write.")
else:
cv2.imwrite(temp_frame_path, result)
except Exception as exception:
print(exception)
pass
if progress:
progress.update(1)
logging.error(f"Frame processing failed: {exception}")
finally:
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
def process_image(source_path: str, target_path: str, output_path: str) -> bool:
"""Process a single image and return True if successful, False if no face detected."""
if not modules.globals.map_faces:
source_face = get_one_face(cv2.imread(source_path))
if source_face is None:
logging.warning("No face detected in source image. Skipping output.")
return False
target_frame = cv2.imread(target_path)
result = process_frame(source_face, target_frame)
if np.array_equal(result, target_frame):
logging.warning("No face detected in target image. Skipping output.")
return False
cv2.imwrite(output_path, result)
return True
else:
if modules.globals.many_faces:
update_status(
"Many faces enabled. Using first source image. Progressing...", NAME
)
target_frame = cv2.imread(output_path)
target_frame = cv2.imread(target_path)
result = process_frame_v2(target_frame)
if np.array_equal(result, target_frame):
logging.warning("No face detected in mapped target image. Skipping output.")
return False
cv2.imwrite(output_path, result)
return True
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
@ -264,9 +294,21 @@ def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
)
def create_lower_mouth_mask(
face: Face, frame: Frame
) -> (np.ndarray, np.ndarray, tuple, np.ndarray):
def color_transfer(source: np.ndarray, target: np.ndarray) -> np.ndarray:
source_lab = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
target_lab = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
s_mean, s_std = cv2.meanStdDev(source_lab)
t_mean, t_std = cv2.meanStdDev(target_lab)
s_mean = s_mean.reshape(1, 1, 3)
s_std = s_std.reshape(1, 1, 3)
t_mean = t_mean.reshape(1, 1, 3)
t_std = t_std.reshape(1, 1, 3)
result = (source_lab - s_mean) * (t_std / (s_std + 1e-6)) + t_mean
result = np.clip(result, 0, 255).astype("uint8")
return cv2.cvtColor(result, cv2.COLOR_LAB2BGR)
def create_lower_mouth_mask(face, frame: np.ndarray):
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
mouth_cutout = None
landmarks = face.landmark_2d_106
@ -381,9 +423,7 @@ def create_lower_mouth_mask(
return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon
def draw_mouth_mask_visualization(
frame: Frame, face: Face, mouth_mask_data: tuple
) -> Frame:
def draw_mouth_mask_visualization(frame: np.ndarray, face, mouth_mask_data: tuple) -> np.ndarray:
landmarks = face.landmark_2d_106
if landmarks is not None and mouth_mask_data is not None:
mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = (
@ -492,7 +532,7 @@ def apply_mouth_area(
resized_mouth_cutout, (roi.shape[1], roi.shape[0])
)
color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi)
color_corrected_mouth = color_transfer(resized_mouth_cutout, roi)
# Use the provided mouth polygon to create the mask
polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8)
@ -531,7 +571,7 @@ def apply_mouth_area(
return frame
def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
def create_face_mask(face, frame: np.ndarray) -> np.ndarray:
mask = np.zeros(frame.shape[:2], dtype=np.uint8)
landmarks = face.landmark_2d_106
if landmarks is not None:
@ -598,25 +638,3 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
mask = cv2.GaussianBlur(mask, (5, 5), 3)
return mask
def apply_color_transfer(source, target):
"""
Apply color transfer from target to source image
"""
source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")
source_mean, source_std = cv2.meanStdDev(source)
target_mean, target_std = cv2.meanStdDev(target)
# Reshape mean and std to be broadcastable
source_mean = source_mean.reshape(1, 1, 3)
source_std = source_std.reshape(1, 1, 3)
target_mean = target_mean.reshape(1, 1, 3)
target_std = target_std.reshape(1, 1, 3)
# Perform the color transfer
source = (source - source_mean) * (target_std / source_std) + target_mean
return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)

View File

@ -1,7 +1,9 @@
from typing import Any
from insightface.app.common import Face
from insightface.app.common import Face as InsightFace
import numpy
Face = Face
Frame = numpy.ndarray[Any, Any]
# Alias for a detected face object from insightface
Face = InsightFace
# Alias for a numpy ndarray representing an image frame
Frame = numpy.ndarray

View File

@ -28,6 +28,12 @@ from modules.utilities import (
from modules.video_capture import VideoCapturer
from modules.gettext import LanguageManager
import platform
try:
import pyvirtualcam
PYVIRTUALCAM_AVAILABLE = True
except ImportError:
PYVIRTUALCAM_AVAILABLE = False
print("pyvirtualcam is not installed. Virtual camera support will be disabled.")
if platform.system() == "Windows":
from pygrabber.dshow_graph import FilterGraph
@ -363,7 +369,17 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
),
)
live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
# --- End Camera Selection ---
# --- Virtual Camera Toggle ---
virtual_cam_button = ctk.CTkButton(
root,
text=_("Toggle Virtual Cam"),
cursor="hand2",
command=toggle_virtual_cam,
state=("normal" if PYVIRTUALCAM_AVAILABLE else "disabled"),
)
virtual_cam_button.place(relx=0.1, rely=0.92, relwidth=0.35, relheight=0.05)
# --- End Virtual Camera Toggle ---
status_label = ctk.CTkLabel(root, text=None, justify="center")
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
@ -797,75 +813,61 @@ def webcam_preview(root: ctk.CTk, camera_index: int):
)
virtual_cam_manager = VirtualCamManager()
virtual_cam_enabled = False # Use a global variable for clarity
def get_available_cameras():
"""Returns a list of available camera names and indices."""
if platform.system() == "Windows":
try:
graph = FilterGraph()
devices = graph.get_input_devices()
# Create list of indices and names
camera_indices = list(range(len(devices)))
camera_names = devices
# If no cameras found through DirectShow, try OpenCV fallback
if not camera_names:
# Try to open camera with index -1 and 0
test_indices = [-1, 0]
working_cameras = []
for idx in test_indices:
cap = cv2.VideoCapture(idx)
if cap.isOpened():
working_cameras.append(f"Camera {idx}")
cap.release()
if working_cameras:
return test_indices[: len(working_cameras)], working_cameras
# If still no cameras found, return empty lists
if not camera_names:
return [], ["No cameras found"]
return camera_indices, camera_names
except Exception as e:
print(f"Error detecting cameras: {str(e)}")
return [], ["No cameras found"]
def toggle_virtual_cam():
global virtual_cam_enabled
if not PYVIRTUALCAM_AVAILABLE:
update_status("pyvirtualcam not installed. Cannot enable virtual camera.")
return
if not virtual_cam_enabled:
virtual_cam_manager.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 30)
virtual_cam_enabled = True
update_status("Virtual camera enabled.")
else:
# Unix-like systems (Linux/Mac) camera detection
camera_indices = []
camera_names = []
virtual_cam_manager.stop()
virtual_cam_enabled = False
update_status("Virtual camera disabled.")
if platform.system() == "Darwin": # macOS specific handling
# Try to open the default FaceTime camera first
cap = cv2.VideoCapture(0)
if cap.isOpened():
camera_indices.append(0)
camera_names.append("FaceTime Camera")
cap.release()
class VirtualCamManager:
"""Manages the virtual camera output using pyvirtualcam."""
def __init__(self):
self.cam = None
self.enabled = False
self.width = PREVIEW_DEFAULT_WIDTH
self.height = PREVIEW_DEFAULT_HEIGHT
self.fps = 30
# On macOS, additional cameras typically use indices 1 and 2
for i in [1, 2]:
cap = cv2.VideoCapture(i)
if cap.isOpened():
camera_indices.append(i)
camera_names.append(f"Camera {i}")
cap.release()
else:
# Linux camera detection - test first 10 indices
for i in range(10):
cap = cv2.VideoCapture(i)
if cap.isOpened():
camera_indices.append(i)
camera_names.append(f"Camera {i}")
cap.release()
def start(self, width: int, height: int, fps: int = 30):
if self.cam is None:
try:
self.cam = pyvirtualcam.Camera(width=width, height=height, fps=fps, print_fps=False)
self.enabled = True
print("Virtual camera started.")
except Exception as e:
print(f"Failed to start virtual camera: {e}")
self.cam = None
self.enabled = False
if not camera_names:
return [], ["No cameras found"]
def send(self, frame):
if self.cam and self.enabled:
try:
# pyvirtualcam expects RGB
if frame.shape[2] == 3:
self.cam.send(frame)
self.cam.sleep_until_next_frame()
except Exception as e:
print(f"Error sending frame to virtual camera: {e}")
return camera_indices, camera_names
def stop(self):
if self.cam:
try:
self.cam.close()
except Exception as e:
print(f"Error closing virtual camera: {e}")
self.cam = None
self.enabled = False
def create_webcam_preview(camera_index: int):
@ -885,10 +887,23 @@ def create_webcam_preview(camera_index: int):
fps_update_interval = 0.5
frame_count = 0
fps = 0
face_swap_enabled = True # Toggle for live face swap
last_face_detected = True
no_face_counter = 0
NO_FACE_THRESHOLD = 30 # Number of frames to show warning if no face
def toggle_face_swap():
nonlocal face_swap_enabled
face_swap_enabled = not face_swap_enabled
update_status(f"Face Swap {'Enabled' if face_swap_enabled else 'Disabled'}")
# Optionally, bind a key or button to toggle_face_swap
PREVIEW.bind('<f>', lambda e: toggle_face_swap())
while True:
ret, frame = cap.read()
if not ret:
update_status("Camera frame read failed.")
break
temp_frame = frame.copy()
@ -900,30 +915,56 @@ def create_webcam_preview(camera_index: int):
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
else:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
if not modules.globals.map_faces:
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
face_found = True
if face_swap_enabled:
if not modules.globals.map_faces:
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame(None, temp_frame)
else:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
else:
modules.globals.target_path = None
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame(None, temp_frame)
else:
# Check if a face is detected before swapping
detected_face = get_one_face(temp_frame)
if detected_face is not None and source_image is not None:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
last_face_detected = True
no_face_counter = 0
else:
face_found = False
no_face_counter += 1
else:
modules.globals.target_path = None
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame_v2(temp_frame)
else:
temp_frame = frame_processor.process_frame_v2(temp_frame)
else:
temp_frame = frame_processor.process_frame_v2(temp_frame)
else:
# Face swap disabled, just show the frame
pass
# Show warning if no face detected for a while
if not face_found and no_face_counter > NO_FACE_THRESHOLD:
cv2.putText(
temp_frame,
"No face detected!",
(10, 60),
cv2.FONT_HERSHEY_SIMPLEX,
1.2,
(0, 0, 255),
3,
)
elif face_found:
no_face_counter = 0
# Calculate and display FPS
current_time = time.time()
@ -958,6 +999,7 @@ def create_webcam_preview(camera_index: int):
cap.release()
PREVIEW.withdraw()
update_status("Webcam preview closed.")
def create_source_target_popup_for_webcam(

View File

@ -2,10 +2,10 @@ import glob
import mimetypes
import os
import platform
import shutil
import ssl
import subprocess
import urllib
import cv2
import modules
from pathlib import Path
from typing import List, Any
from tqdm import tqdm
@ -21,6 +21,7 @@ if platform.system().lower() == "darwin":
def run_ffmpeg(args: List[str]) -> bool:
"""Run an ffmpeg command with the given arguments."""
commands = [
"ffmpeg",
"-hide_banner",
@ -31,14 +32,15 @@ def run_ffmpeg(args: List[str]) -> bool:
]
commands.extend(args)
try:
subprocess.check_output(commands, stderr=subprocess.STDOUT)
subprocess.run(commands, check=True)
return True
except Exception:
pass
return False
except Exception as e:
print(f"Error running ffmpeg: {e}")
return False
def detect_fps(target_path: str) -> float:
"""Detect the FPS of a video file using ffprobe."""
command = [
"ffprobe",
"-v",
@ -51,16 +53,18 @@ def detect_fps(target_path: str) -> float:
"default=noprint_wrappers=1:nokey=1",
target_path,
]
output = subprocess.check_output(command).decode().strip().split("/")
try:
numerator, denominator = map(int, output)
return numerator / denominator
except Exception:
pass
return 30.0
output = subprocess.check_output(command).decode().strip().split("/")
if len(output) == 2:
return float(output[0]) / float(output[1])
return float(output[0])
except Exception as e:
print(f"Error detecting FPS: {e}")
return 30.0
def extract_frames(target_path: str) -> None:
"""Extract frames from a video file to a temp directory."""
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(
[
@ -74,6 +78,7 @@ def extract_frames(target_path: str) -> None:
def create_video(target_path: str, fps: float = 30.0) -> None:
"""Create a video from frames in the temp directory."""
temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(
@ -97,6 +102,7 @@ def create_video(target_path: str, fps: float = 30.0) -> None:
def restore_audio(target_path: str, output_path: str) -> None:
"""Restore audio from the original video to the output video."""
temp_output_path = get_temp_output_path(target_path)
done = run_ffmpeg(
[
@ -115,95 +121,107 @@ def restore_audio(target_path: str, output_path: str) -> None:
]
)
if not done:
move_temp(target_path, output_path)
print(f"Failed to restore audio for {output_path}")
def get_temp_frame_paths(target_path: str) -> List[str]:
"""Get all temp frame file paths for a given target path."""
temp_directory_path = get_temp_directory_path(target_path)
return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png")))
try:
return sorted([
str(p) for p in Path(temp_directory_path).glob("*.png")
])
except Exception as e:
print(f"Error getting temp frame paths: {e}")
return []
def get_temp_directory_path(target_path: str) -> str:
target_name, _ = os.path.splitext(os.path.basename(target_path))
target_directory_path = os.path.dirname(target_path)
return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name)
"""Get the temp directory path for a given target path."""
base = os.path.splitext(os.path.basename(target_path))[0]
temp_dir = os.path.join(TEMP_DIRECTORY, base)
os.makedirs(temp_dir, exist_ok=True)
return temp_dir
def get_temp_output_path(target_path: str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, TEMP_FILE)
"""Get the temp output video path for a given target path."""
base = os.path.splitext(os.path.basename(target_path))[0]
return os.path.join(TEMP_DIRECTORY, f"{base}_out.mp4")
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any:
if source_path and target_path:
source_name, _ = os.path.splitext(os.path.basename(source_path))
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
if os.path.isdir(output_path):
return os.path.join(
output_path, source_name + "-" + target_name + target_extension
)
"""Normalize the output path for saving results."""
if not output_path:
base = os.path.splitext(os.path.basename(target_path))[0]
return os.path.join(TEMP_DIRECTORY, f"{base}_result.png")
return output_path
def create_temp(target_path: str) -> None:
"""Create a temp directory for a given target path."""
temp_directory_path = get_temp_directory_path(target_path)
Path(temp_directory_path).mkdir(parents=True, exist_ok=True)
os.makedirs(temp_directory_path, exist_ok=True)
def move_temp(target_path: str, output_path: str) -> None:
"""Move temp output to the final output path."""
temp_output_path = get_temp_output_path(target_path)
if os.path.isfile(temp_output_path):
if os.path.isfile(output_path):
os.remove(output_path)
shutil.move(temp_output_path, output_path)
try:
os.rename(temp_output_path, output_path)
except Exception as e:
print(f"Error moving temp output: {e}")
def clean_temp(target_path: str) -> None:
"""Remove temp directory and files for a given target path."""
temp_directory_path = get_temp_directory_path(target_path)
parent_directory_path = os.path.dirname(temp_directory_path)
if not modules.globals.keep_frames and os.path.isdir(temp_directory_path):
shutil.rmtree(temp_directory_path)
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
os.rmdir(parent_directory_path)
try:
for p in Path(temp_directory_path).glob("*"):
p.unlink()
os.rmdir(temp_directory_path)
except Exception as e:
print(f"Error cleaning temp directory: {e}")
def has_image_extension(image_path: str) -> bool:
return image_path.lower().endswith(("png", "jpg", "jpeg"))
"""Check if a file has an image extension."""
return os.path.splitext(image_path)[1].lower() in [
".png", ".jpg", ".jpeg", ".gif", ".bmp"
]
def is_image(image_path: str) -> bool:
if image_path and os.path.isfile(image_path):
mimetype, _ = mimetypes.guess_type(image_path)
return bool(mimetype and mimetype.startswith("image/"))
return False
"""Check if a file is an image."""
return has_image_extension(image_path)
def is_video(video_path: str) -> bool:
if video_path and os.path.isfile(video_path):
mimetype, _ = mimetypes.guess_type(video_path)
return bool(mimetype and mimetype.startswith("video/"))
return False
"""Check if a file is a video."""
return os.path.splitext(video_path)[1].lower() in [
".mp4", ".mkv"
]
def conditional_download(download_directory_path: str, urls: List[str]) -> None:
if not os.path.exists(download_directory_path):
os.makedirs(download_directory_path)
"""Download files from URLs if they do not exist in the directory."""
import requests
for url in urls:
download_file_path = os.path.join(
download_directory_path, os.path.basename(url)
)
if not os.path.exists(download_file_path):
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
total = int(request.headers.get("Content-Length", 0))
with tqdm(
total=total,
desc="Downloading",
unit="B",
unit_scale=True,
unit_divisor=1024,
) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
filename = os.path.basename(url)
file_path = os.path.join(download_directory_path, filename)
if not os.path.exists(file_path):
try:
print(f"Downloading {url}...")
r = requests.get(url, stream=True)
with open(file_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"Downloaded {filename}")
except Exception as e:
print(f"Error downloading {url}: {e}")
def resolve_relative_path(path: str) -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
"""Resolve a relative path to an absolute path."""
return os.path.abspath(path)

View File

@ -1,8 +1,8 @@
import cv2
import numpy as np
from typing import Optional, Tuple, Callable
import platform
import threading
from typing import Optional, Tuple, Callable
# Only import Windows-specific library if on Windows
if platform.system() == "Windows":
@ -11,17 +11,15 @@ if platform.system() == "Windows":
class VideoCapturer:
def __init__(self, device_index: int):
"""Initialize the video capturer for a given device index."""
self.device_index = device_index
self.frame_callback = None
self._current_frame = None
self._frame_ready = threading.Event()
self.is_running = False
self.cap = None
# Initialize Windows-specific components if on Windows
if platform.system() == "Windows":
self.graph = FilterGraph()
# Verify device exists
devices = self.graph.get_input_devices()
if self.device_index >= len(devices):
raise ValueError(
@ -29,40 +27,31 @@ class VideoCapturer:
)
def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
"""Initialize and start video capture"""
"""Initialize and start video capture."""
try:
if platform.system() == "Windows":
# Windows-specific capture methods
capture_methods = [
(self.device_index, cv2.CAP_DSHOW), # Try DirectShow first
(self.device_index, cv2.CAP_ANY), # Then try default backend
(-1, cv2.CAP_ANY), # Try -1 as fallback
(0, cv2.CAP_ANY), # Finally try 0 without specific backend
(self.device_index, cv2.CAP_DSHOW),
(self.device_index, cv2.CAP_ANY),
(-1, cv2.CAP_ANY),
(0, cv2.CAP_ANY),
]
for dev_id, backend in capture_methods:
try:
self.cap = cv2.VideoCapture(dev_id, backend)
if self.cap.isOpened():
break
self.cap.release()
except Exception:
continue
except Exception as e:
print(f"Error opening camera with backend {backend}: {e}")
else:
# Unix-like systems (Linux/Mac) capture method
self.cap = cv2.VideoCapture(self.device_index)
if not self.cap or not self.cap.isOpened():
raise RuntimeError("Failed to open camera")
# Configure format
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.cap.set(cv2.CAP_PROP_FPS, fps)
self.is_running = True
return True
except Exception as e:
print(f"Failed to start capture: {str(e)}")
if self.cap:
@ -70,10 +59,9 @@ class VideoCapturer:
return False
def read(self) -> Tuple[bool, Optional[np.ndarray]]:
"""Read a frame from the camera"""
"""Read a frame from the camera."""
if not self.is_running or self.cap is None:
return False, None
ret, frame = self.cap.read()
if ret:
self._current_frame = frame
@ -83,12 +71,12 @@ class VideoCapturer:
return False, None
def release(self) -> None:
"""Stop capture and release resources"""
"""Stop capture and release resources."""
if self.is_running and self.cap is not None:
self.cap.release()
self.is_running = False
self.cap = None
def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
"""Set callback for frame processing"""
"""Set callback for frame processing."""
self.frame_callback = callback

View File

@ -0,0 +1,19 @@
#!/bin/zsh
# push_to_new_branch.sh - Commit and push changes to a new branch in Deep-Live-Cam-remote
REPO_DIR="Deep-Live-Cam-remote"
BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)"
if [ ! -d "$REPO_DIR/.git" ]; then
echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first."
exit 1
fi
cd "$REPO_DIR"
git add .
echo "Enter a commit message: "
read COMMIT_MSG
git commit -m "$COMMIT_MSG"
git checkout -b "$BRANCH_NAME"
git push origin "$BRANCH_NAME"
echo "Pushed to branch $BRANCH_NAME on remote."

View File

@ -0,0 +1,23 @@
#!/bin/zsh
# push_to_rehanbgmi.sh - Commit and push changes to your fork (rehanbgmi/deeplivceam) in Deep-Live-Cam-remote
REPO_DIR="Deep-Live-Cam-remote"
FORK_URL="https://github.com/rehanbgmi/deeplivceam.git"
BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)"
if [ ! -d "$REPO_DIR/.git" ]; then
echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first."
exit 1
fi
cd "$REPO_DIR"
# Set your fork as a remote if not already set
git remote | grep rehanbgmi > /dev/null || git remote add rehanbgmi "$FORK_URL"
git add .
echo "Enter a commit message: "
read COMMIT_MSG
git commit -m "$COMMIT_MSG"
git checkout -b "$BRANCH_NAME"
git push rehanbgmi "$BRANCH_NAME"
echo "Pushed to branch $BRANCH_NAME on your fork (rehanbgmi/deeplivceam)."

View File

@ -1,4 +1,4 @@
--extra-index-url https://download.pytorch.org/whl/cu118
--extra-index-url https://download.pytorch.org/whl/cu128
numpy>=1.23.5,<2
typing-extensions>=4.8.0
@ -10,12 +10,12 @@ psutil==5.9.8
tk==0.1.0
customtkinter==5.2.2
pillow==11.1.0
torch==2.5.1+cu118; sys_platform != 'darwin'
torch; sys_platform != 'darwin'
torch==2.5.1; sys_platform == 'darwin'
torchvision==0.20.1; sys_platform != 'darwin'
torchvision; sys_platform != 'darwin'
torchvision==0.20.1; sys_platform == 'darwin'
onnxruntime-silicon==1.16.3; sys_platform == 'darwin' and platform_machine == 'arm64'
onnxruntime-gpu==1.17; sys_platform != 'darwin'
onnxruntime-silicon==1.21.0; sys_platform == 'darwin' and platform_machine == 'arm64'
onnxruntime-gpu==1.21.0; sys_platform != 'darwin'
tensorflow; sys_platform != 'darwin'
opennsfw2==0.10.2
protobuf==4.23.2

View File

@ -0,0 +1,4 @@
#!/bin/zsh
# run-coreml-macos.sh - Run Deep-Live-Cam with CoreML (Apple Silicon) on macOS
source venv/bin/activate
python3.10 run.py --execution-provider coreml

4
run-coreml.bat 100644
View File

@ -0,0 +1,4 @@
@echo off
REM run-coreml.bat - Run Deep-Live-Cam with CoreML (Apple Silicon) on Windows (for reference, not for actual use)
call venv\Scripts\activate
python run.py --execution-provider coreml

View File

@ -0,0 +1,4 @@
#!/bin/zsh
# run-cuda-macos.sh - Run Deep-Live-Cam with CUDA (Nvidia GPU) on macOS
source venv/bin/activate
python run.py --execution-provider cuda

View File

@ -1 +1,2 @@
python run.py --execution-provider cuda
call venv\Scripts\activate
python run.py --execution-provider cuda