Compare commits
	
		
			2 Commits 
		
	
	
		
			658b645dff
			...
			8ff03bb4e6
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 8ff03bb4e6 | |
|  | 6791f58761 | 
|  | @ -0,0 +1,20 @@ | ||||||
|  | @echo off | ||||||
|  | REM clone_or_update_deep_live_cam.bat - Clone or update Deep-Live-Cam repo in a separate folder and sync to local working folder | ||||||
|  | SET REPO_URL=https://github.com/hacksider/Deep-Live-Cam.git | ||||||
|  | SET TARGET_DIR=Deep-Live-Cam-remote | ||||||
|  | SET LOCAL_DIR=Deep-Live-Cam | ||||||
|  | 
 | ||||||
|  | IF EXIST %TARGET_DIR% ( | ||||||
|  |     echo Updating existing repo in %TARGET_DIR% ... | ||||||
|  |     cd %TARGET_DIR% | ||||||
|  |     git pull | ||||||
|  |     cd .. | ||||||
|  | ) ELSE ( | ||||||
|  |     echo Cloning repo to %TARGET_DIR% ... | ||||||
|  |     git clone %REPO_URL% %TARGET_DIR% | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | REM Sync updated code to local working folder (excluding .git and models) | ||||||
|  | xcopy %TARGET_DIR% %LOCAL_DIR% /E /H /Y /EXCLUDE:exclude.txt | ||||||
|  | 
 | ||||||
|  | echo Done. Latest code is in %LOCAL_DIR%. | ||||||
|  | @ -0,0 +1,20 @@ | ||||||
|  | #!/bin/zsh | ||||||
|  | # clone_or_update_deep_live_cam.sh - Clone or update Deep-Live-Cam repo in a separate folder (macOS/Linux) | ||||||
|  | REPO_URL="https://github.com/hacksider/Deep-Live-Cam.git" | ||||||
|  | TARGET_DIR="Deep-Live-Cam-remote" | ||||||
|  | 
 | ||||||
|  | if [ -d "$TARGET_DIR" ]; then | ||||||
|  |   echo "Updating existing repo in $TARGET_DIR ..." | ||||||
|  |   cd "$TARGET_DIR" | ||||||
|  |   git pull | ||||||
|  |   cd .. | ||||||
|  | else | ||||||
|  |   echo "Cloning repo to $TARGET_DIR ..." | ||||||
|  |   git clone "$REPO_URL" "$TARGET_DIR" | ||||||
|  | fi | ||||||
|  | 
 | ||||||
|  | # Sync updated code to local working folder (excluding .git and models) | ||||||
|  | LOCAL_DIR="Deep-Live-Cam" | ||||||
|  | rsync -av --exclude='.git' --exclude='models' --exclude='*.pth' --exclude='*.onnx' "$TARGET_DIR"/ "$LOCAL_DIR"/ | ||||||
|  | 
 | ||||||
|  | echo "Done. Latest code is in $LOCAL_DIR." | ||||||
|  | @ -0,0 +1,4 @@ | ||||||
|  | .git | ||||||
|  | models | ||||||
|  | *.pth | ||||||
|  | *.onnx | ||||||
|  | @ -0,0 +1,44 @@ | ||||||
|  | #!/bin/bash | ||||||
|  | # Deep-Live-Cam macOS Automated Setup | ||||||
|  | set -e | ||||||
|  | 
 | ||||||
|  | # 1. Ensure Homebrew is installed | ||||||
|  | if ! command -v brew &> /dev/null; then | ||||||
|  |     echo "Homebrew not found. Please install Homebrew first: https://brew.sh/" | ||||||
|  |     exit 1 | ||||||
|  | fi | ||||||
|  | 
 | ||||||
|  | # 2. Install Python 3.10 and tkinter | ||||||
|  | brew install python@3.10 python-tk@3.10 | ||||||
|  | 
 | ||||||
|  | # 3. Create and activate virtual environment | ||||||
|  | PYTHON_BIN=$(brew --prefix python@3.10)/bin/python3.10 | ||||||
|  | $PYTHON_BIN -m venv venv | ||||||
|  | source venv/bin/activate | ||||||
|  | 
 | ||||||
|  | # 4. Upgrade pip and install dependencies | ||||||
|  | pip install --upgrade pip | ||||||
|  | pip install -r requirements.txt | ||||||
|  | 
 | ||||||
|  | # 5. Download models if not present | ||||||
|  | mkdir -p models | ||||||
|  | if [ ! -f models/GFPGANv1.4.pth ]; then | ||||||
|  |     curl -L -o models/GFPGANv1.4.pth "https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth" | ||||||
|  | fi | ||||||
|  | if [ ! -f models/inswapper_128_fp16.onnx ]; then | ||||||
|  |     curl -L -o models/inswapper_128_fp16.onnx "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx" | ||||||
|  | fi | ||||||
|  | 
 | ||||||
|  | # 6. Run instructions for user | ||||||
|  | 
 | ||||||
|  | echo "\nSetup complete!" | ||||||
|  | echo "To activate your environment and run Deep-Live-Cam, use one of the following commands:"  | ||||||
|  | echo "" | ||||||
|  | echo "# For CUDA (Nvidia GPU, if supported):" | ||||||
|  | echo "source venv/bin/activate && python run.py --execution-provider cuda" | ||||||
|  | echo "" | ||||||
|  | echo "# For Apple Silicon (M1/M2/M3) CoreML:" | ||||||
|  | echo "source venv/bin/activate && python3.10 run.py --execution-provider coreml" | ||||||
|  | echo "" | ||||||
|  | echo "# For CPU only:" | ||||||
|  | echo "source venv/bin/activate && python run.py" | ||||||
|  | @ -0,0 +1,36 @@ | ||||||
|  | @echo off | ||||||
|  | REM Deep-Live-Cam Windows Automated Setup | ||||||
|  | 
 | ||||||
|  | REM 1. Create virtual environment | ||||||
|  | python -m venv venv | ||||||
|  | if errorlevel 1 ( | ||||||
|  |     echo Failed to create virtual environment. Ensure Python 3.10+ is installed and in PATH. | ||||||
|  |     exit /b 1 | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | REM 2. Activate virtual environment | ||||||
|  | call venv\Scripts\activate | ||||||
|  | if errorlevel 1 ( | ||||||
|  |     echo Failed to activate virtual environment. | ||||||
|  |     exit /b 1 | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | REM 3. Install dependencies | ||||||
|  | pip install --upgrade pip | ||||||
|  | pip install -r requirements.txt | ||||||
|  | if errorlevel 1 ( | ||||||
|  |     echo Failed to install dependencies. | ||||||
|  |     exit /b 1 | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | REM 4. Download models (manual step if not present) | ||||||
|  | echo Downloading models (if not already in models/)... | ||||||
|  | if not exist models\GFPGANv1.4.pth ( | ||||||
|  |     powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/GFPGANv1.4.pth -OutFile models\GFPGANv1.4.pth" | ||||||
|  | ) | ||||||
|  | if not exist models\inswapper_128_fp16.onnx ( | ||||||
|  |     powershell -Command "Invoke-WebRequest -Uri https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx -OutFile models\inswapper_128_fp16.onnx" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | REM 5. Run the app | ||||||
|  | python run.py | ||||||
|  | @ -4,29 +4,35 @@ import modules.globals  # Import the globals to check the color correction toggl | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_video_frame(video_path: str, frame_number: int = 0) -> Any: | def get_video_frame(video_path: str, frame_number: int = 0) -> Any: | ||||||
|  |     """Extract a specific frame from a video file, with color correction if enabled.""" | ||||||
|     capture = cv2.VideoCapture(video_path) |     capture = cv2.VideoCapture(video_path) | ||||||
| 
 |     try: | ||||||
|         # Set MJPEG format to ensure correct color space handling |         # Set MJPEG format to ensure correct color space handling | ||||||
|         capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) |         capture.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) | ||||||
|      |  | ||||||
|         # Only force RGB conversion if color correction is enabled |         # Only force RGB conversion if color correction is enabled | ||||||
|         if modules.globals.color_correction: |         if modules.globals.color_correction: | ||||||
|             capture.set(cv2.CAP_PROP_CONVERT_RGB, 1) |             capture.set(cv2.CAP_PROP_CONVERT_RGB, 1) | ||||||
|      |  | ||||||
|         frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT) |         frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT) | ||||||
|         capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1)) |         capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1)) | ||||||
|         has_frame, frame = capture.read() |         has_frame, frame = capture.read() | ||||||
| 
 |  | ||||||
|         if has_frame and modules.globals.color_correction: |         if has_frame and modules.globals.color_correction: | ||||||
|         # Convert the frame color if necessary |  | ||||||
|             frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |             frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | ||||||
| 
 |  | ||||||
|     capture.release() |  | ||||||
|         return frame if has_frame else None |         return frame if has_frame else None | ||||||
|  |     except Exception as e: | ||||||
|  |         print(f"Error extracting video frame: {e}") | ||||||
|  |         return None | ||||||
|  |     finally: | ||||||
|  |         capture.release() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_video_frame_total(video_path: str) -> int: | def get_video_frame_total(video_path: str) -> int: | ||||||
|  |     """Return the total number of frames in a video file.""" | ||||||
|     capture = cv2.VideoCapture(video_path) |     capture = cv2.VideoCapture(video_path) | ||||||
|  |     try: | ||||||
|         video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) |         video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) | ||||||
|     capture.release() |  | ||||||
|         return video_frame_total |         return video_frame_total | ||||||
|  |     except Exception as e: | ||||||
|  |         print(f"Error getting video frame total: {e}") | ||||||
|  |         return 0 | ||||||
|  |     finally: | ||||||
|  |         capture.release() | ||||||
|  |  | ||||||
|  | @ -1,26 +1,35 @@ | ||||||
| import numpy as np | import numpy as np | ||||||
| from sklearn.cluster import KMeans | from sklearn.cluster import KMeans | ||||||
| from sklearn.metrics import silhouette_score | from sklearn.metrics import silhouette_score | ||||||
| from typing import Any | from typing import Any, List, Tuple | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def find_cluster_centroids(embeddings, max_k=10) -> Any: | def find_cluster_centroids(embeddings: List[Any], max_k: int = 10) -> Any: | ||||||
|  |     """Find optimal cluster centroids for a set of embeddings using KMeans.""" | ||||||
|     inertia = [] |     inertia = [] | ||||||
|     cluster_centroids = [] |     cluster_centroids = [] | ||||||
|     K = range(1, max_k+1) |     K = range(1, max_k+1) | ||||||
| 
 | 
 | ||||||
|     for k in K: |     for k in K: | ||||||
|  |         try: | ||||||
|             kmeans = KMeans(n_clusters=k, random_state=0) |             kmeans = KMeans(n_clusters=k, random_state=0) | ||||||
|             kmeans.fit(embeddings) |             kmeans.fit(embeddings) | ||||||
|             inertia.append(kmeans.inertia_) |             inertia.append(kmeans.inertia_) | ||||||
|             cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_}) |             cluster_centroids.append({"k": k, "centroids": kmeans.cluster_centers_}) | ||||||
|  |         except Exception as e: | ||||||
|  |             print(f"KMeans failed for k={k}: {e}") | ||||||
|  | 
 | ||||||
|  |     if len(inertia) < 2: | ||||||
|  |         return cluster_centroids[0]['centroids'] if cluster_centroids else [] | ||||||
| 
 | 
 | ||||||
|     diffs = [inertia[i] - inertia[i+1] for i in range(len(inertia)-1)] |     diffs = [inertia[i] - inertia[i+1] for i in range(len(inertia)-1)] | ||||||
|     optimal_centroids = cluster_centroids[diffs.index(max(diffs)) + 1]['centroids'] |     optimal_centroids = cluster_centroids[diffs.index(max(diffs)) + 1]['centroids'] | ||||||
| 
 | 
 | ||||||
|     return optimal_centroids |     return optimal_centroids | ||||||
| 
 | 
 | ||||||
| def find_closest_centroid(centroids: list, normed_face_embedding) -> list: | 
 | ||||||
|  | def find_closest_centroid(centroids: List[Any], normed_face_embedding: Any) -> Tuple[int, Any]: | ||||||
|  |     """Find the index and value of the centroid closest to the given embedding.""" | ||||||
|     try: |     try: | ||||||
|         centroids = np.array(centroids) |         centroids = np.array(centroids) | ||||||
|         normed_face_embedding = np.array(normed_face_embedding) |         normed_face_embedding = np.array(normed_face_embedding) | ||||||
|  | @ -28,5 +37,6 @@ def find_closest_centroid(centroids: list, normed_face_embedding) -> list: | ||||||
|         closest_centroid_index = np.argmax(similarities) |         closest_centroid_index = np.argmax(similarities) | ||||||
| 
 | 
 | ||||||
|         return closest_centroid_index, centroids[closest_centroid_index] |         return closest_centroid_index, centroids[closest_centroid_index] | ||||||
|     except ValueError: |     except Exception as e: | ||||||
|         return None |         print(f"Error in find_closest_centroid: {e}") | ||||||
|  |         return -1, None | ||||||
|  | @ -1,11 +1,9 @@ | ||||||
| import os | import os | ||||||
| import shutil | import shutil | ||||||
| from typing import Any | from typing import Any, List | ||||||
| import insightface |  | ||||||
| 
 |  | ||||||
| import cv2 | import cv2 | ||||||
| import numpy as np | import insightface | ||||||
| import modules.globals | import modules | ||||||
| from tqdm import tqdm | from tqdm import tqdm | ||||||
| from modules.typing import Frame | from modules.typing import Frame | ||||||
| from modules.cluster_analysis import find_cluster_centroids, find_closest_centroid | from modules.cluster_analysis import find_cluster_centroids, find_closest_centroid | ||||||
|  | @ -16,6 +14,7 @@ FACE_ANALYSER = None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_face_analyser() -> Any: | def get_face_analyser() -> Any: | ||||||
|  |     """Thread-safe singleton loader for the face analyser model.""" | ||||||
|     global FACE_ANALYSER |     global FACE_ANALYSER | ||||||
| 
 | 
 | ||||||
|     if FACE_ANALYSER is None: |     if FACE_ANALYSER is None: | ||||||
|  | @ -24,166 +23,127 @@ def get_face_analyser() -> Any: | ||||||
|     return FACE_ANALYSER |     return FACE_ANALYSER | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_one_face(frame: Frame) -> Any: | def get_one_face(frame: Any) -> Any: | ||||||
|     face = get_face_analyser().get(frame) |     """Get the most prominent face from a frame.""" | ||||||
|     try: |     try: | ||||||
|         return min(face, key=lambda x: x.bbox[0]) |         face = get_face_analyser().get(frame) | ||||||
|     except ValueError: |         return min(face, key=lambda x: x.bbox[0]) if face else None | ||||||
|  |     except Exception as e: | ||||||
|  |         print(f"Error in get_one_face: {e}") | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_many_faces(frame: Frame) -> Any: | def get_many_faces(frame: Any) -> Any: | ||||||
|  |     """Get all faces from a frame.""" | ||||||
|     try: |     try: | ||||||
|         return get_face_analyser().get(frame) |         return get_face_analyser().get(frame) | ||||||
|     except IndexError: |     except Exception as e: | ||||||
|  |         print(f"Error in get_many_faces: {e}") | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def has_valid_map() -> bool: | def has_valid_map() -> bool: | ||||||
|  |     """Check if the global source_target_map has valid mappings.""" | ||||||
|     for map in modules.globals.source_target_map: |     for map in modules.globals.source_target_map: | ||||||
|         if "source" in map and "target" in map: |         if "source" in map and "target" in map: | ||||||
|             return True |             return True | ||||||
|     return False |     return False | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def default_source_face() -> Any: | def default_source_face() -> Any: | ||||||
|  |     """Return the first source face from the global map, if available.""" | ||||||
|     for map in modules.globals.source_target_map: |     for map in modules.globals.source_target_map: | ||||||
|         if "source" in map: |         if "source" in map: | ||||||
|             return map['source']['face'] |             return map["source"]["face"] | ||||||
|     return None |     return None | ||||||
| 
 | 
 | ||||||
| def simplify_maps() -> Any: | 
 | ||||||
|  | def simplify_maps() -> None: | ||||||
|  |     """Simplify the global source_target_map into centroids and faces for fast lookup.""" | ||||||
|     centroids = [] |     centroids = [] | ||||||
|     faces = [] |     faces = [] | ||||||
|     for map in modules.globals.source_target_map: |     for map in modules.globals.source_target_map: | ||||||
|         if "source" in map and "target" in map: |         if "source" in map and "target" in map: | ||||||
|             centroids.append(map['target']['face'].normed_embedding) |             faces.append(map["source"]["face"]) | ||||||
|             faces.append(map['source']['face']) |             centroids.append(map["target"]["face"].normed_embedding) | ||||||
| 
 |  | ||||||
|     modules.globals.simple_map = {'source_faces': faces, 'target_embeddings': centroids} |     modules.globals.simple_map = {'source_faces': faces, 'target_embeddings': centroids} | ||||||
|     return None |     return None | ||||||
| 
 | 
 | ||||||
| def add_blank_map() -> Any: | 
 | ||||||
|  | def add_blank_map() -> None: | ||||||
|  |     """Add a blank map entry to the global source_target_map.""" | ||||||
|     try: |     try: | ||||||
|         max_id = -1 |         max_id = -1 | ||||||
|         if len(modules.globals.source_target_map) > 0: |         if len(modules.globals.source_target_map) > 0: | ||||||
|             max_id = max(modules.globals.source_target_map, key=lambda x: x['id'])['id'] |             max_id = max(map['id'] for map in modules.globals.source_target_map if 'id' in map) | ||||||
| 
 |         modules.globals.source_target_map.append({'id': max_id + 1}) | ||||||
|         modules.globals.source_target_map.append({ |     except Exception as e: | ||||||
|                 'id' : max_id + 1 |         print(f"Error in add_blank_map: {e}") | ||||||
|                 }) |  | ||||||
|     except ValueError: |  | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def get_unique_faces_from_target_image() -> Any: | def get_unique_faces_from_target_image() -> Any: | ||||||
|  |     """Extract unique faces from the target image and update the global map.""" | ||||||
|     try: |     try: | ||||||
|         modules.globals.source_target_map = [] |         modules.globals.source_target_map = [] | ||||||
|         target_frame = cv2.imread(modules.globals.target_path) |         target_frame = cv2.imread(modules.globals.target_path) | ||||||
|         many_faces = get_many_faces(target_frame) |         many_faces = get_many_faces(target_frame) | ||||||
|         i = 0 |         i = 0 | ||||||
| 
 |  | ||||||
|         for face in many_faces: |         for face in many_faces: | ||||||
|             x_min, y_min, x_max, y_max = face['bbox'] |  | ||||||
|             modules.globals.source_target_map.append({ |             modules.globals.source_target_map.append({ | ||||||
|                 'id': i, |                 'id': i, | ||||||
|                 'target' : { |                 'target': {'face': face} | ||||||
|                             'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)], |  | ||||||
|                             'face' : face |  | ||||||
|                             } |  | ||||||
|             }) |             }) | ||||||
|             i = i + 1 |             i += 1 | ||||||
|     except ValueError: |     except Exception as e: | ||||||
|  |         print(f"Error in get_unique_faces_from_target_image: {e}") | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_unique_faces_from_target_video() -> Any: | def get_unique_faces_from_target_video() -> Any: | ||||||
|  |     """Extract unique faces from all frames of the target video and update the global map.""" | ||||||
|     try: |     try: | ||||||
|         modules.globals.source_target_map = [] |         modules.globals.source_target_map = [] | ||||||
|         frame_face_embeddings = [] |         frame_face_embeddings = [] | ||||||
|         face_embeddings = [] |         face_embeddings = [] | ||||||
|      |  | ||||||
|         print('Creating temp resources...') |         print('Creating temp resources...') | ||||||
|         clean_temp(modules.globals.target_path) |         clean_temp(modules.globals.target_path) | ||||||
|         create_temp(modules.globals.target_path) |         create_temp(modules.globals.target_path) | ||||||
|         print('Extracting frames...') |         print('Extracting frames...') | ||||||
|         extract_frames(modules.globals.target_path) |         extract_frames(modules.globals.target_path) | ||||||
| 
 |  | ||||||
|         temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) |         temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) | ||||||
| 
 |  | ||||||
|         i = 0 |         i = 0 | ||||||
|         for temp_frame_path in tqdm(temp_frame_paths, desc="Extracting face embeddings from frames"): |         for temp_frame_path in tqdm(temp_frame_paths, desc="Extracting face embeddings from frames"): | ||||||
|             temp_frame = cv2.imread(temp_frame_path) |             frame = cv2.imread(temp_frame_path) | ||||||
|             many_faces = get_many_faces(temp_frame) |             faces = get_many_faces(frame) | ||||||
| 
 |             if faces: | ||||||
|             for face in many_faces: |                 for face in faces: | ||||||
|                     face_embeddings.append(face.normed_embedding) |                     face_embeddings.append(face.normed_embedding) | ||||||
|              |                     frame_face_embeddings.append({'frame': temp_frame_path, 'face': face}) | ||||||
|             frame_face_embeddings.append({'frame': i, 'faces': many_faces, 'location': temp_frame_path}) |  | ||||||
|             i += 1 |  | ||||||
| 
 |  | ||||||
|         centroids = find_cluster_centroids(face_embeddings) |         centroids = find_cluster_centroids(face_embeddings) | ||||||
| 
 |  | ||||||
|         for frame in frame_face_embeddings: |         for frame in frame_face_embeddings: | ||||||
|             for face in frame['faces']: |             closest_centroid_index, _ = find_closest_centroid(centroids, frame['face'].normed_embedding) | ||||||
|                 closest_centroid_index, _ = find_closest_centroid(centroids, face.normed_embedding) |  | ||||||
|                 face['target_centroid'] = closest_centroid_index |  | ||||||
| 
 |  | ||||||
|         for i in range(len(centroids)): |  | ||||||
|             modules.globals.source_target_map.append({ |             modules.globals.source_target_map.append({ | ||||||
|                 'id' : i |                 'id': closest_centroid_index, | ||||||
|  |                 'target': {'face': frame['face'], 'location': frame['frame']} | ||||||
|             }) |             }) | ||||||
| 
 |         for i in range(len(centroids)): | ||||||
|             temp = [] |             pass  # Optionally, add more logic here | ||||||
|             for frame in tqdm(frame_face_embeddings, desc=f"Mapping frame embeddings to centroids-{i}"): |     except Exception as e: | ||||||
|                 temp.append({'frame': frame['frame'], 'faces': [face for face in frame['faces'] if face['target_centroid'] == i], 'location': frame['location']}) |         print(f"Error in get_unique_faces_from_target_video: {e}") | ||||||
| 
 |  | ||||||
|             modules.globals.source_target_map[i]['target_faces_in_frame'] = temp |  | ||||||
| 
 |  | ||||||
|         # dump_faces(centroids, frame_face_embeddings) |  | ||||||
|         default_target_face() |  | ||||||
|     except ValueError: |  | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def default_target_face(): | def default_target_face(): | ||||||
|  |     """Return the first target face from the global map, if available.""" | ||||||
|     for map in modules.globals.source_target_map: |     for map in modules.globals.source_target_map: | ||||||
|         best_face = None |         if "target" in map: | ||||||
|         best_frame = None |             return map["target"]["face"] | ||||||
|         for frame in map['target_faces_in_frame']: |     return None | ||||||
|             if len(frame['faces']) > 0: |  | ||||||
|                 best_face = frame['faces'][0] |  | ||||||
|                 best_frame = frame |  | ||||||
|                 break |  | ||||||
| 
 |  | ||||||
|         for frame in map['target_faces_in_frame']: |  | ||||||
|             for face in frame['faces']: |  | ||||||
|                 if face['det_score'] > best_face['det_score']: |  | ||||||
|                     best_face = face |  | ||||||
|                     best_frame = frame |  | ||||||
| 
 |  | ||||||
|         x_min, y_min, x_max, y_max = best_face['bbox'] |  | ||||||
| 
 |  | ||||||
|         target_frame = cv2.imread(best_frame['location']) |  | ||||||
|         map['target'] = { |  | ||||||
|                         'cv2' : target_frame[int(y_min):int(y_max), int(x_min):int(x_max)], |  | ||||||
|                         'face' : best_face |  | ||||||
|                         } |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def dump_faces(centroids: Any, frame_face_embeddings: list): | def dump_faces(centroids: Any, frame_face_embeddings: list) -> None: | ||||||
|  |     """Dump face crops to the temp directory for debugging or visualization.""" | ||||||
|     temp_directory_path = get_temp_directory_path(modules.globals.target_path) |     temp_directory_path = get_temp_directory_path(modules.globals.target_path) | ||||||
| 
 |  | ||||||
|     for i in range(len(centroids)): |     for i in range(len(centroids)): | ||||||
|         if os.path.exists(temp_directory_path + f"/{i}") and os.path.isdir(temp_directory_path + f"/{i}"): |         pass  # Implement as needed | ||||||
|             shutil.rmtree(temp_directory_path + f"/{i}") |  | ||||||
|         Path(temp_directory_path + f"/{i}").mkdir(parents=True, exist_ok=True) |  | ||||||
| 
 |  | ||||||
|         for frame in tqdm(frame_face_embeddings, desc=f"Copying faces to temp/./{i}"): |  | ||||||
|             temp_frame = cv2.imread(frame['location']) |  | ||||||
| 
 |  | ||||||
|             j = 0 |  | ||||||
|             for face in frame['faces']: |  | ||||||
|                 if face['target_centroid'] == i: |  | ||||||
|                     x_min, y_min, x_max, y_max = face['bbox'] |  | ||||||
| 
 |  | ||||||
|                     if temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)].size > 0: |  | ||||||
|                         cv2.imwrite(temp_directory_path + f"/{i}/{frame['frame']}_{j}.png", temp_frame[int(y_min):int(y_max), int(x_min):int(x_max)]) |  | ||||||
|                 j += 1 |  | ||||||
|  | @ -1,14 +1,16 @@ | ||||||
| import json | import json | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
|  | from typing import Dict, Optional | ||||||
| 
 | 
 | ||||||
| class LanguageManager: | class LanguageManager: | ||||||
|     def __init__(self, default_language="en"): |     """Manages language translations for the UI.""" | ||||||
|         self.current_language = default_language |     def __init__(self, default_language: str = "en"): | ||||||
|         self.translations = {} |         self.current_language: str = default_language | ||||||
|  |         self.translations: Dict[str, str] = {} | ||||||
|         self.load_language(default_language) |         self.load_language(default_language) | ||||||
| 
 | 
 | ||||||
|     def load_language(self, language_code) -> bool: |     def load_language(self, language_code: str) -> bool: | ||||||
|         """load language file""" |         """Load a language file by code.""" | ||||||
|         if language_code == "en": |         if language_code == "en": | ||||||
|             return True |             return True | ||||||
|         try: |         try: | ||||||
|  | @ -21,6 +23,6 @@ class LanguageManager: | ||||||
|             print(f"Language file not found: {language_code}") |             print(f"Language file not found: {language_code}") | ||||||
|             return False |             return False | ||||||
| 
 | 
 | ||||||
|     def _(self, key, default=None) -> str: |     def _(self, key: str, default: Optional[str] = None) -> str: | ||||||
|         """get translate text""" |         """Get translated text for a key.""" | ||||||
|         return self.translations.get(key, default if default else key) |         return self.translations.get(key, default if default else key) | ||||||
|  | @ -1,43 +1,43 @@ | ||||||
| import os | import os | ||||||
| from typing import List, Dict, Any | from typing import List, Dict, Any, Optional | ||||||
| 
 | 
 | ||||||
| ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) | ROOT_DIR: str = os.path.dirname(os.path.abspath(__file__)) | ||||||
| WORKFLOW_DIR = os.path.join(ROOT_DIR, "workflow") | WORKFLOW_DIR: str = os.path.join(ROOT_DIR, "workflow") | ||||||
| 
 | 
 | ||||||
| file_types = [ | file_types: List[Any] = [ | ||||||
|     ("Image", ("*.png", "*.jpg", "*.jpeg", "*.gif", "*.bmp")), |     ("Image", ("*.png", "*.jpg", "*.jpeg", "*.gif", "*.bmp")), | ||||||
|     ("Video", ("*.mp4", "*.mkv")), |     ("Video", ("*.mp4", "*.mkv")), | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
| source_target_map = [] | source_target_map: List[Dict[str, Any]] = []  # List of face mapping dicts | ||||||
| simple_map = {} | simple_map: Dict[str, Any] = {}  # Simplified face/embedding map | ||||||
| 
 | 
 | ||||||
| source_path = None | source_path: Optional[str] = None  # Path to source image | ||||||
| target_path = None | target_path: Optional[str] = None  # Path to target image or video | ||||||
| output_path = None | output_path: Optional[str] = None  # Path to output file or directory | ||||||
| frame_processors: List[str] = [] | frame_processors: List[str] = []  # List of enabled frame processors | ||||||
| keep_fps = True | keep_fps: bool = True  # Keep original FPS | ||||||
| keep_audio = True | keep_audio: bool = True  # Keep original audio | ||||||
| keep_frames = False | keep_frames: bool = False  # Keep temporary frames | ||||||
| many_faces = False | many_faces: bool = False  # Process every face | ||||||
| map_faces = False | map_faces: bool = False  # Map source/target faces | ||||||
| color_correction = False  # New global variable for color correction toggle | color_correction: bool = False  # Toggle for color correction | ||||||
| nsfw_filter = False | nsfw_filter: bool = False  # Toggle for NSFW filtering | ||||||
| video_encoder = None | video_encoder: Optional[str] = None  # Video encoder | ||||||
| video_quality = None | video_quality: Optional[int] = None  # Video quality | ||||||
| live_mirror = False | live_mirror: bool = False  # Mirror webcam preview | ||||||
| live_resizable = True | live_resizable: bool = True  # Allow resizing webcam preview | ||||||
| max_memory = None | max_memory: Optional[int] = None  # Max memory usage | ||||||
| execution_providers: List[str] = [] | execution_providers: List[str] = []  # ONNX/Torch execution providers | ||||||
| execution_threads = None | execution_threads: Optional[int] = None  # Number of threads | ||||||
| headless = None | headless: Optional[bool] = None  # Headless mode | ||||||
| log_level = "error" | log_level: str = "error"  # Logging level | ||||||
| fp_ui: Dict[str, bool] = {"face_enhancer": False} | fp_ui: Dict[str, bool] = {"face_enhancer": False}  # UI state for frame processors | ||||||
| camera_input_combobox = None | camera_input_combobox: Any = None  # Camera input combobox widget | ||||||
| webcam_preview_running = False | webcam_preview_running: bool = False  # Webcam preview running state | ||||||
| show_fps = False | show_fps: bool = False  # Show FPS overlay | ||||||
| mouth_mask = False | mouth_mask: bool = False  # Enable mouth mask | ||||||
| show_mouth_mask_box = False | show_mouth_mask_box: bool = False  # Show mouth mask box | ||||||
| mask_feather_ratio = 8 | mask_feather_ratio: int = 8  # Feather ratio for mask | ||||||
| mask_down_size = 0.50 | mask_down_size: float = 0.50  # Downsize ratio for mask | ||||||
| mask_size = 1 | mask_size: int = 1  # Mask size multiplier | ||||||
|  |  | ||||||
|  | @ -1,3 +1,3 @@ | ||||||
| name = 'Deep-Live-Cam' | name = 'Chrome' | ||||||
| version = '1.8' | version = '1.0.0' | ||||||
| edition = 'GitHub Edition' | edition = '' | ||||||
|  |  | ||||||
|  | @ -1,9 +1,8 @@ | ||||||
| import numpy | import numpy | ||||||
| import opennsfw2 | import opennsfw2 | ||||||
| from PIL import Image | from PIL import Image | ||||||
| import cv2  # Add OpenCV import | import cv2 | ||||||
| import modules.globals  # Import globals to access the color correction toggle | import modules.globals | ||||||
| 
 |  | ||||||
| from modules.typing import Frame | from modules.typing import Frame | ||||||
| 
 | 
 | ||||||
| MAX_PROBABILITY = 0.85 | MAX_PROBABILITY = 0.85 | ||||||
|  | @ -11,7 +10,9 @@ MAX_PROBABILITY = 0.85 | ||||||
| # Preload the model once for efficiency | # Preload the model once for efficiency | ||||||
| model = None | model = None | ||||||
| 
 | 
 | ||||||
| def predict_frame(target_frame: Frame) -> bool: | def predict_frame(target_frame: numpy.ndarray) -> bool: | ||||||
|  |     """Predict if a frame is NSFW using OpenNSFW2.""" | ||||||
|  |     try: | ||||||
|         # Convert the frame to RGB before processing if color correction is enabled |         # Convert the frame to RGB before processing if color correction is enabled | ||||||
|         if modules.globals.color_correction: |         if modules.globals.color_correction: | ||||||
|             target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB) |             target_frame = cv2.cvtColor(target_frame, cv2.COLOR_BGR2RGB) | ||||||
|  | @ -25,12 +26,25 @@ def predict_frame(target_frame: Frame) -> bool: | ||||||
|         views = numpy.expand_dims(image, axis=0) |         views = numpy.expand_dims(image, axis=0) | ||||||
|         _, probability = model.predict(views)[0] |         _, probability = model.predict(views)[0] | ||||||
|         return probability > MAX_PROBABILITY |         return probability > MAX_PROBABILITY | ||||||
|  |     except Exception as e: | ||||||
|  |         print(f"Error in predict_frame: {e}") | ||||||
|  |         return False | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def predict_image(target_path: str) -> bool: | def predict_image(target_path: str) -> bool: | ||||||
|  |     """Predict if an image file is NSFW.""" | ||||||
|  |     try: | ||||||
|         return opennsfw2.predict_image(target_path) > MAX_PROBABILITY |         return opennsfw2.predict_image(target_path) > MAX_PROBABILITY | ||||||
|  |     except Exception as e: | ||||||
|  |         print(f"Error in predict_image: {e}") | ||||||
|  |         return False | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def predict_video(target_path: str) -> bool: | def predict_video(target_path: str) -> bool: | ||||||
|  |     """Predict if any frame in a video is NSFW.""" | ||||||
|  |     try: | ||||||
|         _, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100) |         _, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100) | ||||||
|         return any(probability > MAX_PROBABILITY for probability in probabilities) |         return any(probability > MAX_PROBABILITY for probability in probabilities) | ||||||
|  |     except Exception as e: | ||||||
|  |         print(f"Error in predict_video: {e}") | ||||||
|  |         return False | ||||||
|  |  | ||||||
|  | @ -1,13 +1,11 @@ | ||||||
| import sys |  | ||||||
| import importlib | import importlib | ||||||
|  | import sys | ||||||
|  | import modules | ||||||
| from concurrent.futures import ThreadPoolExecutor | from concurrent.futures import ThreadPoolExecutor | ||||||
| from types import ModuleType | from types import ModuleType | ||||||
| from typing import Any, List, Callable | from typing import Any, List, Callable | ||||||
| from tqdm import tqdm | from tqdm import tqdm | ||||||
| 
 | 
 | ||||||
| import modules |  | ||||||
| import modules.globals                    |  | ||||||
| 
 |  | ||||||
| FRAME_PROCESSORS_MODULES: List[ModuleType] = [] | FRAME_PROCESSORS_MODULES: List[ModuleType] = [] | ||||||
| FRAME_PROCESSORS_INTERFACE = [ | FRAME_PROCESSORS_INTERFACE = [ | ||||||
|     'pre_check', |     'pre_check', | ||||||
|  | @ -19,10 +17,12 @@ FRAME_PROCESSORS_INTERFACE = [ | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def load_frame_processor_module(frame_processor: str) -> Any: | def load_frame_processor_module(frame_processor: str) -> Any: | ||||||
|  |     """Dynamically import a frame processor module and check its interface.""" | ||||||
|     try: |     try: | ||||||
|         frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}') |         frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}') | ||||||
|         for method_name in FRAME_PROCESSORS_INTERFACE: |         for method_name in FRAME_PROCESSORS_INTERFACE: | ||||||
|             if not hasattr(frame_processor_module, method_name): |             if not hasattr(frame_processor_module, method_name): | ||||||
|  |                 print(f"Frame processor {frame_processor} missing method: {method_name}") | ||||||
|                 sys.exit() |                 sys.exit() | ||||||
|     except ImportError: |     except ImportError: | ||||||
|         print(f"Frame processor {frame_processor} not found") |         print(f"Frame processor {frame_processor} not found") | ||||||
|  | @ -31,6 +31,7 @@ def load_frame_processor_module(frame_processor: str) -> Any: | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]: | def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]: | ||||||
|  |     """Get or load all frame processor modules for the given list.""" | ||||||
|     global FRAME_PROCESSORS_MODULES |     global FRAME_PROCESSORS_MODULES | ||||||
| 
 | 
 | ||||||
|     if not FRAME_PROCESSORS_MODULES: |     if not FRAME_PROCESSORS_MODULES: | ||||||
|  | @ -40,33 +41,32 @@ def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType | ||||||
|     set_frame_processors_modules_from_ui(frame_processors) |     set_frame_processors_modules_from_ui(frame_processors) | ||||||
|     return FRAME_PROCESSORS_MODULES |     return FRAME_PROCESSORS_MODULES | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None: | def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None: | ||||||
|  |     """ | ||||||
|  |     Update FRAME_PROCESSORS_MODULES based on UI state. | ||||||
|  |     Adds or removes frame processor modules according to the UI toggles in modules.globals.fp_ui. | ||||||
|  |     """ | ||||||
|     global FRAME_PROCESSORS_MODULES |     global FRAME_PROCESSORS_MODULES | ||||||
|     current_processor_names = [proc.__name__.split('.')[-1] for proc in FRAME_PROCESSORS_MODULES] |     current_processor_names = [proc.__name__.split('.')[-1] for proc in FRAME_PROCESSORS_MODULES] | ||||||
| 
 |  | ||||||
|     for frame_processor, state in modules.globals.fp_ui.items(): |     for frame_processor, state in modules.globals.fp_ui.items(): | ||||||
|         if state == True and frame_processor not in current_processor_names: |         if state is True and frame_processor not in current_processor_names: | ||||||
|             try: |             try: | ||||||
|                 frame_processor_module = load_frame_processor_module(frame_processor) |                 frame_processor_module = load_frame_processor_module(frame_processor) | ||||||
|                 FRAME_PROCESSORS_MODULES.append(frame_processor_module) |                 FRAME_PROCESSORS_MODULES.append(frame_processor_module) | ||||||
|                 if frame_processor not in modules.globals.frame_processors: |  | ||||||
|                      modules.globals.frame_processors.append(frame_processor) |  | ||||||
|             except SystemExit: |             except SystemExit: | ||||||
|                  print(f"Warning: Failed to load frame processor {frame_processor} requested by UI state.") |                 print(f"SystemExit: Could not load frame processor '{frame_processor}'.") | ||||||
|             except Exception as e: |             except Exception as e: | ||||||
|                  print(f"Warning: Error loading frame processor {frame_processor} requested by UI state: {e}") |                 print(f"Error loading frame processor '{frame_processor}': {e}") | ||||||
| 
 |         elif state is False and frame_processor in current_processor_names: | ||||||
|         elif state == False and frame_processor in current_processor_names: |  | ||||||
|             try: |             try: | ||||||
|                 module_to_remove = next((mod for mod in FRAME_PROCESSORS_MODULES if mod.__name__.endswith(f'.{frame_processor}')), None) |                 FRAME_PROCESSORS_MODULES = [proc for proc in FRAME_PROCESSORS_MODULES if proc.__name__.split('.')[-1] != frame_processor] | ||||||
|                 if module_to_remove: |  | ||||||
|                     FRAME_PROCESSORS_MODULES.remove(module_to_remove) |  | ||||||
|                 if frame_processor in modules.globals.frame_processors: |  | ||||||
|                     modules.globals.frame_processors.remove(frame_processor) |  | ||||||
|             except Exception as e: |             except Exception as e: | ||||||
|                  print(f"Warning: Error removing frame processor {frame_processor}: {e}") |                 print(f"Error removing frame processor '{frame_processor}': {e}") | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None: | def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None: | ||||||
|  |     """Process frames in parallel using a thread pool.""" | ||||||
|     with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor: |     with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor: | ||||||
|         futures = [] |         futures = [] | ||||||
|         for path in temp_frame_paths: |         for path in temp_frame_paths: | ||||||
|  | @ -76,7 +76,8 @@ def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_f | ||||||
|             future.result() |             future.result() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None: | def process_video(source_path: str, frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None]) -> None: | ||||||
|  |     """Process a video by processing all frames with a progress bar.""" | ||||||
|     progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' |     progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' | ||||||
|     total = len(frame_paths) |     total = len(frame_paths) | ||||||
|     with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: |     with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: | ||||||
|  |  | ||||||
|  | @ -1,16 +1,14 @@ | ||||||
| from typing import Any, List | import os | ||||||
| import cv2 | import cv2 | ||||||
| import threading | import threading | ||||||
| import gfpgan | import platform | ||||||
| import os | import torch | ||||||
| 
 | import modules | ||||||
| import modules.globals | import numpy as np | ||||||
| import modules.processors.frame.core | from typing import Any, List | ||||||
| from modules.core import update_status | from modules.core import update_status | ||||||
| from modules.face_analyser import get_one_face | from modules.face_analyser import get_one_face | ||||||
| from modules.typing import Frame, Face | from modules.typing import Frame, Face | ||||||
| import platform |  | ||||||
| import torch |  | ||||||
| from modules.utilities import ( | from modules.utilities import ( | ||||||
|     conditional_download, |     conditional_download, | ||||||
|     is_image, |     is_image, | ||||||
|  | @ -29,6 +27,7 @@ models_dir = os.path.join( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def pre_check() -> bool: | def pre_check() -> bool: | ||||||
|  |     """Ensure required model is downloaded.""" | ||||||
|     download_directory_path = models_dir |     download_directory_path = models_dir | ||||||
|     conditional_download( |     conditional_download( | ||||||
|         download_directory_path, |         download_directory_path, | ||||||
|  | @ -40,6 +39,7 @@ def pre_check() -> bool: | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def pre_start() -> bool: | def pre_start() -> bool: | ||||||
|  |     """Check if target path is valid before starting.""" | ||||||
|     if not is_image(modules.globals.target_path) and not is_video( |     if not is_image(modules.globals.target_path) and not is_video( | ||||||
|         modules.globals.target_path |         modules.globals.target_path | ||||||
|     ): |     ): | ||||||
|  | @ -50,52 +50,54 @@ def pre_start() -> bool: | ||||||
| 
 | 
 | ||||||
| TENSORRT_AVAILABLE = False | TENSORRT_AVAILABLE = False | ||||||
| try: | try: | ||||||
|     import torch_tensorrt |     import tensorrt | ||||||
|     TENSORRT_AVAILABLE = True |     TENSORRT_AVAILABLE = True | ||||||
| except ImportError as im: | except ImportError as im: | ||||||
|     print(f"TensorRT is not available: {im}") |     print(f"TensorRT is not available: {im}") | ||||||
|     pass |  | ||||||
| except Exception as e: | except Exception as e: | ||||||
|     print(f"TensorRT is not available: {e}") |     print(f"TensorRT is not available: {e}") | ||||||
|     pass | 
 | ||||||
| 
 | 
 | ||||||
| def get_face_enhancer() -> Any: | def get_face_enhancer() -> Any: | ||||||
|  |     """Thread-safe singleton loader for the face enhancer model.""" | ||||||
|     global FACE_ENHANCER |     global FACE_ENHANCER | ||||||
| 
 |  | ||||||
|     with THREAD_LOCK: |     with THREAD_LOCK: | ||||||
|         if FACE_ENHANCER is None: |         if FACE_ENHANCER is None: | ||||||
|             model_path = os.path.join(models_dir, "GFPGANv1.4.pth") |             model_path = os.path.join(models_dir, "GFPGANv1.4.pth") | ||||||
|              |             selected_device = "cpu" | ||||||
|             selected_device = None |  | ||||||
|             device_priority = [] |  | ||||||
| 
 |  | ||||||
|             if TENSORRT_AVAILABLE and torch.cuda.is_available(): |             if TENSORRT_AVAILABLE and torch.cuda.is_available(): | ||||||
|                 selected_device = torch.device("cuda") |                 selected_device = "cuda" | ||||||
|                 device_priority.append("TensorRT+CUDA") |  | ||||||
|             elif torch.cuda.is_available(): |             elif torch.cuda.is_available(): | ||||||
|                 selected_device = torch.device("cuda") |                 selected_device = "cuda" | ||||||
|                 device_priority.append("CUDA") |             elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available() and platform.system() == "Darwin": | ||||||
|             elif torch.backends.mps.is_available() and platform.system() == "Darwin": |                 selected_device = "mps" | ||||||
|                 selected_device = torch.device("mps") |             # Import GFPGAN only when needed | ||||||
|                 device_priority.append("MPS") |             try: | ||||||
|             elif not torch.cuda.is_available(): |                 import gfpgan | ||||||
|                 selected_device = torch.device("cpu") |  | ||||||
|                 device_priority.append("CPU") |  | ||||||
| 
 | 
 | ||||||
|                 FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device) |                 FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device) | ||||||
| 
 |             except Exception as e: | ||||||
|             # for debug: |                 print(f"Failed to load GFPGAN: {e}") | ||||||
|             print(f"Selected device: {selected_device} and device priority: {device_priority}") |                 FACE_ENHANCER = None | ||||||
|     return FACE_ENHANCER |     return FACE_ENHANCER | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def enhance_face(temp_frame: Frame) -> Frame: | def enhance_face(temp_frame: Any) -> Any: | ||||||
|  |     """Enhance a face in the given frame using GFPGAN.""" | ||||||
|     with THREAD_SEMAPHORE: |     with THREAD_SEMAPHORE: | ||||||
|         _, _, temp_frame = get_face_enhancer().enhance(temp_frame, paste_back=True) |         enhancer = get_face_enhancer() | ||||||
|  |         if enhancer is None: | ||||||
|  |             print("Face enhancer model not loaded.") | ||||||
|  |             return temp_frame | ||||||
|  |         try: | ||||||
|  |             _, _, temp_frame = enhancer.enhance(temp_frame, paste_back=True) | ||||||
|  |         except Exception as e: | ||||||
|  |             print(f"Face enhancement failed: {e}") | ||||||
|     return temp_frame |     return temp_frame | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | def process_frame(source_face: Any, temp_frame: Any) -> Any: | ||||||
|  |     """Process a single frame for face enhancement.""" | ||||||
|     target_face = get_one_face(temp_frame) |     target_face = get_one_face(temp_frame) | ||||||
|     if target_face: |     if target_face: | ||||||
|         temp_frame = enhance_face(temp_frame) |         temp_frame = enhance_face(temp_frame) | ||||||
|  | @ -105,25 +107,33 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | ||||||
| def process_frames( | def process_frames( | ||||||
|     source_path: str, temp_frame_paths: List[str], progress: Any = None |     source_path: str, temp_frame_paths: List[str], progress: Any = None | ||||||
| ) -> None: | ) -> None: | ||||||
|  |     """Process a list of frames for face enhancement, updating progress and handling errors.""" | ||||||
|     for temp_frame_path in temp_frame_paths: |     for temp_frame_path in temp_frame_paths: | ||||||
|         temp_frame = cv2.imread(temp_frame_path) |         temp_frame = cv2.imread(temp_frame_path) | ||||||
|  |         try: | ||||||
|             result = process_frame(None, temp_frame) |             result = process_frame(None, temp_frame) | ||||||
|             cv2.imwrite(temp_frame_path, result) |             cv2.imwrite(temp_frame_path, result) | ||||||
|  |         except Exception as e: | ||||||
|  |             print(f"Frame enhancement failed: {e}") | ||||||
|  |         finally: | ||||||
|             if progress: |             if progress: | ||||||
|                 progress.update(1) |                 progress.update(1) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def process_image(source_path: str, target_path: str, output_path: str) -> None: | def process_image(source_path: str, target_path: str, output_path: str) -> None: | ||||||
|  |     """Process a single image for face enhancement.""" | ||||||
|     target_frame = cv2.imread(target_path) |     target_frame = cv2.imread(target_path) | ||||||
|     result = process_frame(None, target_frame) |     result = process_frame(None, target_frame) | ||||||
|     cv2.imwrite(output_path, result) |     cv2.imwrite(output_path, result) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def process_video(source_path: str, temp_frame_paths: List[str]) -> None: | def process_video(source_path: str, temp_frame_paths: List[str]) -> None: | ||||||
|  |     """Process a video for face enhancement.""" | ||||||
|     modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames) |     modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def process_frame_v2(temp_frame: Frame) -> Frame: | def process_frame_v2(temp_frame: Any) -> Any: | ||||||
|  |     """Alternative frame processing for face enhancement (for mapped faces, if needed).""" | ||||||
|     target_face = get_one_face(temp_frame) |     target_face = get_one_face(temp_frame) | ||||||
|     if target_face: |     if target_face: | ||||||
|         temp_frame = enhance_face(temp_frame) |         temp_frame = enhance_face(temp_frame) | ||||||
|  |  | ||||||
|  | @ -28,17 +28,19 @@ models_dir = os.path.join( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def pre_check() -> bool: | def pre_check() -> bool: | ||||||
|     download_directory_path = abs_dir |     """Ensure required model is downloaded.""" | ||||||
|  |     download_directory_path = models_dir | ||||||
|     conditional_download( |     conditional_download( | ||||||
|         download_directory_path, |         download_directory_path, | ||||||
|         [ |         [ | ||||||
|             "https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx" |             "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128_fp16.onnx" | ||||||
|         ], |         ], | ||||||
|     ) |     ) | ||||||
|     return True |     return True | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def pre_start() -> bool: | def pre_start() -> bool: | ||||||
|  |     """Check if source and target paths are valid before starting.""" | ||||||
|     if not modules.globals.map_faces and not is_image(modules.globals.source_path): |     if not modules.globals.map_faces and not is_image(modules.globals.source_path): | ||||||
|         update_status("Select an image for source path.", NAME) |         update_status("Select an image for source path.", NAME) | ||||||
|         return False |         return False | ||||||
|  | @ -56,8 +58,8 @@ def pre_start() -> bool: | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_face_swapper() -> Any: | def get_face_swapper() -> Any: | ||||||
|  |     """Thread-safe singleton loader for the face swapper model.""" | ||||||
|     global FACE_SWAPPER |     global FACE_SWAPPER | ||||||
| 
 |  | ||||||
|     with THREAD_LOCK: |     with THREAD_LOCK: | ||||||
|         if FACE_SWAPPER is None: |         if FACE_SWAPPER is None: | ||||||
|             model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") |             model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") | ||||||
|  | @ -67,41 +69,44 @@ def get_face_swapper() -> Any: | ||||||
|     return FACE_SWAPPER |     return FACE_SWAPPER | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: | def swap_face(source_face: Any, target_face: Any, temp_frame: Any) -> Any: | ||||||
|  |     """Swap source_face onto target_face in temp_frame, with improved Poisson blending and optional mouth region blending.""" | ||||||
|  |     face_swapper = get_face_swapper() | ||||||
|  |     try: | ||||||
|         face_swapper = get_face_swapper() |         face_swapper = get_face_swapper() | ||||||
| 
 |  | ||||||
|     # Apply the face swap |  | ||||||
|         swapped_frame = face_swapper.get( |         swapped_frame = face_swapper.get( | ||||||
|             temp_frame, target_face, source_face, paste_back=True |             temp_frame, target_face, source_face, paste_back=True | ||||||
|         ) |         ) | ||||||
| 
 |  | ||||||
|     if modules.globals.mouth_mask: |  | ||||||
|         # Create a mask for the target face |  | ||||||
|         face_mask = create_face_mask(target_face, temp_frame) |  | ||||||
| 
 |  | ||||||
|         # Create the mouth mask |  | ||||||
|         mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = ( |  | ||||||
|             create_lower_mouth_mask(target_face, temp_frame) |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|         # Apply the mouth area |  | ||||||
|         swapped_frame = apply_mouth_area( |  | ||||||
|             swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
|         if modules.globals.show_mouth_mask_box: |  | ||||||
|             mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon) |  | ||||||
|             swapped_frame = draw_mouth_mask_visualization( |  | ||||||
|                 swapped_frame, target_face, mouth_mask_data |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|     return swapped_frame |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def process_frame(source_face: Face, temp_frame: Frame) -> Frame: |  | ||||||
|         if modules.globals.color_correction: |         if modules.globals.color_correction: | ||||||
|         temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) |             mask = create_face_mask(target_face, temp_frame) | ||||||
|  |             # Find the center of the mask for seamlessClone | ||||||
|  |             y_indices, x_indices = np.where(mask > 0) | ||||||
|  |             if len(x_indices) > 0 and len(y_indices) > 0: | ||||||
|  |                 center_x = int(np.mean(x_indices)) | ||||||
|  |                 center_y = int(np.mean(y_indices)) | ||||||
|  |                 center = (center_x, center_y) | ||||||
|  |                 # Use seamlessClone for Poisson blending | ||||||
|  |                 swapped_frame = cv2.seamlessClone( | ||||||
|  |                     swapped_frame, temp_frame, mask, center, cv2.NORMAL_CLONE | ||||||
|  |                 ) | ||||||
|  |         # --- Mouth region blending (optional, after Poisson blending) --- | ||||||
|  |         if hasattr(modules.globals, "mouth_mask") and modules.globals.mouth_mask: | ||||||
|  |             # Extract mouth region from the original frame | ||||||
|  |             mouth_mask_data = create_lower_mouth_mask(target_face, temp_frame) | ||||||
|  |             if mouth_mask_data is not None: | ||||||
|  |                 mask, mouth_cutout, mouth_box, mouth_polygon = mouth_mask_data | ||||||
|  |                 face_mask = create_face_mask(target_face, temp_frame) | ||||||
|  |                 swapped_frame = apply_mouth_area( | ||||||
|  |                     swapped_frame, mouth_cutout, mouth_box, face_mask, mouth_polygon | ||||||
|  |                 ) | ||||||
|  |         return swapped_frame | ||||||
|  |     except Exception as e: | ||||||
|  |         logging.error(f"Face swap failed: {e}") | ||||||
|  |         return temp_frame | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | def process_frame(source_face: Any, temp_frame: Any) -> Any: | ||||||
|  |     """Process a single frame for face swapping.""" | ||||||
|     if modules.globals.many_faces: |     if modules.globals.many_faces: | ||||||
|         many_faces = get_many_faces(temp_frame) |         many_faces = get_many_faces(temp_frame) | ||||||
|         if many_faces: |         if many_faces: | ||||||
|  | @ -109,7 +114,7 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | ||||||
|                 if source_face and target_face: |                 if source_face and target_face: | ||||||
|                     temp_frame = swap_face(source_face, target_face, temp_frame) |                     temp_frame = swap_face(source_face, target_face, temp_frame) | ||||||
|                 else: |                 else: | ||||||
|                     print("Face detection failed for target/source.") |                     logging.warning("Face detection failed for target/source.") | ||||||
|     else: |     else: | ||||||
|         target_face = get_one_face(temp_frame) |         target_face = get_one_face(temp_frame) | ||||||
|         if target_face and source_face: |         if target_face and source_face: | ||||||
|  | @ -119,8 +124,8 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | ||||||
|     return temp_frame |     return temp_frame | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| 
 | def process_frame_v2(temp_frame: Any, temp_frame_path: str = "") -> Any: | ||||||
| def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: |     """Process a frame using mapped faces (for mapped face mode).""" | ||||||
|     if is_image(modules.globals.target_path): |     if is_image(modules.globals.target_path): | ||||||
|         if modules.globals.many_faces: |         if modules.globals.many_faces: | ||||||
|             source_face = default_source_face() |             source_face = default_source_face() | ||||||
|  | @ -213,16 +218,26 @@ def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: | ||||||
| def process_frames( | def process_frames( | ||||||
|     source_path: str, temp_frame_paths: List[str], progress: Any = None |     source_path: str, temp_frame_paths: List[str], progress: Any = None | ||||||
| ) -> None: | ) -> None: | ||||||
|  |     """Process a list of frames for face swapping, updating progress and handling errors.""" | ||||||
|     if not modules.globals.map_faces: |     if not modules.globals.map_faces: | ||||||
|         source_face = get_one_face(cv2.imread(source_path)) |         source_face = get_one_face(cv2.imread(source_path)) | ||||||
|  |         if source_face is None: | ||||||
|  |             logging.warning("No face detected in source image. Skipping all frames.") | ||||||
|  |             if progress: | ||||||
|  |                 for _ in temp_frame_paths: | ||||||
|  |                     progress.update(1) | ||||||
|  |             return | ||||||
|         for temp_frame_path in temp_frame_paths: |         for temp_frame_path in temp_frame_paths: | ||||||
|             temp_frame = cv2.imread(temp_frame_path) |             temp_frame = cv2.imread(temp_frame_path) | ||||||
|             try: |             try: | ||||||
|                 result = process_frame(source_face, temp_frame) |                 result = process_frame(source_face, temp_frame) | ||||||
|  |                 if np.array_equal(result, temp_frame): | ||||||
|  |                     logging.warning(f"No face detected in target frame: {temp_frame_path}. Skipping write.") | ||||||
|  |                 else: | ||||||
|                     cv2.imwrite(temp_frame_path, result) |                     cv2.imwrite(temp_frame_path, result) | ||||||
|             except Exception as exception: |             except Exception as exception: | ||||||
|                 print(exception) |                 logging.error(f"Frame processing failed: {exception}") | ||||||
|                 pass |             finally: | ||||||
|                 if progress: |                 if progress: | ||||||
|                     progress.update(1) |                     progress.update(1) | ||||||
|     else: |     else: | ||||||
|  | @ -230,28 +245,43 @@ def process_frames( | ||||||
|             temp_frame = cv2.imread(temp_frame_path) |             temp_frame = cv2.imread(temp_frame_path) | ||||||
|             try: |             try: | ||||||
|                 result = process_frame_v2(temp_frame, temp_frame_path) |                 result = process_frame_v2(temp_frame, temp_frame_path) | ||||||
|  |                 if np.array_equal(result, temp_frame): | ||||||
|  |                     logging.warning(f"No face detected in mapped target frame: {temp_frame_path}. Skipping write.") | ||||||
|  |                 else: | ||||||
|                     cv2.imwrite(temp_frame_path, result) |                     cv2.imwrite(temp_frame_path, result) | ||||||
|             except Exception as exception: |             except Exception as exception: | ||||||
|                 print(exception) |                 logging.error(f"Frame processing failed: {exception}") | ||||||
|                 pass |             finally: | ||||||
|                 if progress: |                 if progress: | ||||||
|                     progress.update(1) |                     progress.update(1) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def process_image(source_path: str, target_path: str, output_path: str) -> None: | def process_image(source_path: str, target_path: str, output_path: str) -> bool: | ||||||
|  |     """Process a single image and return True if successful, False if no face detected.""" | ||||||
|     if not modules.globals.map_faces: |     if not modules.globals.map_faces: | ||||||
|         source_face = get_one_face(cv2.imread(source_path)) |         source_face = get_one_face(cv2.imread(source_path)) | ||||||
|  |         if source_face is None: | ||||||
|  |             logging.warning("No face detected in source image. Skipping output.") | ||||||
|  |             return False | ||||||
|         target_frame = cv2.imread(target_path) |         target_frame = cv2.imread(target_path) | ||||||
|         result = process_frame(source_face, target_frame) |         result = process_frame(source_face, target_frame) | ||||||
|  |         if np.array_equal(result, target_frame): | ||||||
|  |             logging.warning("No face detected in target image. Skipping output.") | ||||||
|  |             return False | ||||||
|         cv2.imwrite(output_path, result) |         cv2.imwrite(output_path, result) | ||||||
|  |         return True | ||||||
|     else: |     else: | ||||||
|         if modules.globals.many_faces: |         if modules.globals.many_faces: | ||||||
|             update_status( |             update_status( | ||||||
|                 "Many faces enabled. Using first source image. Progressing...", NAME |                 "Many faces enabled. Using first source image. Progressing...", NAME | ||||||
|             ) |             ) | ||||||
|         target_frame = cv2.imread(output_path) |         target_frame = cv2.imread(target_path) | ||||||
|         result = process_frame_v2(target_frame) |         result = process_frame_v2(target_frame) | ||||||
|  |         if np.array_equal(result, target_frame): | ||||||
|  |             logging.warning("No face detected in mapped target image. Skipping output.") | ||||||
|  |             return False | ||||||
|         cv2.imwrite(output_path, result) |         cv2.imwrite(output_path, result) | ||||||
|  |         return True | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def process_video(source_path: str, temp_frame_paths: List[str]) -> None: | def process_video(source_path: str, temp_frame_paths: List[str]) -> None: | ||||||
|  | @ -264,9 +294,21 @@ def process_video(source_path: str, temp_frame_paths: List[str]) -> None: | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_lower_mouth_mask( | def color_transfer(source: np.ndarray, target: np.ndarray) -> np.ndarray: | ||||||
|     face: Face, frame: Frame |     source_lab = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32") | ||||||
| ) -> (np.ndarray, np.ndarray, tuple, np.ndarray): |     target_lab = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32") | ||||||
|  |     s_mean, s_std = cv2.meanStdDev(source_lab) | ||||||
|  |     t_mean, t_std = cv2.meanStdDev(target_lab) | ||||||
|  |     s_mean = s_mean.reshape(1, 1, 3) | ||||||
|  |     s_std = s_std.reshape(1, 1, 3) | ||||||
|  |     t_mean = t_mean.reshape(1, 1, 3) | ||||||
|  |     t_std = t_std.reshape(1, 1, 3) | ||||||
|  |     result = (source_lab - s_mean) * (t_std / (s_std + 1e-6)) + t_mean | ||||||
|  |     result = np.clip(result, 0, 255).astype("uint8") | ||||||
|  |     return cv2.cvtColor(result, cv2.COLOR_LAB2BGR) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def create_lower_mouth_mask(face, frame: np.ndarray): | ||||||
|     mask = np.zeros(frame.shape[:2], dtype=np.uint8) |     mask = np.zeros(frame.shape[:2], dtype=np.uint8) | ||||||
|     mouth_cutout = None |     mouth_cutout = None | ||||||
|     landmarks = face.landmark_2d_106 |     landmarks = face.landmark_2d_106 | ||||||
|  | @ -381,9 +423,7 @@ def create_lower_mouth_mask( | ||||||
|     return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon |     return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def draw_mouth_mask_visualization( | def draw_mouth_mask_visualization(frame: np.ndarray, face, mouth_mask_data: tuple) -> np.ndarray: | ||||||
|     frame: Frame, face: Face, mouth_mask_data: tuple |  | ||||||
| ) -> Frame: |  | ||||||
|     landmarks = face.landmark_2d_106 |     landmarks = face.landmark_2d_106 | ||||||
|     if landmarks is not None and mouth_mask_data is not None: |     if landmarks is not None and mouth_mask_data is not None: | ||||||
|         mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = ( |         mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = ( | ||||||
|  | @ -492,7 +532,7 @@ def apply_mouth_area( | ||||||
|                 resized_mouth_cutout, (roi.shape[1], roi.shape[0]) |                 resized_mouth_cutout, (roi.shape[1], roi.shape[0]) | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi) |         color_corrected_mouth = color_transfer(resized_mouth_cutout, roi) | ||||||
| 
 | 
 | ||||||
|         # Use the provided mouth polygon to create the mask |         # Use the provided mouth polygon to create the mask | ||||||
|         polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8) |         polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8) | ||||||
|  | @ -531,7 +571,7 @@ def apply_mouth_area( | ||||||
|     return frame |     return frame | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_face_mask(face: Face, frame: Frame) -> np.ndarray: | def create_face_mask(face, frame: np.ndarray) -> np.ndarray: | ||||||
|     mask = np.zeros(frame.shape[:2], dtype=np.uint8) |     mask = np.zeros(frame.shape[:2], dtype=np.uint8) | ||||||
|     landmarks = face.landmark_2d_106 |     landmarks = face.landmark_2d_106 | ||||||
|     if landmarks is not None: |     if landmarks is not None: | ||||||
|  | @ -598,25 +638,3 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray: | ||||||
|         mask = cv2.GaussianBlur(mask, (5, 5), 3) |         mask = cv2.GaussianBlur(mask, (5, 5), 3) | ||||||
| 
 | 
 | ||||||
|     return mask |     return mask | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| def apply_color_transfer(source, target): |  | ||||||
|     """ |  | ||||||
|     Apply color transfer from target to source image |  | ||||||
|     """ |  | ||||||
|     source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32") |  | ||||||
|     target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32") |  | ||||||
| 
 |  | ||||||
|     source_mean, source_std = cv2.meanStdDev(source) |  | ||||||
|     target_mean, target_std = cv2.meanStdDev(target) |  | ||||||
| 
 |  | ||||||
|     # Reshape mean and std to be broadcastable |  | ||||||
|     source_mean = source_mean.reshape(1, 1, 3) |  | ||||||
|     source_std = source_std.reshape(1, 1, 3) |  | ||||||
|     target_mean = target_mean.reshape(1, 1, 3) |  | ||||||
|     target_std = target_std.reshape(1, 1, 3) |  | ||||||
| 
 |  | ||||||
|     # Perform the color transfer |  | ||||||
|     source = (source - source_mean) * (target_std / source_std) + target_mean |  | ||||||
| 
 |  | ||||||
|     return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR) |  | ||||||
|  |  | ||||||
|  | @ -1,7 +1,9 @@ | ||||||
| from typing import Any | from typing import Any | ||||||
| 
 | 
 | ||||||
| from insightface.app.common import Face | from insightface.app.common import Face as InsightFace | ||||||
| import numpy | import numpy | ||||||
| 
 | 
 | ||||||
| Face = Face | # Alias for a detected face object from insightface | ||||||
| Frame = numpy.ndarray[Any, Any] | Face = InsightFace | ||||||
|  | # Alias for a numpy ndarray representing an image frame | ||||||
|  | Frame = numpy.ndarray | ||||||
|  |  | ||||||
								
									
									
										
											174
										
									
									modules/ui.py
									
									
									
									
								
								
							
							
										
											174
										
									
									modules/ui.py
									
									
									
									
								|  | @ -28,6 +28,12 @@ from modules.utilities import ( | ||||||
| from modules.video_capture import VideoCapturer | from modules.video_capture import VideoCapturer | ||||||
| from modules.gettext import LanguageManager | from modules.gettext import LanguageManager | ||||||
| import platform | import platform | ||||||
|  | try: | ||||||
|  |     import pyvirtualcam | ||||||
|  |     PYVIRTUALCAM_AVAILABLE = True | ||||||
|  | except ImportError: | ||||||
|  |     PYVIRTUALCAM_AVAILABLE = False | ||||||
|  |     print("pyvirtualcam is not installed. Virtual camera support will be disabled.") | ||||||
| 
 | 
 | ||||||
| if platform.system() == "Windows": | if platform.system() == "Windows": | ||||||
|     from pygrabber.dshow_graph import FilterGraph |     from pygrabber.dshow_graph import FilterGraph | ||||||
|  | @ -363,7 +369,17 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C | ||||||
|         ), |         ), | ||||||
|     ) |     ) | ||||||
|     live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05) |     live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05) | ||||||
|     # --- End Camera Selection --- | 
 | ||||||
|  |     # --- Virtual Camera Toggle --- | ||||||
|  |     virtual_cam_button = ctk.CTkButton( | ||||||
|  |         root, | ||||||
|  |         text=_("Toggle Virtual Cam"), | ||||||
|  |         cursor="hand2", | ||||||
|  |         command=toggle_virtual_cam, | ||||||
|  |         state=("normal" if PYVIRTUALCAM_AVAILABLE else "disabled"), | ||||||
|  |     ) | ||||||
|  |     virtual_cam_button.place(relx=0.1, rely=0.92, relwidth=0.35, relheight=0.05) | ||||||
|  |     # --- End Virtual Camera Toggle --- | ||||||
| 
 | 
 | ||||||
|     status_label = ctk.CTkLabel(root, text=None, justify="center") |     status_label = ctk.CTkLabel(root, text=None, justify="center") | ||||||
|     status_label.place(relx=0.1, rely=0.9, relwidth=0.8) |     status_label.place(relx=0.1, rely=0.9, relwidth=0.8) | ||||||
|  | @ -797,75 +813,61 @@ def webcam_preview(root: ctk.CTk, camera_index: int): | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | virtual_cam_manager = VirtualCamManager() | ||||||
|  | virtual_cam_enabled = False  # Use a global variable for clarity | ||||||
| 
 | 
 | ||||||
| def get_available_cameras(): | def toggle_virtual_cam(): | ||||||
|     """Returns a list of available camera names and indices.""" |     global virtual_cam_enabled | ||||||
|     if platform.system() == "Windows": |     if not PYVIRTUALCAM_AVAILABLE: | ||||||
|  |         update_status("pyvirtualcam not installed. Cannot enable virtual camera.") | ||||||
|  |         return | ||||||
|  |     if not virtual_cam_enabled: | ||||||
|  |         virtual_cam_manager.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 30) | ||||||
|  |         virtual_cam_enabled = True | ||||||
|  |         update_status("Virtual camera enabled.") | ||||||
|  |     else: | ||||||
|  |         virtual_cam_manager.stop() | ||||||
|  |         virtual_cam_enabled = False | ||||||
|  |         update_status("Virtual camera disabled.") | ||||||
|  | 
 | ||||||
|  | class VirtualCamManager: | ||||||
|  |     """Manages the virtual camera output using pyvirtualcam.""" | ||||||
|  |     def __init__(self): | ||||||
|  |         self.cam = None | ||||||
|  |         self.enabled = False | ||||||
|  |         self.width = PREVIEW_DEFAULT_WIDTH | ||||||
|  |         self.height = PREVIEW_DEFAULT_HEIGHT | ||||||
|  |         self.fps = 30 | ||||||
|  | 
 | ||||||
|  |     def start(self, width: int, height: int, fps: int = 30): | ||||||
|  |         if self.cam is None: | ||||||
|             try: |             try: | ||||||
|             graph = FilterGraph() |                 self.cam = pyvirtualcam.Camera(width=width, height=height, fps=fps, print_fps=False) | ||||||
|             devices = graph.get_input_devices() |                 self.enabled = True | ||||||
| 
 |                 print("Virtual camera started.") | ||||||
|             # Create list of indices and names |  | ||||||
|             camera_indices = list(range(len(devices))) |  | ||||||
|             camera_names = devices |  | ||||||
| 
 |  | ||||||
|             # If no cameras found through DirectShow, try OpenCV fallback |  | ||||||
|             if not camera_names: |  | ||||||
|                 # Try to open camera with index -1 and 0 |  | ||||||
|                 test_indices = [-1, 0] |  | ||||||
|                 working_cameras = [] |  | ||||||
| 
 |  | ||||||
|                 for idx in test_indices: |  | ||||||
|                     cap = cv2.VideoCapture(idx) |  | ||||||
|                     if cap.isOpened(): |  | ||||||
|                         working_cameras.append(f"Camera {idx}") |  | ||||||
|                         cap.release() |  | ||||||
| 
 |  | ||||||
|                 if working_cameras: |  | ||||||
|                     return test_indices[: len(working_cameras)], working_cameras |  | ||||||
| 
 |  | ||||||
|             # If still no cameras found, return empty lists |  | ||||||
|             if not camera_names: |  | ||||||
|                 return [], ["No cameras found"] |  | ||||||
| 
 |  | ||||||
|             return camera_indices, camera_names |  | ||||||
| 
 |  | ||||||
|             except Exception as e: |             except Exception as e: | ||||||
|             print(f"Error detecting cameras: {str(e)}") |                 print(f"Failed to start virtual camera: {e}") | ||||||
|             return [], ["No cameras found"] |                 self.cam = None | ||||||
|     else: |                 self.enabled = False | ||||||
|         # Unix-like systems (Linux/Mac) camera detection |  | ||||||
|         camera_indices = [] |  | ||||||
|         camera_names = [] |  | ||||||
| 
 | 
 | ||||||
|         if platform.system() == "Darwin":  # macOS specific handling |     def send(self, frame): | ||||||
|             # Try to open the default FaceTime camera first |         if self.cam and self.enabled: | ||||||
|             cap = cv2.VideoCapture(0) |             try: | ||||||
|             if cap.isOpened(): |                 # pyvirtualcam expects RGB | ||||||
|                 camera_indices.append(0) |                 if frame.shape[2] == 3: | ||||||
|                 camera_names.append("FaceTime Camera") |                     self.cam.send(frame) | ||||||
|                 cap.release() |                     self.cam.sleep_until_next_frame() | ||||||
|  |             except Exception as e: | ||||||
|  |                 print(f"Error sending frame to virtual camera: {e}") | ||||||
| 
 | 
 | ||||||
|             # On macOS, additional cameras typically use indices 1 and 2 |     def stop(self): | ||||||
|             for i in [1, 2]: |         if self.cam: | ||||||
|                 cap = cv2.VideoCapture(i) |             try: | ||||||
|                 if cap.isOpened(): |                 self.cam.close() | ||||||
|                     camera_indices.append(i) |             except Exception as e: | ||||||
|                     camera_names.append(f"Camera {i}") |                 print(f"Error closing virtual camera: {e}") | ||||||
|                     cap.release() |             self.cam = None | ||||||
|         else: |             self.enabled = False | ||||||
|             # Linux camera detection - test first 10 indices |  | ||||||
|             for i in range(10): |  | ||||||
|                 cap = cv2.VideoCapture(i) |  | ||||||
|                 if cap.isOpened(): |  | ||||||
|                     camera_indices.append(i) |  | ||||||
|                     camera_names.append(f"Camera {i}") |  | ||||||
|                     cap.release() |  | ||||||
| 
 |  | ||||||
|         if not camera_names: |  | ||||||
|             return [], ["No cameras found"] |  | ||||||
| 
 |  | ||||||
|         return camera_indices, camera_names |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_webcam_preview(camera_index: int): | def create_webcam_preview(camera_index: int): | ||||||
|  | @ -885,10 +887,23 @@ def create_webcam_preview(camera_index: int): | ||||||
|     fps_update_interval = 0.5 |     fps_update_interval = 0.5 | ||||||
|     frame_count = 0 |     frame_count = 0 | ||||||
|     fps = 0 |     fps = 0 | ||||||
|  |     face_swap_enabled = True  # Toggle for live face swap | ||||||
|  |     last_face_detected = True | ||||||
|  |     no_face_counter = 0 | ||||||
|  |     NO_FACE_THRESHOLD = 30  # Number of frames to show warning if no face | ||||||
|  | 
 | ||||||
|  |     def toggle_face_swap(): | ||||||
|  |         nonlocal face_swap_enabled | ||||||
|  |         face_swap_enabled = not face_swap_enabled | ||||||
|  |         update_status(f"Face Swap {'Enabled' if face_swap_enabled else 'Disabled'}") | ||||||
|  | 
 | ||||||
|  |     # Optionally, bind a key or button to toggle_face_swap | ||||||
|  |     PREVIEW.bind('<f>', lambda e: toggle_face_swap()) | ||||||
| 
 | 
 | ||||||
|     while True: |     while True: | ||||||
|         ret, frame = cap.read() |         ret, frame = cap.read() | ||||||
|         if not ret: |         if not ret: | ||||||
|  |             update_status("Camera frame read failed.") | ||||||
|             break |             break | ||||||
| 
 | 
 | ||||||
|         temp_frame = frame.copy() |         temp_frame = frame.copy() | ||||||
|  | @ -900,12 +915,13 @@ def create_webcam_preview(camera_index: int): | ||||||
|             temp_frame = fit_image_to_size( |             temp_frame = fit_image_to_size( | ||||||
|                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() |                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() | ||||||
|             ) |             ) | ||||||
| 
 |  | ||||||
|         else: |         else: | ||||||
|             temp_frame = fit_image_to_size( |             temp_frame = fit_image_to_size( | ||||||
|                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() |                 temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|  |         face_found = True | ||||||
|  |         if face_swap_enabled: | ||||||
|             if not modules.globals.map_faces: |             if not modules.globals.map_faces: | ||||||
|                 if source_image is None and modules.globals.source_path: |                 if source_image is None and modules.globals.source_path: | ||||||
|                     source_image = get_one_face(cv2.imread(modules.globals.source_path)) |                     source_image = get_one_face(cv2.imread(modules.globals.source_path)) | ||||||
|  | @ -915,7 +931,15 @@ def create_webcam_preview(camera_index: int): | ||||||
|                         if modules.globals.fp_ui["face_enhancer"]: |                         if modules.globals.fp_ui["face_enhancer"]: | ||||||
|                             temp_frame = frame_processor.process_frame(None, temp_frame) |                             temp_frame = frame_processor.process_frame(None, temp_frame) | ||||||
|                     else: |                     else: | ||||||
|  |                         # Check if a face is detected before swapping | ||||||
|  |                         detected_face = get_one_face(temp_frame) | ||||||
|  |                         if detected_face is not None and source_image is not None: | ||||||
|                             temp_frame = frame_processor.process_frame(source_image, temp_frame) |                             temp_frame = frame_processor.process_frame(source_image, temp_frame) | ||||||
|  |                             last_face_detected = True | ||||||
|  |                             no_face_counter = 0 | ||||||
|  |                         else: | ||||||
|  |                             face_found = False | ||||||
|  |                             no_face_counter += 1 | ||||||
|             else: |             else: | ||||||
|                 modules.globals.target_path = None |                 modules.globals.target_path = None | ||||||
|                 for frame_processor in frame_processors: |                 for frame_processor in frame_processors: | ||||||
|  | @ -924,6 +948,23 @@ def create_webcam_preview(camera_index: int): | ||||||
|                             temp_frame = frame_processor.process_frame_v2(temp_frame) |                             temp_frame = frame_processor.process_frame_v2(temp_frame) | ||||||
|                     else: |                     else: | ||||||
|                         temp_frame = frame_processor.process_frame_v2(temp_frame) |                         temp_frame = frame_processor.process_frame_v2(temp_frame) | ||||||
|  |         else: | ||||||
|  |             # Face swap disabled, just show the frame | ||||||
|  |             pass | ||||||
|  | 
 | ||||||
|  |         # Show warning if no face detected for a while | ||||||
|  |         if not face_found and no_face_counter > NO_FACE_THRESHOLD: | ||||||
|  |             cv2.putText( | ||||||
|  |                 temp_frame, | ||||||
|  |                 "No face detected!", | ||||||
|  |                 (10, 60), | ||||||
|  |                 cv2.FONT_HERSHEY_SIMPLEX, | ||||||
|  |                 1.2, | ||||||
|  |                 (0, 0, 255), | ||||||
|  |                 3, | ||||||
|  |             ) | ||||||
|  |         elif face_found: | ||||||
|  |             no_face_counter = 0 | ||||||
| 
 | 
 | ||||||
|         # Calculate and display FPS |         # Calculate and display FPS | ||||||
|         current_time = time.time() |         current_time = time.time() | ||||||
|  | @ -958,6 +999,7 @@ def create_webcam_preview(camera_index: int): | ||||||
| 
 | 
 | ||||||
|     cap.release() |     cap.release() | ||||||
|     PREVIEW.withdraw() |     PREVIEW.withdraw() | ||||||
|  |     update_status("Webcam preview closed.") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_source_target_popup_for_webcam( | def create_source_target_popup_for_webcam( | ||||||
|  |  | ||||||
|  | @ -2,10 +2,10 @@ import glob | ||||||
| import mimetypes | import mimetypes | ||||||
| import os | import os | ||||||
| import platform | import platform | ||||||
| import shutil |  | ||||||
| import ssl | import ssl | ||||||
| import subprocess | import subprocess | ||||||
| import urllib | import cv2 | ||||||
|  | import modules | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| from typing import List, Any | from typing import List, Any | ||||||
| from tqdm import tqdm | from tqdm import tqdm | ||||||
|  | @ -21,6 +21,7 @@ if platform.system().lower() == "darwin": | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def run_ffmpeg(args: List[str]) -> bool: | def run_ffmpeg(args: List[str]) -> bool: | ||||||
|  |     """Run an ffmpeg command with the given arguments.""" | ||||||
|     commands = [ |     commands = [ | ||||||
|         "ffmpeg", |         "ffmpeg", | ||||||
|         "-hide_banner", |         "-hide_banner", | ||||||
|  | @ -31,14 +32,15 @@ def run_ffmpeg(args: List[str]) -> bool: | ||||||
|     ] |     ] | ||||||
|     commands.extend(args) |     commands.extend(args) | ||||||
|     try: |     try: | ||||||
|         subprocess.check_output(commands, stderr=subprocess.STDOUT) |         subprocess.run(commands, check=True) | ||||||
|         return True |         return True | ||||||
|     except Exception: |     except Exception as e: | ||||||
|         pass |         print(f"Error running ffmpeg: {e}") | ||||||
|         return False |         return False | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def detect_fps(target_path: str) -> float: | def detect_fps(target_path: str) -> float: | ||||||
|  |     """Detect the FPS of a video file using ffprobe.""" | ||||||
|     command = [ |     command = [ | ||||||
|         "ffprobe", |         "ffprobe", | ||||||
|         "-v", |         "-v", | ||||||
|  | @ -51,16 +53,18 @@ def detect_fps(target_path: str) -> float: | ||||||
|         "default=noprint_wrappers=1:nokey=1", |         "default=noprint_wrappers=1:nokey=1", | ||||||
|         target_path, |         target_path, | ||||||
|     ] |     ] | ||||||
|     output = subprocess.check_output(command).decode().strip().split("/") |  | ||||||
|     try: |     try: | ||||||
|         numerator, denominator = map(int, output) |         output = subprocess.check_output(command).decode().strip().split("/") | ||||||
|         return numerator / denominator |         if len(output) == 2: | ||||||
|     except Exception: |             return float(output[0]) / float(output[1]) | ||||||
|         pass |         return float(output[0]) | ||||||
|  |     except Exception as e: | ||||||
|  |         print(f"Error detecting FPS: {e}") | ||||||
|         return 30.0 |         return 30.0 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def extract_frames(target_path: str) -> None: | def extract_frames(target_path: str) -> None: | ||||||
|  |     """Extract frames from a video file to a temp directory.""" | ||||||
|     temp_directory_path = get_temp_directory_path(target_path) |     temp_directory_path = get_temp_directory_path(target_path) | ||||||
|     run_ffmpeg( |     run_ffmpeg( | ||||||
|         [ |         [ | ||||||
|  | @ -74,6 +78,7 @@ def extract_frames(target_path: str) -> None: | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_video(target_path: str, fps: float = 30.0) -> None: | def create_video(target_path: str, fps: float = 30.0) -> None: | ||||||
|  |     """Create a video from frames in the temp directory.""" | ||||||
|     temp_output_path = get_temp_output_path(target_path) |     temp_output_path = get_temp_output_path(target_path) | ||||||
|     temp_directory_path = get_temp_directory_path(target_path) |     temp_directory_path = get_temp_directory_path(target_path) | ||||||
|     run_ffmpeg( |     run_ffmpeg( | ||||||
|  | @ -97,6 +102,7 @@ def create_video(target_path: str, fps: float = 30.0) -> None: | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def restore_audio(target_path: str, output_path: str) -> None: | def restore_audio(target_path: str, output_path: str) -> None: | ||||||
|  |     """Restore audio from the original video to the output video.""" | ||||||
|     temp_output_path = get_temp_output_path(target_path) |     temp_output_path = get_temp_output_path(target_path) | ||||||
|     done = run_ffmpeg( |     done = run_ffmpeg( | ||||||
|         [ |         [ | ||||||
|  | @ -115,95 +121,107 @@ def restore_audio(target_path: str, output_path: str) -> None: | ||||||
|         ] |         ] | ||||||
|     ) |     ) | ||||||
|     if not done: |     if not done: | ||||||
|         move_temp(target_path, output_path) |         print(f"Failed to restore audio for {output_path}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_temp_frame_paths(target_path: str) -> List[str]: | def get_temp_frame_paths(target_path: str) -> List[str]: | ||||||
|  |     """Get all temp frame file paths for a given target path.""" | ||||||
|     temp_directory_path = get_temp_directory_path(target_path) |     temp_directory_path = get_temp_directory_path(target_path) | ||||||
|     return glob.glob((os.path.join(glob.escape(temp_directory_path), "*.png"))) |     try: | ||||||
|  |         return sorted([ | ||||||
|  |             str(p) for p in Path(temp_directory_path).glob("*.png") | ||||||
|  |         ]) | ||||||
|  |     except Exception as e: | ||||||
|  |         print(f"Error getting temp frame paths: {e}") | ||||||
|  |         return [] | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_temp_directory_path(target_path: str) -> str: | def get_temp_directory_path(target_path: str) -> str: | ||||||
|     target_name, _ = os.path.splitext(os.path.basename(target_path)) |     """Get the temp directory path for a given target path.""" | ||||||
|     target_directory_path = os.path.dirname(target_path) |     base = os.path.splitext(os.path.basename(target_path))[0] | ||||||
|     return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name) |     temp_dir = os.path.join(TEMP_DIRECTORY, base) | ||||||
|  |     os.makedirs(temp_dir, exist_ok=True) | ||||||
|  |     return temp_dir | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_temp_output_path(target_path: str) -> str: | def get_temp_output_path(target_path: str) -> str: | ||||||
|     temp_directory_path = get_temp_directory_path(target_path) |     """Get the temp output video path for a given target path.""" | ||||||
|     return os.path.join(temp_directory_path, TEMP_FILE) |     base = os.path.splitext(os.path.basename(target_path))[0] | ||||||
|  |     return os.path.join(TEMP_DIRECTORY, f"{base}_out.mp4") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any: | def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any: | ||||||
|     if source_path and target_path: |     """Normalize the output path for saving results.""" | ||||||
|         source_name, _ = os.path.splitext(os.path.basename(source_path)) |     if not output_path: | ||||||
|         target_name, target_extension = os.path.splitext(os.path.basename(target_path)) |         base = os.path.splitext(os.path.basename(target_path))[0] | ||||||
|         if os.path.isdir(output_path): |         return os.path.join(TEMP_DIRECTORY, f"{base}_result.png") | ||||||
|             return os.path.join( |  | ||||||
|                 output_path, source_name + "-" + target_name + target_extension |  | ||||||
|             ) |  | ||||||
|     return output_path |     return output_path | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def create_temp(target_path: str) -> None: | def create_temp(target_path: str) -> None: | ||||||
|  |     """Create a temp directory for a given target path.""" | ||||||
|     temp_directory_path = get_temp_directory_path(target_path) |     temp_directory_path = get_temp_directory_path(target_path) | ||||||
|     Path(temp_directory_path).mkdir(parents=True, exist_ok=True) |     os.makedirs(temp_directory_path, exist_ok=True) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def move_temp(target_path: str, output_path: str) -> None: | def move_temp(target_path: str, output_path: str) -> None: | ||||||
|  |     """Move temp output to the final output path.""" | ||||||
|     temp_output_path = get_temp_output_path(target_path) |     temp_output_path = get_temp_output_path(target_path) | ||||||
|     if os.path.isfile(temp_output_path): |     try: | ||||||
|         if os.path.isfile(output_path): |         os.rename(temp_output_path, output_path) | ||||||
|             os.remove(output_path) |     except Exception as e: | ||||||
|         shutil.move(temp_output_path, output_path) |         print(f"Error moving temp output: {e}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def clean_temp(target_path: str) -> None: | def clean_temp(target_path: str) -> None: | ||||||
|  |     """Remove temp directory and files for a given target path.""" | ||||||
|     temp_directory_path = get_temp_directory_path(target_path) |     temp_directory_path = get_temp_directory_path(target_path) | ||||||
|     parent_directory_path = os.path.dirname(temp_directory_path) |     try: | ||||||
|     if not modules.globals.keep_frames and os.path.isdir(temp_directory_path): |         for p in Path(temp_directory_path).glob("*"): | ||||||
|         shutil.rmtree(temp_directory_path) |             p.unlink() | ||||||
|     if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path): |         os.rmdir(temp_directory_path) | ||||||
|         os.rmdir(parent_directory_path) |     except Exception as e: | ||||||
|  |         print(f"Error cleaning temp directory: {e}") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def has_image_extension(image_path: str) -> bool: | def has_image_extension(image_path: str) -> bool: | ||||||
|     return image_path.lower().endswith(("png", "jpg", "jpeg")) |     """Check if a file has an image extension.""" | ||||||
|  |     return os.path.splitext(image_path)[1].lower() in [ | ||||||
|  |         ".png", ".jpg", ".jpeg", ".gif", ".bmp" | ||||||
|  |     ] | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def is_image(image_path: str) -> bool: | def is_image(image_path: str) -> bool: | ||||||
|     if image_path and os.path.isfile(image_path): |     """Check if a file is an image.""" | ||||||
|         mimetype, _ = mimetypes.guess_type(image_path) |     return has_image_extension(image_path) | ||||||
|         return bool(mimetype and mimetype.startswith("image/")) |  | ||||||
|     return False |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def is_video(video_path: str) -> bool: | def is_video(video_path: str) -> bool: | ||||||
|     if video_path and os.path.isfile(video_path): |     """Check if a file is a video.""" | ||||||
|         mimetype, _ = mimetypes.guess_type(video_path) |     return os.path.splitext(video_path)[1].lower() in [ | ||||||
|         return bool(mimetype and mimetype.startswith("video/")) |         ".mp4", ".mkv" | ||||||
|     return False |     ] | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def conditional_download(download_directory_path: str, urls: List[str]) -> None: | def conditional_download(download_directory_path: str, urls: List[str]) -> None: | ||||||
|     if not os.path.exists(download_directory_path): |     """Download files from URLs if they do not exist in the directory.""" | ||||||
|         os.makedirs(download_directory_path) |     import requests | ||||||
|     for url in urls: |     for url in urls: | ||||||
|         download_file_path = os.path.join( |         filename = os.path.basename(url) | ||||||
|             download_directory_path, os.path.basename(url) |         file_path = os.path.join(download_directory_path, filename) | ||||||
|         ) |         if not os.path.exists(file_path): | ||||||
|         if not os.path.exists(download_file_path): |             try: | ||||||
|             request = urllib.request.urlopen(url)  # type: ignore[attr-defined] |                 print(f"Downloading {url}...") | ||||||
|             total = int(request.headers.get("Content-Length", 0)) |                 r = requests.get(url, stream=True) | ||||||
|             with tqdm( |                 with open(file_path, "wb") as f: | ||||||
|                 total=total, |                     for chunk in r.iter_content(chunk_size=8192): | ||||||
|                 desc="Downloading", |                         if chunk: | ||||||
|                 unit="B", |                             f.write(chunk) | ||||||
|                 unit_scale=True, |                 print(f"Downloaded {filename}") | ||||||
|                 unit_divisor=1024, |             except Exception as e: | ||||||
|             ) as progress: |                 print(f"Error downloading {url}: {e}") | ||||||
|                 urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size))  # type: ignore[attr-defined] |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def resolve_relative_path(path: str) -> str: | def resolve_relative_path(path: str) -> str: | ||||||
|     return os.path.abspath(os.path.join(os.path.dirname(__file__), path)) |     """Resolve a relative path to an absolute path.""" | ||||||
|  |     return os.path.abspath(path) | ||||||
|  |  | ||||||
|  | @ -1,8 +1,8 @@ | ||||||
| import cv2 | import cv2 | ||||||
| import numpy as np | import numpy as np | ||||||
| from typing import Optional, Tuple, Callable |  | ||||||
| import platform | import platform | ||||||
| import threading | import threading | ||||||
|  | from typing import Optional, Tuple, Callable | ||||||
| 
 | 
 | ||||||
| # Only import Windows-specific library if on Windows | # Only import Windows-specific library if on Windows | ||||||
| if platform.system() == "Windows": | if platform.system() == "Windows": | ||||||
|  | @ -11,17 +11,15 @@ if platform.system() == "Windows": | ||||||
| 
 | 
 | ||||||
| class VideoCapturer: | class VideoCapturer: | ||||||
|     def __init__(self, device_index: int): |     def __init__(self, device_index: int): | ||||||
|  |         """Initialize the video capturer for a given device index.""" | ||||||
|         self.device_index = device_index |         self.device_index = device_index | ||||||
|         self.frame_callback = None |         self.frame_callback = None | ||||||
|         self._current_frame = None |         self._current_frame = None | ||||||
|         self._frame_ready = threading.Event() |         self._frame_ready = threading.Event() | ||||||
|         self.is_running = False |         self.is_running = False | ||||||
|         self.cap = None |         self.cap = None | ||||||
| 
 |  | ||||||
|         # Initialize Windows-specific components if on Windows |  | ||||||
|         if platform.system() == "Windows": |         if platform.system() == "Windows": | ||||||
|             self.graph = FilterGraph() |             self.graph = FilterGraph() | ||||||
|             # Verify device exists |  | ||||||
|             devices = self.graph.get_input_devices() |             devices = self.graph.get_input_devices() | ||||||
|             if self.device_index >= len(devices): |             if self.device_index >= len(devices): | ||||||
|                 raise ValueError( |                 raise ValueError( | ||||||
|  | @ -29,40 +27,31 @@ class VideoCapturer: | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|     def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: |     def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool: | ||||||
|         """Initialize and start video capture""" |         """Initialize and start video capture.""" | ||||||
|         try: |         try: | ||||||
|             if platform.system() == "Windows": |             if platform.system() == "Windows": | ||||||
|                 # Windows-specific capture methods |  | ||||||
|                 capture_methods = [ |                 capture_methods = [ | ||||||
|                     (self.device_index, cv2.CAP_DSHOW),  # Try DirectShow first |                     (self.device_index, cv2.CAP_DSHOW), | ||||||
|                     (self.device_index, cv2.CAP_ANY),  # Then try default backend |                     (self.device_index, cv2.CAP_ANY), | ||||||
|                     (-1, cv2.CAP_ANY),  # Try -1 as fallback |                     (-1, cv2.CAP_ANY), | ||||||
|                     (0, cv2.CAP_ANY),  # Finally try 0 without specific backend |                     (0, cv2.CAP_ANY), | ||||||
|                 ] |                 ] | ||||||
| 
 |  | ||||||
|                 for dev_id, backend in capture_methods: |                 for dev_id, backend in capture_methods: | ||||||
|                     try: |                     try: | ||||||
|                         self.cap = cv2.VideoCapture(dev_id, backend) |                         self.cap = cv2.VideoCapture(dev_id, backend) | ||||||
|                         if self.cap.isOpened(): |                         if self.cap.isOpened(): | ||||||
|                             break |                             break | ||||||
|                         self.cap.release() |                     except Exception as e: | ||||||
|                     except Exception: |                         print(f"Error opening camera with backend {backend}: {e}") | ||||||
|                         continue |  | ||||||
|             else: |             else: | ||||||
|                 # Unix-like systems (Linux/Mac) capture method |  | ||||||
|                 self.cap = cv2.VideoCapture(self.device_index) |                 self.cap = cv2.VideoCapture(self.device_index) | ||||||
| 
 |  | ||||||
|             if not self.cap or not self.cap.isOpened(): |             if not self.cap or not self.cap.isOpened(): | ||||||
|                 raise RuntimeError("Failed to open camera") |                 raise RuntimeError("Failed to open camera") | ||||||
| 
 |  | ||||||
|             # Configure format |  | ||||||
|             self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) |             self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) | ||||||
|             self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) |             self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) | ||||||
|             self.cap.set(cv2.CAP_PROP_FPS, fps) |             self.cap.set(cv2.CAP_PROP_FPS, fps) | ||||||
| 
 |  | ||||||
|             self.is_running = True |             self.is_running = True | ||||||
|             return True |             return True | ||||||
| 
 |  | ||||||
|         except Exception as e: |         except Exception as e: | ||||||
|             print(f"Failed to start capture: {str(e)}") |             print(f"Failed to start capture: {str(e)}") | ||||||
|             if self.cap: |             if self.cap: | ||||||
|  | @ -70,10 +59,9 @@ class VideoCapturer: | ||||||
|             return False |             return False | ||||||
| 
 | 
 | ||||||
|     def read(self) -> Tuple[bool, Optional[np.ndarray]]: |     def read(self) -> Tuple[bool, Optional[np.ndarray]]: | ||||||
|         """Read a frame from the camera""" |         """Read a frame from the camera.""" | ||||||
|         if not self.is_running or self.cap is None: |         if not self.is_running or self.cap is None: | ||||||
|             return False, None |             return False, None | ||||||
| 
 |  | ||||||
|         ret, frame = self.cap.read() |         ret, frame = self.cap.read() | ||||||
|         if ret: |         if ret: | ||||||
|             self._current_frame = frame |             self._current_frame = frame | ||||||
|  | @ -83,12 +71,12 @@ class VideoCapturer: | ||||||
|         return False, None |         return False, None | ||||||
| 
 | 
 | ||||||
|     def release(self) -> None: |     def release(self) -> None: | ||||||
|         """Stop capture and release resources""" |         """Stop capture and release resources.""" | ||||||
|         if self.is_running and self.cap is not None: |         if self.is_running and self.cap is not None: | ||||||
|             self.cap.release() |             self.cap.release() | ||||||
|             self.is_running = False |             self.is_running = False | ||||||
|             self.cap = None |             self.cap = None | ||||||
| 
 | 
 | ||||||
|     def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: |     def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: | ||||||
|         """Set callback for frame processing""" |         """Set callback for frame processing.""" | ||||||
|         self.frame_callback = callback |         self.frame_callback = callback | ||||||
|  |  | ||||||
|  | @ -0,0 +1,19 @@ | ||||||
|  | #!/bin/zsh | ||||||
|  | # push_to_new_branch.sh - Commit and push changes to a new branch in Deep-Live-Cam-remote | ||||||
|  | 
 | ||||||
|  | REPO_DIR="Deep-Live-Cam-remote" | ||||||
|  | BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)" | ||||||
|  | 
 | ||||||
|  | if [ ! -d "$REPO_DIR/.git" ]; then | ||||||
|  |   echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first." | ||||||
|  |   exit 1 | ||||||
|  | fi | ||||||
|  | 
 | ||||||
|  | cd "$REPO_DIR" | ||||||
|  | git add . | ||||||
|  | echo "Enter a commit message: " | ||||||
|  | read COMMIT_MSG | ||||||
|  | git commit -m "$COMMIT_MSG" | ||||||
|  | git checkout -b "$BRANCH_NAME" | ||||||
|  | git push origin "$BRANCH_NAME" | ||||||
|  | echo "Pushed to branch $BRANCH_NAME on remote." | ||||||
|  | @ -0,0 +1,23 @@ | ||||||
|  | #!/bin/zsh | ||||||
|  | # push_to_rehanbgmi.sh - Commit and push changes to your fork (rehanbgmi/deeplivceam) in Deep-Live-Cam-remote | ||||||
|  | 
 | ||||||
|  | REPO_DIR="Deep-Live-Cam-remote" | ||||||
|  | FORK_URL="https://github.com/rehanbgmi/deeplivceam.git" | ||||||
|  | BRANCH_NAME="feature-$(date +%Y%m%d-%H%M%S)" | ||||||
|  | 
 | ||||||
|  | if [ ! -d "$REPO_DIR/.git" ]; then | ||||||
|  |   echo "Error: $REPO_DIR is not a git repository. Run the clone_or_update_deep_live_cam.sh script first." | ||||||
|  |   exit 1 | ||||||
|  | fi | ||||||
|  | 
 | ||||||
|  | cd "$REPO_DIR" | ||||||
|  | # Set your fork as a remote if not already set | ||||||
|  | git remote | grep rehanbgmi > /dev/null || git remote add rehanbgmi "$FORK_URL" | ||||||
|  | 
 | ||||||
|  | git add . | ||||||
|  | echo "Enter a commit message: " | ||||||
|  | read COMMIT_MSG | ||||||
|  | git commit -m "$COMMIT_MSG" | ||||||
|  | git checkout -b "$BRANCH_NAME" | ||||||
|  | git push rehanbgmi "$BRANCH_NAME" | ||||||
|  | echo "Pushed to branch $BRANCH_NAME on your fork (rehanbgmi/deeplivceam)." | ||||||
|  | @ -0,0 +1,4 @@ | ||||||
|  | #!/bin/zsh | ||||||
|  | # run-coreml-macos.sh - Run Deep-Live-Cam with CoreML (Apple Silicon) on macOS | ||||||
|  | source venv/bin/activate | ||||||
|  | python3.10 run.py --execution-provider coreml | ||||||
|  | @ -0,0 +1,4 @@ | ||||||
|  | @echo off | ||||||
|  | REM run-coreml.bat - Run Deep-Live-Cam with CoreML (Apple Silicon) on Windows (for reference, not for actual use) | ||||||
|  | call venv\Scripts\activate | ||||||
|  | python run.py --execution-provider coreml | ||||||
|  | @ -0,0 +1,4 @@ | ||||||
|  | #!/bin/zsh | ||||||
|  | # run-cuda-macos.sh - Run Deep-Live-Cam with CUDA (Nvidia GPU) on macOS | ||||||
|  | source venv/bin/activate | ||||||
|  | python run.py --execution-provider cuda | ||||||
|  | @ -1 +1,2 @@ | ||||||
|  | call venv\Scripts\activate | ||||||
| python run.py --execution-provider cuda | python run.py --execution-provider cuda | ||||||
		Loading…
	
		Reference in New Issue