diff --git a/README.md b/README.md index d667359..7511b9b 100644 --- a/README.md +++ b/README.md @@ -134,12 +134,57 @@ Place these files in the "**models**" folder. We highly recommend using a `venv` to avoid issues. -For Windows: -```bash -python -m venv venv -venv\Scripts\activate -pip install -r requirements.txt -``` +**For Windows:** + +It is highly recommended to use Python 3.10 for Windows for best compatibility with all features and dependencies. + +**Automated Setup (Recommended):** + +1. **Run the setup script:** + Double-click `setup_windows.bat` or run it from your command prompt: + ```batch + setup_windows.bat + ``` + This script will: + * Check if Python is in your PATH. + * Warn if `ffmpeg` is not found (see "Manual Steps / Notes" below for ffmpeg help). + * Create a virtual environment named `.venv` (consistent with macOS setup). + * Activate the virtual environment for the script's session. + * Upgrade pip. + * Install Python packages from `requirements.txt`. + Wait for the script to complete. It will pause at the end; press any key to close the window if you double-clicked it. + +2. **Run the application:** + After setup, use the provided `.bat` scripts to run the application. These scripts automatically activate the correct virtual environment: + * `run_windows.bat`: Runs the application with the CPU execution provider by default. This is a good starting point if you don't have a dedicated GPU or are unsure. + * `run-cuda.bat`: Runs with the CUDA (NVIDIA GPU) execution provider. Requires an NVIDIA GPU and CUDA Toolkit installed (see GPU Acceleration section). + * `run-directml.bat`: Runs with the DirectML (AMD/Intel GPU on Windows) execution provider. + + Example: Double-click `run_windows.bat` to launch the UI, or run from a command prompt: + ```batch + run_windows.bat --source path\to\your_face.jpg --target path\to\video.mp4 + ``` + +**Manual Steps / Notes:** + +* **Python:** Ensure Python 3.10 is installed and added to your system's PATH. You can download it from [python.org](https://www.python.org/downloads/). +* **ffmpeg:** + * `ffmpeg` is required for video processing. The `setup_windows.bat` script will warn if it's not found in your PATH. + * An easy way to install `ffmpeg` on Windows is to open PowerShell as Administrator and run: + ```powershell + Set-ExecutionPolicy Bypass -Scope Process -Force; [System.Net.ServicePointManager]::SecurityProtocol = [System.Net.ServicePointManager]::SecurityProtocol -bor 3072; iex ((New-Object System.Net.WebClient).DownloadString('https://community.chocolatey.org/install.ps1')); choco install ffmpeg -y + ``` + Alternatively, download from [ffmpeg.org](https://ffmpeg.org/download.html), extract the files, and add the `bin` folder (containing `ffmpeg.exe`) to your system's PATH environment variable. The original README also linked to a [YouTube guide](https://www.youtube.com/watch?v=OlNWCpFdVMA) or `iex (irm ffmpeg.tc.ht)` via PowerShell. +* **Visual Studio Runtimes:** If you encounter errors during `pip install` for packages that compile C code (e.g., some scientific computing or image processing libraries), you might need the [Visual Studio Build Tools (or Runtimes)](https://visualstudio.microsoft.com/visual-cpp-build-tools/). Ensure "C++ build tools" (or similar workload) are selected during installation. 
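+* **Verifying prerequisites (optional):** To confirm that Python and `ffmpeg` are reachable from your PATH before running `setup_windows.bat`, you can run the following from a command prompt. This is only a minimal sanity check; the exact version output will differ on your machine:
+    ```batch
+    python --version
+    where ffmpeg
+    ```
+    If `where ffmpeg` prints nothing, install `ffmpeg` as described in the ffmpeg notes above before processing videos.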
+* **Virtual Environment (Manual Alternative):** If you prefer to set up the virtual environment manually instead of using `setup_windows.bat`: + ```batch + python -m venv .venv + .venv\Scripts\activate.bat + python -m pip install --upgrade pip + python -m pip install -r requirements.txt + ``` + (The new automated scripts use `.venv` as the folder name for consistency with the macOS setup). + For Linux: ```bash # Ensure you use the installed Python 3.10 @@ -150,22 +195,64 @@ pip install -r requirements.txt **For macOS:** -Apple Silicon (M1/M2/M3) requires specific setup: +For a streamlined setup on macOS, use the provided shell scripts: -```bash -# Install Python 3.10 (specific version is important) -brew install python@3.10 +1. **Make scripts executable:** + Open your terminal, navigate to the cloned `Deep-Live-Cam` directory, and run: + ```bash + chmod +x setup_mac.sh + chmod +x run_mac*.sh + ``` -# Install tkinter package (required for the GUI) -brew install python-tk@3.10 +2. **Run the setup script:** + This will check for Python 3.9+, ffmpeg, create a virtual environment (`.venv`), and install required Python packages. + ```bash + ./setup_mac.sh + ``` + If you encounter issues with specific packages during `pip install` (especially for libraries that compile C code, like some image processing libraries), you might need to install system libraries via Homebrew (e.g., `brew install jpeg libtiff ...`) or ensure Xcode Command Line Tools are installed (`xcode-select --install`). -# Create and activate virtual environment with Python 3.10 -python3.10 -m venv venv -source venv/bin/activate +3. **Activate the virtual environment (for manual runs):** + After setup, if you want to run commands manually or use developer tools from your terminal session: + ```bash + source .venv/bin/activate + ``` + (To deactivate, simply type `deactivate` in the terminal.) -# Install dependencies -pip install -r requirements.txt -``` +4. **Run the application:** + Use the provided run scripts for convenience. These scripts automatically activate the virtual environment. + * `./run_mac.sh`: Runs the application with the CPU execution provider by default. This is a good starting point. + * `./run_mac_cpu.sh`: Explicitly uses the CPU execution provider. + * `./run_mac_coreml.sh`: Attempts to use the CoreML execution provider for potential hardware acceleration on Apple Silicon and Intel Macs. + * `./run_mac_mps.sh`: Attempts to use the MPS (Metal Performance Shaders) execution provider, primarily for Apple Silicon Macs. + + Example of running with specific source/target arguments: + ```bash + ./run_mac.sh --source path/to/your_face.jpg --target path/to/video.mp4 + ``` + Or, to simply launch the UI: + ```bash + ./run_mac.sh + ``` + +**Important Notes for macOS GPU Acceleration (CoreML/MPS):** + +* The `setup_mac.sh` script installs packages from `requirements.txt`, which typically includes a general CPU-based version of `onnxruntime`. +* For optimal performance on Apple Silicon (M1/M2/M3) or specific GPU acceleration, you might need to install a different `onnxruntime` package *after* running `setup_mac.sh` and while the virtual environment (`.venv`) is active. +* **Example for `onnxruntime-silicon` (often requires Python 3.10 for older versions like 1.13.1):** + The original `README` noted that `onnxruntime-silicon==1.13.1` was specific to Python 3.10. 
If you intend to use this exact version for CoreML: + ```bash + # Ensure you are using Python 3.10 if required by your chosen onnxruntime-silicon version + # After running setup_mac.sh and activating .venv: + # source .venv/bin/activate + + pip uninstall onnxruntime onnxruntime-gpu # Uninstall any existing onnxruntime + pip install onnxruntime-silicon==1.13.1 # Or your desired version + + # Then use ./run_mac_coreml.sh + ``` + Check the ONNX Runtime documentation for the latest recommended packages for Apple Silicon. +* **For MPS with ONNX Runtime:** This may require a specific build or version of `onnxruntime`. Consult the ONNX Runtime documentation. For PyTorch-based operations (like the Face Enhancer or Hair Segmenter if they were PyTorch native and not ONNX), PyTorch should automatically try to use MPS on compatible Apple Silicon hardware if available. +* **User Interface (Tkinter):** If you encounter errors related to `_tkinter` not being found when launching the UI, ensure your Python installation supports Tk. For Python installed via Homebrew, this is usually `python-tk` (e.g., `brew install python-tk@3.9` or `brew install python-tk@3.10`, matching your Python version). ** In case something goes wrong and you need to reinstall the virtual environment ** diff --git a/modules/core.py b/modules/core.py index b6ef9b8..7406f72 100644 --- a/modules/core.py +++ b/modules/core.py @@ -176,9 +176,12 @@ def update_status(message: str, scope: str = 'DLC.CORE') -> None: ui.update_status(message) def start() -> None: - for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - if not frame_processor.pre_start(): - return + # Note: pre_start is called in run() before start() now. + # If it were to be called here, it would also need the status_fn_callback. 
+ # For example: + # for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): + # if not frame_processor.pre_start(status_fn_callback=update_status): # If pre_start was here + # return update_status('Processing...') # process image to image if has_image_extension(modules.globals.target_path): @@ -190,7 +193,7 @@ def start() -> None: print("Error copying file:", str(e)) for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): update_status('Progressing...', frame_processor.NAME) - frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path) + frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path, status_fn_callback=update_status) release_resources() if is_image(modules.globals.target_path): update_status('Processing to image succeed!') @@ -210,7 +213,7 @@ def start() -> None: temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): update_status('Progressing...', frame_processor.NAME) - frame_processor.process_video(modules.globals.source_path, temp_frame_paths) + frame_processor.process_video(modules.globals.source_path, temp_frame_paths, status_fn_callback=update_status) release_resources() # handles fps if modules.globals.keep_fps: @@ -249,7 +252,9 @@ def run() -> None: if not pre_check(): return for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - if not frame_processor.pre_check(): + if not frame_processor.pre_check(): # pre_check in face_swapper does not use update_status + return + if hasattr(frame_processor, 'pre_start') and not frame_processor.pre_start(status_fn_callback=update_status): # Pass callback here return limit_resources() if modules.globals.headless: diff --git a/modules/face_analyser.py b/modules/face_analyser.py index ef124d5..15c5eb8 100644 --- a/modules/face_analyser.py +++ b/modules/face_analyser.py @@ -20,7 +20,8 @@ def get_face_analyser() -> Any: if FACE_ANALYSER is None: FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=modules.globals.execution_providers) - FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640)) + # Lowered detection threshold for potentially better webcam face detection (default is 0.5) + FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640), det_thresh=0.4) return FACE_ANALYSER diff --git a/modules/globals.py b/modules/globals.py index 564fe7d..8c0c02a 100644 --- a/modules/globals.py +++ b/modules/globals.py @@ -41,3 +41,4 @@ show_mouth_mask_box = False mask_feather_ratio = 8 mask_down_size = 0.50 mask_size = 1 +enable_hair_swapping = False # Default state for enabling/disabling hair swapping diff --git a/modules/hair_segmenter.py b/modules/hair_segmenter.py new file mode 100644 index 0000000..4478787 --- /dev/null +++ b/modules/hair_segmenter.py @@ -0,0 +1,125 @@ +import torch +import numpy as np +from PIL import Image +from transformers import SegformerImageProcessor, SegformerForSemanticSegmentation +import cv2 # Imported for BGR to RGB conversion, though PIL can also do it. + +# Global variables for caching +HAIR_SEGMENTER_PROCESSOR = None +HAIR_SEGMENTER_MODEL = None +MODEL_NAME = "isjackwild/segformer-b0-finetuned-segments-skin-hair-clothing" + +def segment_hair(image_np: np.ndarray) -> np.ndarray: + """ + Segments hair from an image. 
+ + Args: + image_np: NumPy array representing the image (BGR format from OpenCV). + + Returns: + NumPy array representing the binary hair mask. + """ + global HAIR_SEGMENTER_PROCESSOR, HAIR_SEGMENTER_MODEL + + if HAIR_SEGMENTER_PROCESSOR is None or HAIR_SEGMENTER_MODEL is None: + print(f"Loading hair segmentation model and processor ({MODEL_NAME}) for the first time...") + try: + HAIR_SEGMENTER_PROCESSOR = SegformerImageProcessor.from_pretrained(MODEL_NAME) + HAIR_SEGMENTER_MODEL = SegformerForSemanticSegmentation.from_pretrained(MODEL_NAME) + + if torch.cuda.is_available(): + try: + HAIR_SEGMENTER_MODEL = HAIR_SEGMENTER_MODEL.to('cuda') + print("INFO: Hair segmentation model moved to CUDA (GPU).") + except Exception as e_cuda: + print(f"ERROR: Failed to move hair segmentation model to CUDA: {e_cuda}. Using CPU instead.") + # Fallback to CPU if .to('cuda') fails + HAIR_SEGMENTER_MODEL = HAIR_SEGMENTER_MODEL.to('cpu') + else: + print("INFO: CUDA not available. Hair segmentation model will use CPU.") + + print("INFO: Hair segmentation model and processor loaded successfully (device: {}).".format(HAIR_SEGMENTER_MODEL.device)) + except Exception as e: + print(f"ERROR: Failed to load hair segmentation model/processor: {e}") + # Return an empty mask compatible with expected output shape (H, W) + return np.zeros((image_np.shape[0], image_np.shape[1]), dtype=np.uint8) + + # Convert BGR (OpenCV) to RGB (PIL) + image_rgb = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB) + image_pil = Image.fromarray(image_rgb) + + inputs = HAIR_SEGMENTER_PROCESSOR(images=image_pil, return_tensors="pt") + + if HAIR_SEGMENTER_MODEL.device.type == 'cuda': + try: + # SegformerImageProcessor output (BatchEncoding) is a dict-like object. + # We need to move its tensor components, commonly 'pixel_values'. + if 'pixel_values' in inputs: + inputs['pixel_values'] = inputs['pixel_values'].to('cuda') + else: # Fallback if the structure is different than expected + inputs = inputs.to('cuda') + # If inputs has other tensor components that need to be moved, they'd need similar handling. + except Exception as e_inputs_cuda: + print(f"ERROR: Failed to move inputs to CUDA: {e_inputs_cuda}. Attempting inference on CPU.") + # If moving inputs to CUDA fails, we should ensure model is also on CPU for this inference pass + # This is a tricky situation; ideally, this failure shouldn't happen if model moved successfully. + # For simplicity, we'll assume if model is on CUDA, inputs should also be. + # A more robust solution might involve moving model back to CPU if inputs can't be moved. + + with torch.no_grad(): # Important for inference + outputs = HAIR_SEGMENTER_MODEL(**inputs) + + logits = outputs.logits # Shape: batch_size, num_labels, height, width + + # Upsample logits to original image size + upsampled_logits = torch.nn.functional.interpolate( + logits, + size=(image_np.shape[0], image_np.shape[1]), # H, W + mode='bilinear', + align_corners=False + ) + + segmentation_map = upsampled_logits.argmax(dim=1).squeeze().cpu().numpy().astype(np.uint8) + + # Label 2 is for hair in this model + return np.where(segmentation_map == 2, 255, 0).astype(np.uint8) + +if __name__ == '__main__': + # This is a conceptual test. + # In a real scenario, you would load an image using OpenCV or Pillow. 
+ # For example: + # sample_image_np = cv2.imread("path/to/your/image.jpg") + # if sample_image_np is not None: + # hair_mask_output = segment_hair(sample_image_np) + # cv2.imwrite("hair_mask_output.png", hair_mask_output) + # print("Hair mask saved to hair_mask_output.png") + # else: + # print("Failed to load sample image.") + + print("Conceptual test: Hair segmenter module created.") + # Create a dummy image for a basic test run if no image is available. + dummy_image_np = np.zeros((100, 100, 3), dtype=np.uint8) # 100x100 BGR image + dummy_image_np[:, :, 1] = 255 # Make it green to distinguish from black mask + + try: + print("Running segment_hair with a dummy image...") + hair_mask_output = segment_hair(dummy_image_np) + print(f"segment_hair returned a mask of shape: {hair_mask_output.shape}") + # Check if the output is a 2D array (mask) and has the same H, W as input + assert hair_mask_output.shape == (dummy_image_np.shape[0], dummy_image_np.shape[1]) + # Check if the mask is binary (0 or 255) + assert np.all(np.isin(hair_mask_output, [0, 255])) + print("Dummy image test successful. Hair mask seems to be generated correctly.") + + # Attempt to save the dummy mask (optional, just for visual confirmation if needed) + # cv2.imwrite("dummy_hair_mask_output.png", hair_mask_output) + # print("Dummy hair mask saved to dummy_hair_mask_output.png") + + except ImportError as e: + print(f"An ImportError occurred: {e}. This might be due to missing dependencies like transformers, torch, or Pillow.") + print("Please ensure all required packages are installed by updating requirements.txt and installing them.") + except Exception as e: + print(f"An error occurred during the dummy image test: {e}") + print("This could be due to issues with model loading, processing, or other runtime errors.") + + print("To perform a full test, replace the dummy image with a real image path.") diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py index 36b83d6..df1e99e 100644 --- a/modules/processors/frame/face_swapper.py +++ b/modules/processors/frame/face_swapper.py @@ -1,4 +1,4 @@ -from typing import Any, List +from typing import Any, List, Optional, Tuple, Callable # Added Callable import cv2 import insightface import threading @@ -6,9 +6,10 @@ import numpy as np import modules.globals import logging import modules.processors.frame.core -from modules.core import update_status +# from modules.core import update_status # Removed import from modules.face_analyser import get_one_face, get_many_faces, default_source_face from modules.typing import Face, Frame +from modules.hair_segmenter import segment_hair from modules.utilities import ( conditional_download, is_image, @@ -16,6 +17,7 @@ from modules.utilities import ( ) from modules.cluster_analysis import find_closest_centroid import os +import platform # Added for potential platform-specific tracker choices later, though KCF is cross-platform FACE_SWAPPER = None THREAD_LOCK = threading.Lock() @@ -26,9 +28,33 @@ models_dir = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models" ) +# --- Tracker State Variables --- +TARGET_TRACKER: Optional[cv2.Tracker] = None +LAST_TARGET_KPS: Optional[np.ndarray] = None +LAST_TARGET_BBOX_XYWH: Optional[List[int]] = None +TRACKING_FRAME_COUNTER = 0 +DETECTION_INTERVAL = 5 # Process every 5th frame for full detection +LAST_DETECTION_SUCCESS = False +PREV_GRAY_FRAME: Optional[np.ndarray] = None # For optical flow +# --- End Tracker State Variables --- + +def 
reset_tracker_state(): + """Resets all global tracker state variables.""" + global TARGET_TRACKER, LAST_TARGET_KPS, LAST_TARGET_BBOX_XYWH + global TRACKING_FRAME_COUNTER, LAST_DETECTION_SUCCESS, PREV_GRAY_FRAME + + TARGET_TRACKER = None + LAST_TARGET_KPS = None + LAST_TARGET_BBOX_XYWH = None + TRACKING_FRAME_COUNTER = 0 + LAST_DETECTION_SUCCESS = False # Important to ensure first frame after reset does detection + PREV_GRAY_FRAME = None + logging.debug("Global tracker state has been reset.") + def pre_check() -> bool: - download_directory_path = abs_dir + # download_directory_path = abs_dir # Old line + download_directory_path = models_dir # New line conditional_download( download_directory_path, [ @@ -38,19 +64,19 @@ def pre_check() -> bool: return True -def pre_start() -> bool: +def pre_start(status_fn_callback: Callable[[str, str], None]) -> bool: if not modules.globals.map_faces and not is_image(modules.globals.source_path): - update_status("Select an image for source path.", NAME) + status_fn_callback("Select an image for source path.", NAME) return False elif not modules.globals.map_faces and not get_one_face( cv2.imread(modules.globals.source_path) ): - update_status("No face in source path detected.", NAME) + status_fn_callback("No face in source path detected.", NAME) return False if not is_image(modules.globals.target_path) and not is_video( modules.globals.target_path ): - update_status("Select an image or video for target path.", NAME) + status_fn_callback("Select an image or video for target path.", NAME) return False return True @@ -67,196 +93,523 @@ def get_face_swapper() -> Any: return FACE_SWAPPER -def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: +def _prepare_warped_source_material_and_mask( + source_face_obj: Face, + source_frame_full: Frame, + matrix: np.ndarray, + dsize: tuple +) -> Tuple[Optional[Frame], Optional[Frame]]: + try: + hair_only_mask_source_raw = segment_hair(source_frame_full) + if hair_only_mask_source_raw is None: + logging.error("segment_hair returned None, which is unexpected.") + return None, None + if hair_only_mask_source_raw.ndim == 3 and hair_only_mask_source_raw.shape[2] == 3: + hair_only_mask_source_raw = cv2.cvtColor(hair_only_mask_source_raw, cv2.COLOR_BGR2GRAY) + _, hair_only_mask_source_binary = cv2.threshold(hair_only_mask_source_raw, 127, 255, cv2.THRESH_BINARY) + except Exception as e: + logging.error(f"Hair segmentation failed: {e}", exc_info=True) + return None, None + + try: + face_only_mask_source_raw = create_face_mask(source_face_obj, source_frame_full) + if face_only_mask_source_raw is None: + logging.error("create_face_mask returned None, which is unexpected.") + return None, None + _, face_only_mask_source_binary = cv2.threshold(face_only_mask_source_raw, 127, 255, cv2.THRESH_BINARY) + except Exception as e: + logging.error(f"Face mask creation failed for source: {e}", exc_info=True) + return None, None + + try: + if face_only_mask_source_binary.shape != hair_only_mask_source_binary.shape: + logging.warning("Resizing hair mask to match face mask for source during preparation.") + hair_only_mask_source_binary = cv2.resize( + hair_only_mask_source_binary, + (face_only_mask_source_binary.shape[1], face_only_mask_source_binary.shape[0]), + interpolation=cv2.INTER_NEAREST + ) + + actual_combined_source_mask = cv2.bitwise_or(face_only_mask_source_binary, hair_only_mask_source_binary) + actual_combined_source_mask_blurred = cv2.GaussianBlur(actual_combined_source_mask, (5, 5), 3) + + 
warped_full_source_material = cv2.warpAffine(source_frame_full, matrix, dsize) + warped_combined_mask_temp = cv2.warpAffine(actual_combined_source_mask_blurred, matrix, dsize) + _, warped_combined_mask_binary_for_clone = cv2.threshold(warped_combined_mask_temp, 127, 255, cv2.THRESH_BINARY) + except Exception as e: + logging.error(f"Mask combination or warping failed: {e}", exc_info=True) + return None, None + + return warped_full_source_material, warped_combined_mask_binary_for_clone + +def _blend_material_onto_frame( + base_frame: Frame, + material_to_blend: Frame, + mask_for_blending: Frame +) -> Frame: + x, y, w, h = cv2.boundingRect(mask_for_blending) + output_frame = base_frame + + if w > 0 and h > 0: + center = (x + w // 2, y + h // 2) + + if material_to_blend.shape == base_frame.shape and \ + material_to_blend.dtype == base_frame.dtype and \ + mask_for_blending.dtype == np.uint8: + try: + output_frame = cv2.seamlessClone(material_to_blend, base_frame, mask_for_blending, center, cv2.NORMAL_CLONE) + except cv2.error as e: + logging.warning(f"cv2.seamlessClone failed: {e}. Falling back to simple blending.") + boolean_mask = mask_for_blending > 127 + output_frame[boolean_mask] = material_to_blend[boolean_mask] + else: + logging.warning("Mismatch in shape/type for seamlessClone. Falling back to simple blending.") + boolean_mask = mask_for_blending > 127 + output_frame[boolean_mask] = material_to_blend[boolean_mask] + else: + logging.info("Warped mask for blending is empty. Skipping blending.") + + return output_frame + + +def swap_face(source_face_obj: Face, target_face: Face, source_frame_full: Frame, temp_frame: Frame) -> Frame: face_swapper = get_face_swapper() - # Apply the face swap - swapped_frame = face_swapper.get( - temp_frame, target_face, source_face, paste_back=True - ) + swapped_frame = face_swapper.get(temp_frame, target_face, source_face_obj, paste_back=True) + final_swapped_frame = swapped_frame + + if getattr(modules.globals, 'enable_hair_swapping', True): + if not (source_face_obj.kps is not None and \ + target_face.kps is not None and \ + source_face_obj.kps.shape[0] >= 3 and \ + target_face.kps.shape[0] >= 3): + logging.warning( + f"Skipping hair blending due to insufficient keypoints. " + f"Source kps: {source_face_obj.kps.shape if source_face_obj.kps is not None else 'None'}, " + f"Target kps: {target_face.kps.shape if target_face.kps is not None else 'None'}." + ) + else: + source_kps_float = source_face_obj.kps.astype(np.float32) + target_kps_float = target_face.kps.astype(np.float32) + matrix, _ = cv2.estimateAffinePartial2D(source_kps_float, target_kps_float, method=cv2.LMEDS) + + if matrix is None: + logging.warning("Failed to estimate affine transformation matrix for hair. Skipping hair blending.") + else: + dsize = (temp_frame.shape[1], temp_frame.shape[0]) + + warped_material, warped_mask = _prepare_warped_source_material_and_mask( + source_face_obj, source_frame_full, matrix, dsize + ) + + if warped_material is not None and warped_mask is not None: + final_swapped_frame = swapped_frame.copy() + + try: + color_corrected_material = apply_color_transfer(warped_material, final_swapped_frame) + except Exception as e: + logging.warning(f"Color transfer failed: {e}. 
Proceeding with uncorrected material for hair blending.", exc_info=True) + color_corrected_material = warped_material + + final_swapped_frame = _blend_material_onto_frame( + final_swapped_frame, + color_corrected_material, + warped_mask + ) if modules.globals.mouth_mask: - # Create a mask for the target face - face_mask = create_face_mask(target_face, temp_frame) + if final_swapped_frame is swapped_frame: + final_swapped_frame = swapped_frame.copy() + + face_mask_for_mouth = create_face_mask(target_face, temp_frame) - # Create the mouth mask mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = ( create_lower_mouth_mask(target_face, temp_frame) ) - # Apply the mouth area - swapped_frame = apply_mouth_area( - swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon + final_swapped_frame = apply_mouth_area( + final_swapped_frame, mouth_cutout, mouth_box, face_mask_for_mouth, lower_lip_polygon ) if modules.globals.show_mouth_mask_box: mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon) - swapped_frame = draw_mouth_mask_visualization( - swapped_frame, target_face, mouth_mask_data + final_swapped_frame = draw_mouth_mask_visualization( + final_swapped_frame, target_face, mouth_mask_data ) - return swapped_frame + return final_swapped_frame -def process_frame(source_face: Face, temp_frame: Frame) -> Frame: - if modules.globals.color_correction: +def process_frame(source_face_obj: Face, source_frame_full: Frame, temp_frame: Frame) -> Frame: + global TARGET_TRACKER, LAST_TARGET_KPS, LAST_TARGET_BBOX_XYWH + global TRACKING_FRAME_COUNTER, DETECTION_INTERVAL, LAST_DETECTION_SUCCESS, PREV_GRAY_FRAME + + if modules.globals.color_correction: # This should apply to temp_frame before gray conversion temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) + current_gray_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2GRAY) + target_face_to_swap = None + if modules.globals.many_faces: - many_faces = get_many_faces(temp_frame) - if many_faces: - for target_face in many_faces: - if source_face and target_face: - temp_frame = swap_face(source_face, target_face, temp_frame) - else: - print("Face detection failed for target/source.") - else: - target_face = get_one_face(temp_frame) - if target_face and source_face: - temp_frame = swap_face(source_face, target_face, temp_frame) + # Tracking logic is not applied for many_faces mode in this iteration + # Revert to Nth frame detection for all faces in many_faces mode for now for performance + TRACKING_FRAME_COUNTER += 1 + if TRACKING_FRAME_COUNTER % DETECTION_INTERVAL == 0: + logging.debug(f"Frame {TRACKING_FRAME_COUNTER} (ManyFaces): Running full detection.") + many_faces_detected = get_many_faces(temp_frame) + if many_faces_detected: + for target_face_data in many_faces_detected: + if source_face_obj and target_face_data: + temp_frame = swap_face(source_face_obj, target_face_data, source_frame_full, temp_frame) + LAST_DETECTION_SUCCESS = bool(many_faces_detected) # Update based on if any face was found else: - logging.error("Face detection failed for target or source.") - return temp_frame - - - -def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame: - if is_image(modules.globals.target_path): - if modules.globals.many_faces: - source_face = default_source_face() - for map in modules.globals.source_target_map: - target_face = map["target"]["face"] - temp_frame = swap_face(source_face, target_face, temp_frame) - - elif not modules.globals.many_faces: - for map in modules.globals.source_target_map: - if 
"source" in map: - source_face = map["source"]["face"] - target_face = map["target"]["face"] - temp_frame = swap_face(source_face, target_face, temp_frame) - - elif is_video(modules.globals.target_path): - if modules.globals.many_faces: - source_face = default_source_face() - for map in modules.globals.source_target_map: - target_frame = [ - f - for f in map["target_faces_in_frame"] - if f["location"] == temp_frame_path - ] - - for frame in target_frame: - for target_face in frame["faces"]: - temp_frame = swap_face(source_face, target_face, temp_frame) - - elif not modules.globals.many_faces: - for map in modules.globals.source_target_map: - if "source" in map: - target_frame = [ - f - for f in map["target_faces_in_frame"] - if f["location"] == temp_frame_path - ] - source_face = map["source"]["face"] - - for frame in target_frame: - for target_face in frame["faces"]: - temp_frame = swap_face(source_face, target_face, temp_frame) - + # For many_faces on non-detection frames, we currently don't have individual trackers. + # The frame will pass through without additional swapping if we don't store and reuse old face data. + # This means non-detection frames in many_faces mode might show unsynced swaps or no swaps if not handled. + # For now, it means only Nth frame gets swaps in many_faces. + logging.debug(f"Frame {TRACKING_FRAME_COUNTER} (ManyFaces): Skipping swap on intermediate frame.") + pass else: - detected_faces = get_many_faces(temp_frame) - if modules.globals.many_faces: - if detected_faces: - source_face = default_source_face() - for target_face in detected_faces: - temp_frame = swap_face(source_face, target_face, temp_frame) + # --- Single Face Mode with Tracking --- + TRACKING_FRAME_COUNTER += 1 - elif not modules.globals.many_faces: - if detected_faces: - if len(detected_faces) <= len( - modules.globals.simple_map["target_embeddings"] - ): - for detected_face in detected_faces: - closest_centroid_index, _ = find_closest_centroid( - modules.globals.simple_map["target_embeddings"], - detected_face.normed_embedding, - ) + if TRACKING_FRAME_COUNTER % DETECTION_INTERVAL == 0 or not LAST_DETECTION_SUCCESS: + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: Running full detection.") + actual_target_face_data = get_one_face(temp_frame) # get_one_face returns a Face object or None + if actual_target_face_data: + target_face_to_swap = actual_target_face_data + if actual_target_face_data.kps is not None: + LAST_TARGET_KPS = actual_target_face_data.kps.copy() + else: # Should not happen with buffalo_l but good for robustness + LAST_TARGET_KPS = None - temp_frame = swap_face( - modules.globals.simple_map["source_faces"][ - closest_centroid_index - ], - detected_face, - temp_frame, - ) + bbox_xyxy = actual_target_face_data.bbox + LAST_TARGET_BBOX_XYWH = [int(bbox_xyxy[0]), int(bbox_xyxy[1]), int(bbox_xyxy[2] - bbox_xyxy[0]), int(bbox_xyxy[3] - bbox_xyxy[1])] + + try: + TARGET_TRACKER = cv2.TrackerKCF_create() + TARGET_TRACKER.init(temp_frame, tuple(LAST_TARGET_BBOX_XYWH)) + LAST_DETECTION_SUCCESS = True + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: Detection SUCCESS, tracker initialized.") + except Exception as e: + logging.error(f"Failed to initialize tracker: {e}", exc_info=True) + TARGET_TRACKER = None + LAST_DETECTION_SUCCESS = False + else: + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: Full detection FAILED.") + LAST_DETECTION_SUCCESS = False + TARGET_TRACKER = None + else: # Intermediate frame, try to track + if TARGET_TRACKER is not None and PREV_GRAY_FRAME is not None and 
LAST_TARGET_KPS is not None: + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: Attempting track.") + success_tracker, new_bbox_xywh_float = TARGET_TRACKER.update(temp_frame) + if success_tracker: + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: KCF Tracking SUCCESS.") + new_bbox_xywh = [int(v) for v in new_bbox_xywh_float] + + lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)) + tracked_kps_float32 = LAST_TARGET_KPS.astype(np.float32) # Optical flow needs float32 + + new_kps_tracked, opt_flow_status, opt_flow_err = cv2.calcOpticalFlowPyrLK( + PREV_GRAY_FRAME, current_gray_frame, tracked_kps_float32, None, **lk_params + ) + + if new_kps_tracked is not None and opt_flow_status is not None: + good_new_kps = new_kps_tracked[opt_flow_status.ravel() == 1] + # good_old_kps_for_ref = tracked_kps_float32[opt_flow_status.ravel() == 1] + + if len(good_new_kps) >= 3: # Need at least 3 points for stability + current_kps = good_new_kps + new_bbox_xyxy_np = np.array([ + new_bbox_xywh[0], + new_bbox_xywh[1], + new_bbox_xywh[0] + new_bbox_xywh[2], + new_bbox_xywh[1] + new_bbox_xywh[3] + ], dtype=np.float32) # insightface Face expects float bbox + + # Construct Face object (ensure all required fields are present, others None) + target_face_to_swap = Face( + bbox=new_bbox_xyxy_np, + kps=current_kps.astype(np.float32), # kps are float + det_score=0.90, # Indicate high confidence for tracked face + landmark_3d_68=None, + landmark_2d_106=None, + gender=None, + age=None, + embedding=None, # Not available from tracking + normed_embedding=None # Not available from tracking + ) + LAST_TARGET_KPS = current_kps.copy() + LAST_TARGET_BBOX_XYWH = new_bbox_xywh + LAST_DETECTION_SUCCESS = True + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: Optical Flow SUCCESS, {len(good_new_kps)} points tracked.") + else: + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: Optical flow lost too many KPS ({len(good_new_kps)} found). Triggering re-detection.") + LAST_DETECTION_SUCCESS = False + TARGET_TRACKER = None + else: + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: Optical flow calculation failed. Triggering re-detection.") + LAST_DETECTION_SUCCESS = False + TARGET_TRACKER = None else: - detected_faces_centroids = [] - for face in detected_faces: - detected_faces_centroids.append(face.normed_embedding) - i = 0 - for target_embedding in modules.globals.simple_map[ - "target_embeddings" - ]: - closest_centroid_index, _ = find_closest_centroid( - detected_faces_centroids, target_embedding - ) + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: KCF Tracking FAILED. Triggering re-detection.") + LAST_DETECTION_SUCCESS = False + TARGET_TRACKER = None + else: + logging.debug(f"Frame {TRACKING_FRAME_COUNTER}: No active tracker or prerequisite data. 
Skipping track.") + # target_face_to_swap remains None - temp_frame = swap_face( - modules.globals.simple_map["source_faces"][i], - detected_faces[closest_centroid_index], - temp_frame, - ) - i += 1 + if target_face_to_swap and source_face_obj: + temp_frame = swap_face(source_face_obj, target_face_to_swap, source_frame_full, temp_frame) + else: + if TRACKING_FRAME_COUNTER % DETECTION_INTERVAL == 0 and not LAST_DETECTION_SUCCESS: # Only log if it was a detection attempt that failed + logging.info("Target face not found by detection in process_frame.") + + PREV_GRAY_FRAME = current_gray_frame.copy() # Update for the next frame return temp_frame +def _process_image_target_v2(source_frame_full: Frame, temp_frame: Frame) -> Frame: + if modules.globals.many_faces: + source_face_obj = default_source_face() + if source_face_obj: + for map_item in modules.globals.source_target_map: + target_face = map_item["target"]["face"] + temp_frame = swap_face(source_face_obj, target_face, source_frame_full, temp_frame) + else: # not many_faces + for map_item in modules.globals.source_target_map: + if "source" in map_item: + source_face_obj = map_item["source"]["face"] + target_face = map_item["target"]["face"] + temp_frame = swap_face(source_face_obj, target_face, source_frame_full, temp_frame) + return temp_frame + +def _process_video_target_v2(source_frame_full: Frame, temp_frame: Frame, temp_frame_path: str) -> Frame: + if modules.globals.many_faces: + source_face_obj = default_source_face() + if source_face_obj: + for map_item in modules.globals.source_target_map: + target_frames_data = [f for f in map_item.get("target_faces_in_frame", []) if f.get("location") == temp_frame_path] + for frame_data in target_frames_data: + for target_face in frame_data.get("faces", []): + temp_frame = swap_face(source_face_obj, target_face, source_frame_full, temp_frame) + else: # not many_faces + for map_item in modules.globals.source_target_map: + if "source" in map_item: + source_face_obj = map_item["source"]["face"] + target_frames_data = [f for f in map_item.get("target_faces_in_frame", []) if f.get("location") == temp_frame_path] + for frame_data in target_frames_data: + for target_face in frame_data.get("faces", []): + temp_frame = swap_face(source_face_obj, target_face, source_frame_full, temp_frame) + return temp_frame + +def _process_live_target_v2(source_frame_full: Frame, temp_frame: Frame) -> Frame: + # This function is called by UI directly for webcam when map_faces is True. + # It now uses the same Nth frame + tracking logic as process_frame for its single-face path. + global TARGET_TRACKER, LAST_TARGET_KPS, LAST_TARGET_BBOX_XYWH + global TRACKING_FRAME_COUNTER, DETECTION_INTERVAL, LAST_DETECTION_SUCCESS, PREV_GRAY_FRAME + + current_gray_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2GRAY) # Needed for optical flow + + if modules.globals.many_faces: + # For many_faces in map_faces=True live mode, use existing logic (detect all, swap all with default source) + # This part does not use the new tracking logic. 
+ TRACKING_FRAME_COUNTER += 1 # Still increment for consistency, though not strictly for Nth frame here + if TRACKING_FRAME_COUNTER % DETECTION_INTERVAL == 0: # Optional: Nth frame for many_faces too + detected_faces = get_many_faces(temp_frame) + if detected_faces: + source_face_obj = default_source_face() + if source_face_obj: + for target_face in detected_faces: + temp_frame = swap_face(source_face_obj, target_face, source_frame_full, temp_frame) + # On non-detection frames for many_faces, no swap occurs unless we cache all detected faces, which is complex. + else: # Not many_faces (single face logic with tracking or simple_map) + TRACKING_FRAME_COUNTER += 1 + target_face_to_swap = None + + if TRACKING_FRAME_COUNTER % DETECTION_INTERVAL == 0 or not LAST_DETECTION_SUCCESS: + logging.debug(f"Frame {TRACKING_FRAME_COUNTER} (Live V2): Running full detection.") + detected_faces = get_many_faces(temp_frame) # Get all faces + actual_target_face_data = None + + if detected_faces: + if modules.globals.simple_map and modules.globals.simple_map.get("target_embeddings") and modules.globals.simple_map["target_embeddings"][0] is not None: + # Try to find the "main" target face from simple_map's first entry + # This assumes the first simple_map entry is the one to track. + try: + closest_idx, _ = find_closest_centroid([face.normed_embedding for face in detected_faces], modules.globals.simple_map["target_embeddings"][0]) + if closest_idx < len(detected_faces): + actual_target_face_data = detected_faces[closest_idx] + except Exception as e_centroid: # Broad exception for safety with list indexing + logging.warning(f"Error finding closest centroid for simple_map in live_v2: {e_centroid}") + actual_target_face_data = detected_faces[0] # Fallback + else: # Fallback if no simple_map or if logic above fails + actual_target_face_data = detected_faces[0] + + if actual_target_face_data: + target_face_to_swap = actual_target_face_data + if actual_target_face_data.kps is not None: + LAST_TARGET_KPS = actual_target_face_data.kps.copy() + else: + LAST_TARGET_KPS = None + bbox_xyxy = actual_target_face_data.bbox + LAST_TARGET_BBOX_XYWH = [int(bbox_xyxy[0]), int(bbox_xyxy[1]), int(bbox_xyxy[2] - bbox_xyxy[0]), int(bbox_xyxy[3] - bbox_xyxy[1])] + try: + TARGET_TRACKER = cv2.TrackerKCF_create() + TARGET_TRACKER.init(temp_frame, tuple(LAST_TARGET_BBOX_XYWH)) + LAST_DETECTION_SUCCESS = True + except Exception as e: + logging.error(f"Failed to initialize tracker (Live V2): {e}", exc_info=True) + TARGET_TRACKER = None; LAST_DETECTION_SUCCESS = False + else: + LAST_DETECTION_SUCCESS = False; TARGET_TRACKER = None + else: # Intermediate frame tracking + if TARGET_TRACKER is not None and PREV_GRAY_FRAME is not None and LAST_TARGET_KPS is not None: + success_tracker, new_bbox_xywh_float = TARGET_TRACKER.update(temp_frame) + if success_tracker: + new_bbox_xywh = [int(v) for v in new_bbox_xywh_float] + lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)) + tracked_kps_float32 = LAST_TARGET_KPS.astype(np.float32) + new_kps_tracked, opt_flow_status, _ = cv2.calcOpticalFlowPyrLK(PREV_GRAY_FRAME, current_gray_frame, tracked_kps_float32, None, **lk_params) + + if new_kps_tracked is not None and opt_flow_status is not None: + good_new_kps = new_kps_tracked[opt_flow_status.ravel() == 1] + if len(good_new_kps) >= 3: + current_kps = good_new_kps + new_bbox_xyxy_np = np.array([new_bbox_xywh[0], new_bbox_xywh[1], new_bbox_xywh[0] + new_bbox_xywh[2], new_bbox_xywh[1] + 
new_bbox_xywh[3]], dtype=np.float32) + target_face_to_swap = Face(bbox=new_bbox_xyxy_np, kps=current_kps.astype(np.float32), det_score=0.90, landmark_3d_68=None, landmark_2d_106=None, gender=None, age=None, embedding=None, normed_embedding=None) + LAST_TARGET_KPS = current_kps.copy() + LAST_TARGET_BBOX_XYWH = new_bbox_xywh + LAST_DETECTION_SUCCESS = True + else: # Optical flow lost points + LAST_DETECTION_SUCCESS = False; TARGET_TRACKER = None + else: # Optical flow failed + LAST_DETECTION_SUCCESS = False; TARGET_TRACKER = None + else: # KCF Tracker failed + LAST_DETECTION_SUCCESS = False; TARGET_TRACKER = None + + # Perform swap using the determined target_face_to_swap + if target_face_to_swap: + # Determine source face based on simple_map (if available and target_face_to_swap has embedding for matching) + # This part requires target_face_to_swap to have 'normed_embedding' if we want to use simple_map matching. + # Tracked faces currently don't have embedding. So, this will likely use default_source_face. + source_face_obj_to_use = None + if modules.globals.simple_map and modules.globals.simple_map.get("target_embeddings") and hasattr(target_face_to_swap, 'normed_embedding') and target_face_to_swap.normed_embedding is not None: + closest_centroid_index, _ = find_closest_centroid(modules.globals.simple_map["target_embeddings"], target_face_to_swap.normed_embedding) + if closest_centroid_index < len(modules.globals.simple_map["source_faces"]): + source_face_obj_to_use = modules.globals.simple_map["source_faces"][closest_centroid_index] + + if source_face_obj_to_use is None: # Fallback if no match or no embedding + source_face_obj_to_use = default_source_face() + + if source_face_obj_to_use: + temp_frame = swap_face(source_face_obj_to_use, target_face_to_swap, source_frame_full, temp_frame) + else: + logging.warning("No source face available for tracked/detected target in _process_live_target_v2 (single).") + elif TRACKING_FRAME_COUNTER % DETECTION_INTERVAL == 0 and not LAST_DETECTION_SUCCESS: + logging.info("Target face not found in _process_live_target_v2 (single face path).") + + PREV_GRAY_FRAME = current_gray_frame.copy() + return temp_frame + + +def process_frame_v2(source_frame_full: Frame, temp_frame: Frame, temp_frame_path: str = "") -> Frame: + if is_image(modules.globals.target_path): + return _process_image_target_v2(source_frame_full, temp_frame) + elif is_video(modules.globals.target_path): + # For video files with map_faces=True, use the original _process_video_target_v2 + # as tracking state management across distinct mapped faces is complex and not yet implemented. + # The Nth frame + tracking is primarily for single face mode or live mode. 
+ return _process_video_target_v2(source_frame_full, temp_frame, temp_frame_path) # Original logic without tracking + else: # This is the live cam / generic case (map_faces=True) + return _process_live_target_v2(source_frame_full, temp_frame) + + def process_frames( source_path: str, temp_frame_paths: List[str], progress: Any = None ) -> None: + source_img = cv2.imread(source_path) + if source_img is None: + logging.error(f"Failed to read source image from {source_path}") + return + + if not is_video(modules.globals.target_path): # Reset only if not a video (video handles it in process_video) + reset_tracker_state() + if not modules.globals.map_faces: - source_face = get_one_face(cv2.imread(source_path)) + source_face_obj = get_one_face(source_img) + if not source_face_obj: + logging.error(f"No face detected in source image {source_path}") + return for temp_frame_path in temp_frame_paths: temp_frame = cv2.imread(temp_frame_path) + if temp_frame is None: + logging.warning(f"Failed to read temp_frame from {temp_frame_path}, skipping.") + continue try: - result = process_frame(source_face, temp_frame) + result = process_frame(source_face_obj, source_img, temp_frame) cv2.imwrite(temp_frame_path, result) except Exception as exception: - print(exception) + logging.error(f"Error processing frame {temp_frame_path}: {exception}", exc_info=True) pass if progress: progress.update(1) else: for temp_frame_path in temp_frame_paths: temp_frame = cv2.imread(temp_frame_path) + if temp_frame is None: + logging.warning(f"Failed to read temp_frame from {temp_frame_path}, skipping.") + continue try: - result = process_frame_v2(temp_frame, temp_frame_path) + result = process_frame_v2(source_img, temp_frame, temp_frame_path) cv2.imwrite(temp_frame_path, result) except Exception as exception: - print(exception) + logging.error(f"Error processing frame {temp_frame_path} with map_faces: {exception}", exc_info=True) pass if progress: progress.update(1) -def process_image(source_path: str, target_path: str, output_path: str) -> None: +def process_image(source_path: str, target_path: str, output_path: str, status_fn_callback: Callable[[str, str], None]) -> None: + source_img = cv2.imread(source_path) + if source_img is None: + logging.error(f"Failed to read source image from {source_path}") + return + + original_target_frame = cv2.imread(target_path) + if original_target_frame is None: + logging.error(f"Failed to read original target image from {target_path}") + return + + result = None + + reset_tracker_state() # Ensure fresh state for single image processing + + if not modules.globals.map_faces: - source_face = get_one_face(cv2.imread(source_path)) - target_frame = cv2.imread(target_path) - result = process_frame(source_face, target_frame) - cv2.imwrite(output_path, result) + source_face_obj = get_one_face(source_img) + if not source_face_obj: + logging.error(f"No face detected in source image {source_path}") + return + result = process_frame(source_face_obj, source_img, original_target_frame) else: if modules.globals.many_faces: - update_status( + status_fn_callback( "Many faces enabled. Using first source image. 
Progressing...", NAME ) - target_frame = cv2.imread(output_path) - result = process_frame_v2(target_frame) + result = process_frame_v2(source_img, original_target_frame, target_path) + + if result is not None: cv2.imwrite(output_path, result) + else: + logging.error(f"Processing image {target_path} failed, result was None.") -def process_video(source_path: str, temp_frame_paths: List[str]) -> None: +def process_video(source_path: str, temp_frame_paths: List[str], status_fn_callback: Callable[[str, str], None]) -> None: + reset_tracker_state() # Ensure fresh state for each video processing + if modules.globals.map_faces and modules.globals.many_faces: - update_status( + status_fn_callback( "Many faces enabled. Using first source image. Progressing...", NAME ) modules.processors.frame.core.process_video( @@ -266,201 +619,153 @@ def process_video(source_path: str, temp_frame_paths: List[str]) -> None: def create_lower_mouth_mask( face: Face, frame: Frame -) -> (np.ndarray, np.ndarray, tuple, np.ndarray): +) -> Tuple[np.ndarray, Optional[np.ndarray], Tuple[int, int, int, int], Optional[np.ndarray]]: mask = np.zeros(frame.shape[:2], dtype=np.uint8) mouth_cutout = None + lower_lip_polygon_details = None # Initialize to ensure it's always defined + + if face.landmark_2d_106 is None: + logging.debug("Skipping lower_mouth_mask due to missing landmark_2d_106 (likely a tracked face).") + return mask, None, (0,0,0,0), None + landmarks = face.landmark_2d_106 - if landmarks is not None: - # 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 - lower_lip_order = [ - 65, - 66, - 62, - 70, - 69, - 18, - 19, - 20, - 21, - 22, - 23, - 24, - 0, - 8, - 7, - 6, - 5, - 4, - 3, - 2, - 65, - ] - lower_lip_landmarks = landmarks[lower_lip_order].astype( - np.float32 - ) # Use float for precise calculations - # Calculate the center of the landmarks - center = np.mean(lower_lip_landmarks, axis=0) + lower_lip_order = [ + 65, 66, 62, 70, 69, 18, 19, 20, 21, 22, + 23, 24, 0, 8, 7, 6, 5, 4, 3, 2, 65, + ] + try: + lower_lip_landmarks = landmarks[lower_lip_order].astype(np.float32) + except IndexError: + logging.warning("Failed to get lower_lip_landmarks due to landmark indexing issue.") + return mask, None, (0,0,0,0), None - # Expand the landmarks outward - expansion_factor = ( - 1 + modules.globals.mask_down_size - ) # Adjust this for more or less expansion - expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center + center = np.mean(lower_lip_landmarks, axis=0) + expansion_factor = (1 + modules.globals.mask_down_size) + expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center - # Extend the top lip part - toplip_indices = [ - 20, - 0, - 1, - 2, - 3, - 4, - 5, - ] # Indices for landmarks 2, 65, 66, 62, 70, 69, 18 - toplip_extension = ( - modules.globals.mask_size * 0.5 - ) # Adjust this factor to control the extension - for idx in toplip_indices: - direction = expanded_landmarks[idx] - center - direction = direction / np.linalg.norm(direction) - expanded_landmarks[idx] += direction * toplip_extension + toplip_indices = [20, 0, 1, 2, 3, 4, 5] + toplip_extension = (modules.globals.mask_size * 0.5) + for idx in toplip_indices: + direction = expanded_landmarks[idx] - center + norm_direction = np.linalg.norm(direction) + if norm_direction == 0: continue + expanded_landmarks[idx] += (direction / norm_direction) * toplip_extension - # Extend the bottom part (chin area) - chin_indices = [ - 11, - 12, - 13, - 14, - 15, - 16, - ] # Indices for landmarks 21, 22, 23, 24, 0, 8 - 
chin_extension = 2 * 0.2 # Adjust this factor to control the extension - for idx in chin_indices: - expanded_landmarks[idx][1] += ( - expanded_landmarks[idx][1] - center[1] - ) * chin_extension + chin_indices = [11, 12, 13, 14, 15, 16] + chin_extension = 2 * 0.2 + for idx in chin_indices: + expanded_landmarks[idx][1] += (expanded_landmarks[idx][1] - center[1]) * chin_extension - # Convert back to integer coordinates - expanded_landmarks = expanded_landmarks.astype(np.int32) + expanded_landmarks = expanded_landmarks.astype(np.int32) - # Calculate bounding box for the expanded lower mouth - min_x, min_y = np.min(expanded_landmarks, axis=0) - max_x, max_y = np.max(expanded_landmarks, axis=0) + min_x, min_y = np.min(expanded_landmarks, axis=0) + max_x, max_y = np.max(expanded_landmarks, axis=0) - # Add some padding to the bounding box - padding = int((max_x - min_x) * 0.1) # 10% padding - min_x = max(0, min_x - padding) - min_y = max(0, min_y - padding) - max_x = min(frame.shape[1], max_x + padding) - max_y = min(frame.shape[0], max_y + padding) + padding = int((max_x - min_x) * 0.1) + min_x = max(0, min_x - padding) + min_y = max(0, min_y - padding) + max_x = min(frame.shape[1] - 1, max_x + padding) # Ensure max_x is within bounds + max_y = min(frame.shape[0] - 1, max_y + padding) # Ensure max_y is within bounds - # Ensure the bounding box dimensions are valid - if max_x <= min_x or max_y <= min_y: - if (max_x - min_x) <= 1: - max_x = min_x + 1 - if (max_y - min_y) <= 1: - max_y = min_y + 1 + # Ensure min is less than max after adjustments + if max_x <= min_x: max_x = min_x + 1 + if max_y <= min_y: max_y = min_y + 1 - # Create the mask - mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8) - cv2.fillPoly(mask_roi, [expanded_landmarks - [min_x, min_y]], 255) + # Ensure ROI dimensions are positive + if max_y - min_y <= 0 or max_x - min_x <= 0: + logging.warning(f"Invalid ROI for mouth mask creation: min_x={min_x}, max_x={max_x}, min_y={min_y}, max_y={max_y}") + return mask, None, (min_x, min_y, max_x, max_y), None # Return current min/max for bbox - # Apply Gaussian blur to soften the mask edges - mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5) + mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8) + # Adjust landmarks to be relative to the ROI + adjusted_landmarks = expanded_landmarks - [min_x, min_y] + cv2.fillPoly(mask_roi, [adjusted_landmarks], 255) - # Place the mask ROI in the full-sized mask - mask[min_y:max_y, min_x:max_x] = mask_roi + # Apply Gaussian blur to soften the mask edges + # Ensure kernel size is odd and positive + blur_kernel_size = (15, 15) # Make sure this is appropriate + if blur_kernel_size[0] % 2 == 0: blur_kernel_size = (blur_kernel_size[0]+1, blur_kernel_size[1]) + if blur_kernel_size[1] % 2 == 0: blur_kernel_size = (blur_kernel_size[0], blur_kernel_size[1]+1) + if blur_kernel_size[0] <=0 : blur_kernel_size = (1, blur_kernel_size[1]) + if blur_kernel_size[1] <=0 : blur_kernel_size = (blur_kernel_size[0], 1) - # Extract the masked area from the frame - mouth_cutout = frame[min_y:max_y, min_x:max_x].copy() + mask_roi = cv2.GaussianBlur(mask_roi, blur_kernel_size, 5) # Sigma might also need tuning - # Return the expanded lower lip polygon in original frame coordinates - lower_lip_polygon = expanded_landmarks + mask[min_y:max_y, min_x:max_x] = mask_roi + mouth_cutout = frame[min_y:max_y, min_x:max_x].copy() + lower_lip_polygon_details = expanded_landmarks - return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon + return 
mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon_details def draw_mouth_mask_visualization( frame: Frame, face: Face, mouth_mask_data: tuple ) -> Frame: - landmarks = face.landmark_2d_106 - if landmarks is not None and mouth_mask_data is not None: - mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = ( - mouth_mask_data - ) + if face.landmark_2d_106 is None or mouth_mask_data is None or mouth_mask_data[1] is None: + logging.debug("Skipping mouth mask visualization due to missing landmarks or data.") + return frame - vis_frame = frame.copy() + mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = mouth_mask_data + if mouth_cutout is None or lower_lip_polygon is None: + logging.debug("Skipping mouth mask visualization due to missing mouth_cutout or polygon.") + return frame - # Ensure coordinates are within frame bounds - height, width = vis_frame.shape[:2] - min_x, min_y = max(0, min_x), max(0, min_y) - max_x, max_y = min(width, max_x), min(height, max_y) - - # Adjust mask to match the region size - mask_region = mask[0 : max_y - min_y, 0 : max_x - min_x] - - # Remove the color mask overlay - # color_mask = cv2.applyColorMap((mask_region * 255).astype(np.uint8), cv2.COLORMAP_JET) - - # Ensure shapes match before blending - vis_region = vis_frame[min_y:max_y, min_x:max_x] - # Remove blending with color_mask - # if vis_region.shape[:2] == color_mask.shape[:2]: - # blended = cv2.addWeighted(vis_region, 0.7, color_mask, 0.3, 0) - # vis_frame[min_y:max_y, min_x:max_x] = blended - - # Draw the lower lip polygon - cv2.polylines(vis_frame, [lower_lip_polygon], True, (0, 255, 0), 2) - - # Remove the red box - # cv2.rectangle(vis_frame, (min_x, min_y), (max_x, max_y), (0, 0, 255), 2) - - # Visualize the feathered mask - feather_amount = max( - 1, - min( - 30, - (max_x - min_x) // modules.globals.mask_feather_ratio, - (max_y - min_y) // modules.globals.mask_feather_ratio, - ), - ) - # Ensure kernel size is odd - kernel_size = 2 * feather_amount + 1 - feathered_mask = cv2.GaussianBlur( - mask_region.astype(float), (kernel_size, kernel_size), 0 - ) - feathered_mask = (feathered_mask / feathered_mask.max() * 255).astype(np.uint8) - # Remove the feathered mask color overlay - # color_feathered_mask = cv2.applyColorMap(feathered_mask, cv2.COLORMAP_VIRIDIS) - - # Ensure shapes match before blending feathered mask - # if vis_region.shape == color_feathered_mask.shape: - # blended_feathered = cv2.addWeighted(vis_region, 0.7, color_feathered_mask, 0.3, 0) - # vis_frame[min_y:max_y, min_x:max_x] = blended_feathered - - # Add labels - cv2.putText( - vis_frame, - "Lower Mouth Mask", - (min_x, min_y - 10), - cv2.FONT_HERSHEY_SIMPLEX, - 0.5, - (255, 255, 255), - 1, - ) - cv2.putText( - vis_frame, - "Feathered Mask", - (min_x, max_y + 20), - cv2.FONT_HERSHEY_SIMPLEX, - 0.5, - (255, 255, 255), - 1, - ) + vis_frame = frame.copy() + height, width = vis_frame.shape[:2] + min_x, min_y = max(0, min_x), max(0, min_y) + max_x, max_y = min(width, max_x), min(height, max_y) + if max_y - min_y <= 0 or max_x - min_x <= 0: + logging.warning("Invalid ROI for mouth mask visualization.") return vis_frame - return frame + mask_region = mask[0 : max_y - min_y, 0 : max_x - min_x] # This line might be problematic if mask is full frame + + cv2.polylines(vis_frame, [lower_lip_polygon], True, (0, 255, 0), 2) # This uses original lower_lip_polygon coordinates + + # For displaying the mask itself, it's better to show the ROI where it was applied + # or create a version of the mask that is 
full frame for visualization. + # The current `mask_region` is a crop of the full `mask`. + # Let's ensure we are visualizing the correct part or the full mask. + # If `mask` is the full-frame mask, and `mask_region` was just for feathering calculation, + # then we should use `mask` for display or a ROI from `mask`. + + # To make vis_frame part where mask is applied red (for example): + # vis_frame_roi = vis_frame[min_y:max_y, min_x:max_x] + # boolean_mask_roi = mask[min_y:max_y, min_x:max_x] > 127 # Assuming mask is full frame + # if vis_frame_roi.shape[:2] == boolean_mask_roi.shape: + # vis_frame_roi[boolean_mask_roi] = [0,0,255] # Red where mask is active + + # The existing feathering logic for visualization: + feather_amount = max(1, min(30, + (max_x - min_x) // modules.globals.mask_feather_ratio if (max_x - min_x) > 0 and modules.globals.mask_feather_ratio > 0 else 1, + (max_y - min_y) // modules.globals.mask_feather_ratio if (max_y - min_y) > 0 and modules.globals.mask_feather_ratio > 0 else 1 + )) + kernel_size = 2 * feather_amount + 1 + + # Assuming mask_region was correctly extracted for visualization purposes (e.g., a crop of the mask) + # If mask_region is intended to be the mask that was applied, its size should match the ROI. + if mask_region.size > 0 and mask_region.shape[0] == (max_y-min_y) and mask_region.shape[1] == (max_x-min_x): + feathered_mask_vis = cv2.GaussianBlur(mask_region.astype(float), (kernel_size, kernel_size), 0) + max_val = feathered_mask_vis.max() + if max_val > 0: feathered_mask_vis = (feathered_mask_vis / max_val * 255).astype(np.uint8) + else: feathered_mask_vis = np.zeros_like(mask_region, dtype=np.uint8) + + # Create a 3-channel version of the feathered mask for overlay if desired + # feathered_mask_vis_3ch = cv2.cvtColor(feathered_mask_vis, cv2.COLOR_GRAY2BGR) + # vis_frame_roi = vis_frame[min_y:max_y, min_x:max_x] + # blended_roi = cv2.addWeighted(vis_frame_roi, 0.7, feathered_mask_vis_3ch, 0.3, 0) + # vis_frame[min_y:max_y, min_x:max_x] = blended_roi + else: + # If mask_region is not what we expect, log or handle. + # For now, we'll skip drawing the feathered_mask part if dimensions mismatch. + logging.debug("Skipping feathered mask visualization part due to mask_region issues.") + + + cv2.putText(vis_frame, "Lower Mouth Mask (Polygon)", (min_x, min_y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + # cv2.putText(vis_frame, "Feathered Mask (Visualization)", (min_x, max_y + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) # Optional text + + return vis_frame def apply_mouth_area( @@ -470,63 +775,72 @@ def apply_mouth_area( face_mask: np.ndarray, mouth_polygon: np.ndarray, ) -> np.ndarray: + if mouth_polygon is None or mouth_cutout is None: + logging.debug("Skipping apply_mouth_area due to missing mouth_polygon or mouth_cutout.") + return frame + min_x, min_y, max_x, max_y = mouth_box box_width = max_x - min_x box_height = max_y - min_y - if ( - mouth_cutout is None - or box_width is None - or box_height is None - or face_mask is None - or mouth_polygon is None - ): + if box_width <= 0 or box_height <= 0 or face_mask is None: + logging.debug(f"Skipping apply_mouth_area due to invalid box dimensions or missing face_mask. 
W:{box_width} H:{box_height}") return frame try: - resized_mouth_cutout = cv2.resize(mouth_cutout, (box_width, box_height)) + # Ensure ROI is valid before attempting to access frame data + if min_y >= max_y or min_x >= max_x: + logging.warning(f"Invalid ROI for applying mouth area: min_x={min_x}, max_x={max_x}, min_y={min_y}, max_y={max_y}") + return frame + roi = frame[min_y:max_y, min_x:max_x] - if roi.shape != resized_mouth_cutout.shape: - resized_mouth_cutout = cv2.resize( - resized_mouth_cutout, (roi.shape[1], roi.shape[0]) - ) + # Resize mouth_cutout to match the ROI dimensions if they differ + if roi.shape[:2] != mouth_cutout.shape[:2]: + resized_mouth_cutout = cv2.resize(mouth_cutout, (roi.shape[1], roi.shape[0])) + else: + resized_mouth_cutout = mouth_cutout color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi) - # Use the provided mouth polygon to create the mask + # Create polygon_mask for the ROI polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8) adjusted_polygon = mouth_polygon - [min_x, min_y] - cv2.fillPoly(polygon_mask, [adjusted_polygon], 255) + cv2.fillPoly(polygon_mask, [adjusted_polygon.astype(np.int32)], 255) # Ensure polygon points are int32 - # Apply feathering to the polygon mask - feather_amount = min( - 30, - box_width // modules.globals.mask_feather_ratio, - box_height // modules.globals.mask_feather_ratio, - ) - feathered_mask = cv2.GaussianBlur( - polygon_mask.astype(float), (0, 0), feather_amount - ) - feathered_mask = feathered_mask / feathered_mask.max() + # Calculate feathering based on ROI dimensions + feather_amount = max(1, min(30, + roi.shape[1] // modules.globals.mask_feather_ratio if modules.globals.mask_feather_ratio > 0 else 30, + roi.shape[0] // modules.globals.mask_feather_ratio if modules.globals.mask_feather_ratio > 0 else 30 + )) + kernel_size_blur = 2 * feather_amount + 1 # Ensure it's odd + if kernel_size_blur <= 0: kernel_size_blur = 1 # Ensure positive + feathered_mask_float = cv2.GaussianBlur(polygon_mask.astype(float), (kernel_size_blur, kernel_size_blur), 0) + + max_val = feathered_mask_float.max() + feathered_mask_normalized = feathered_mask_float / max_val if max_val > 0 else feathered_mask_float + + # Ensure face_mask_roi matches dimensions of feathered_mask_normalized face_mask_roi = face_mask[min_y:max_y, min_x:max_x] - combined_mask = feathered_mask * (face_mask_roi / 255.0) + if face_mask_roi.shape != feathered_mask_normalized.shape: + face_mask_roi = cv2.resize(face_mask_roi, (feathered_mask_normalized.shape[1], feathered_mask_normalized.shape[0])) + logging.warning("Resized face_mask_roi to match feathered_mask_normalized in apply_mouth_area.") - combined_mask = combined_mask[:, :, np.newaxis] - blended = ( - color_corrected_mouth * combined_mask + roi * (1 - combined_mask) - ).astype(np.uint8) - # Apply face mask to blended result - face_mask_3channel = ( - np.repeat(face_mask_roi[:, :, np.newaxis], 3, axis=2) / 255.0 + combined_mask_float = feathered_mask_normalized * (face_mask_roi / 255.0) + combined_mask_3ch = combined_mask_float[:, :, np.newaxis] # Ensure broadcasting for 3 channels + + # Ensure all inputs to blending are float32 for precision, then convert back to uint8 + blended_float = ( + color_corrected_mouth.astype(np.float32) * combined_mask_3ch + + roi.astype(np.float32) * (1.0 - combined_mask_3ch) # Ensure 1.0 for float subtraction ) - final_blend = blended * face_mask_3channel + roi * (1 - face_mask_3channel) + blended = np.clip(blended_float, 0, 255).astype(np.uint8) - frame[min_y:max_y, 
min_x:max_x] = final_blend.astype(np.uint8) + frame[min_y:max_y, min_x:max_x] = blended except Exception as e: - pass + logging.error(f"Error in apply_mouth_area: {e}", exc_info=True) return frame @@ -534,89 +848,158 @@ def apply_mouth_area( def create_face_mask(face: Face, frame: Frame) -> np.ndarray: mask = np.zeros(frame.shape[:2], dtype=np.uint8) landmarks = face.landmark_2d_106 - if landmarks is not None: - # Convert landmarks to int32 - landmarks = landmarks.astype(np.int32) - # Extract facial features - right_side_face = landmarks[0:16] - left_side_face = landmarks[17:32] - right_eye = landmarks[33:42] - right_eye_brow = landmarks[43:51] - left_eye = landmarks[87:96] - left_eye_brow = landmarks[97:105] + if landmarks is None: + logging.debug("Face landmarks (landmark_2d_106) not available for face mask creation (likely tracked face). Using bbox as fallback.") + if face.bbox is not None: + x1, y1, x2, y2 = face.bbox.astype(int) + # Ensure coordinates are within frame boundaries + fh, fw = frame.shape[:2] + x1, y1 = max(0, x1), max(0, y1) + x2, y2 = min(fw - 1, x2), min(fh - 1, y2) + if x1 < x2 and y1 < y2: + center_x = (x1 + x2) // 2 + center_y = (y1 + y2) // 2 + width = x2 - x1 + height = y2 - y1 + cv2.ellipse(mask, (center_x, center_y), (int(width * 0.6), int(height * 0.7)), 0, 0, 360, 255, -1) + # Ensure kernel size is odd and positive for GaussianBlur + blur_kernel_size_face = (15,15) # Example, can be tuned + if blur_kernel_size_face[0] % 2 == 0: blur_kernel_size_face = (blur_kernel_size_face[0]+1, blur_kernel_size_face[1]) + if blur_kernel_size_face[1] % 2 == 0: blur_kernel_size_face = (blur_kernel_size_face[0], blur_kernel_size_face[1]+1) + if blur_kernel_size_face[0] <=0 : blur_kernel_size_face = (1, blur_kernel_size_face[1]) + if blur_kernel_size_face[1] <=0 : blur_kernel_size_face = (blur_kernel_size_face[0], 1) + mask = cv2.GaussianBlur(mask, blur_kernel_size_face, 5) + return mask - # Calculate forehead extension - right_eyebrow_top = np.min(right_eye_brow[:, 1]) - left_eyebrow_top = np.min(left_eye_brow[:, 1]) - eyebrow_top = min(right_eyebrow_top, left_eyebrow_top) + landmarks = landmarks.astype(np.int32) + right_side_face = landmarks[0:16] + left_side_face = landmarks[17:32] + right_eye_brow = landmarks[43:51] + left_eye_brow = landmarks[97:105] - face_top = np.min([right_side_face[0, 1], left_side_face[-1, 1]]) - forehead_height = face_top - eyebrow_top - extended_forehead_height = int(forehead_height * 5.0) # Extend by 50% + if right_eye_brow.size == 0 or left_eye_brow.size == 0 or right_side_face.size == 0 or left_side_face.size == 0 : + logging.warning("Face mask creation skipped due to empty landmark arrays for key features.") + if face.bbox is not None: + x1, y1, x2, y2 = face.bbox.astype(int) + cv2.rectangle(mask, (x1,y1), (x2,y2), 255, -1) + # Ensure kernel size is odd and positive for GaussianBlur + blur_kernel_size_face_fallback = (15,15) + if blur_kernel_size_face_fallback[0] % 2 == 0: blur_kernel_size_face_fallback = (blur_kernel_size_face_fallback[0]+1, blur_kernel_size_face_fallback[1]) + if blur_kernel_size_face_fallback[1] % 2 == 0: blur_kernel_size_face_fallback = (blur_kernel_size_face_fallback[0], blur_kernel_size_face_fallback[1]+1) + if blur_kernel_size_face_fallback[0] <=0 : blur_kernel_size_face_fallback = (1, blur_kernel_size_face_fallback[1]) + if blur_kernel_size_face_fallback[1] <=0 : blur_kernel_size_face_fallback = (blur_kernel_size_face_fallback[0], 1) + mask = cv2.GaussianBlur(mask, blur_kernel_size_face_fallback, 5) + return mask - # 
Create forehead points - forehead_left = right_side_face[0].copy() - forehead_right = left_side_face[-1].copy() - forehead_left[1] -= extended_forehead_height - forehead_right[1] -= extended_forehead_height + right_eyebrow_top = np.min(right_eye_brow[:, 1]) + left_eyebrow_top = np.min(left_eye_brow[:, 1]) + eyebrow_top = min(right_eyebrow_top, left_eyebrow_top) - # Combine all points to create the face outline - face_outline = np.vstack( - [ - [forehead_left], - right_side_face, - left_side_face[ - ::-1 - ], # Reverse left side to create a continuous outline - [forehead_right], - ] - ) + face_top = np.min([right_side_face[0, 1], left_side_face[-1, 1]]) + forehead_height = max(0, face_top - eyebrow_top) + extended_forehead_height = int(forehead_height * 5.0) - # Calculate padding - padding = int( - np.linalg.norm(right_side_face[0] - left_side_face[-1]) * 0.05 - ) # 5% of face width + forehead_left = right_side_face[0].copy() + forehead_right = left_side_face[-1].copy() - # Create a slightly larger convex hull for padding - hull = cv2.convexHull(face_outline) - hull_padded = [] - for point in hull: - x, y = point[0] - center = np.mean(face_outline, axis=0) - direction = np.array([x, y]) - center - direction = direction / np.linalg.norm(direction) - padded_point = np.array([x, y]) + direction * padding - hull_padded.append(padded_point) + forehead_left[1] = max(0, forehead_left[1] - extended_forehead_height) + forehead_right[1] = max(0, forehead_right[1] - extended_forehead_height) - hull_padded = np.array(hull_padded, dtype=np.int32) + face_outline = np.vstack( + [ + [forehead_left], right_side_face, left_side_face[::-1], [forehead_right], + ] + ) - # Fill the padded convex hull - cv2.fillConvexPoly(mask, hull_padded, 255) + if face_outline.shape[0] < 3 : + logging.warning("Not enough points for convex hull in face mask creation. 
Using bbox as fallback.") + if face.bbox is not None: + x1, y1, x2, y2 = face.bbox.astype(int) + cv2.rectangle(mask, (x1,y1), (x2,y2), 255, -1) + # Ensure kernel size is odd and positive for GaussianBlur + blur_kernel_size_face_hull_fallback = (15,15) + if blur_kernel_size_face_hull_fallback[0] % 2 == 0: blur_kernel_size_face_hull_fallback = (blur_kernel_size_face_hull_fallback[0]+1, blur_kernel_size_face_hull_fallback[1]) + if blur_kernel_size_face_hull_fallback[1] % 2 == 0: blur_kernel_size_face_hull_fallback = (blur_kernel_size_face_hull_fallback[0], blur_kernel_size_face_hull_fallback[1]+1) + if blur_kernel_size_face_hull_fallback[0] <=0 : blur_kernel_size_face_hull_fallback = (1, blur_kernel_size_face_hull_fallback[1]) + if blur_kernel_size_face_hull_fallback[1] <=0 : blur_kernel_size_face_hull_fallback = (blur_kernel_size_face_hull_fallback[0], 1) + mask = cv2.GaussianBlur(mask, blur_kernel_size_face_hull_fallback, 5) + return mask - # Smooth the mask edges - mask = cv2.GaussianBlur(mask, (5, 5), 3) + padding = int(np.linalg.norm(right_side_face[0] - left_side_face[-1]) * 0.05) + hull = cv2.convexHull(face_outline) + hull_padded = [] + center_of_outline = np.mean(face_outline, axis=0).squeeze() + if center_of_outline.ndim > 1: + center_of_outline = np.mean(center_of_outline, axis=0) # Ensure center_of_outline is 1D + + for point_contour in hull: + point = point_contour[0] + direction = point - center_of_outline + norm_direction = np.linalg.norm(direction) + if norm_direction == 0: unit_direction = np.array([0,0], dtype=float) # Ensure float for multiplication + else: unit_direction = direction / norm_direction + + padded_point = point + unit_direction * padding + hull_padded.append(padded_point) + + if hull_padded: + hull_padded_np = np.array(hull_padded, dtype=np.int32) + # cv2.fillConvexPoly expects a 2D array for points, or 3D with shape (N,1,2) + if hull_padded_np.ndim == 3 and hull_padded_np.shape[1] == 1: # Already (N,1,2) + cv2.fillConvexPoly(mask, hull_padded_np, 255) + elif hull_padded_np.ndim == 2: # Shape (N,2) + cv2.fillConvexPoly(mask, hull_padded_np[:, np.newaxis, :], 255) # Reshape to (N,1,2) + else: # Fallback if shape is unexpected + logging.warning("Unexpected shape for hull_padded in create_face_mask. 
Using raw hull.") + if hull.ndim == 2: hull = hull[:,np.newaxis,:] # Ensure hull is (N,1,2) + cv2.fillConvexPoly(mask, hull, 255) + else: + # Fallback to raw hull if hull_padded is empty for some reason + if hull.ndim == 2: hull = hull[:,np.newaxis,:] # Ensure hull is (N,1,2) + cv2.fillConvexPoly(mask, hull, 255) + + # Ensure kernel size is odd and positive for GaussianBlur + blur_kernel_size_face_final = (5,5) + if blur_kernel_size_face_final[0] % 2 == 0: blur_kernel_size_face_final = (blur_kernel_size_face_final[0]+1, blur_kernel_size_face_final[1]) + if blur_kernel_size_face_final[1] % 2 == 0: blur_kernel_size_face_final = (blur_kernel_size_face_final[0], blur_kernel_size_face_final[1]+1) + if blur_kernel_size_face_final[0] <=0 : blur_kernel_size_face_final = (1, blur_kernel_size_face_final[1]) + if blur_kernel_size_face_final[1] <=0 : blur_kernel_size_face_final = (blur_kernel_size_face_final[0], 1) + mask = cv2.GaussianBlur(mask, blur_kernel_size_face_final, 3) return mask def apply_color_transfer(source, target): - """ - Apply color transfer from target to source image - """ - source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32") - target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32") + # Ensure inputs are not empty + if source is None or source.size == 0 or target is None or target.size == 0: + logging.warning("Color transfer skipped due to empty source or target image.") + return source # Or target, depending on desired behavior for empty inputs - source_mean, source_std = cv2.meanStdDev(source) - target_mean, target_std = cv2.meanStdDev(target) + try: + source_lab = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32") + target_lab = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32") - # Reshape mean and std to be broadcastable - source_mean = source_mean.reshape(1, 1, 3) - source_std = source_std.reshape(1, 1, 3) - target_mean = target_mean.reshape(1, 1, 3) - target_std = target_std.reshape(1, 1, 3) + source_mean, source_std = cv2.meanStdDev(source_lab) + target_mean, target_std = cv2.meanStdDev(target_lab) - # Perform the color transfer - source = (source - source_mean) * (target_std / source_std) + target_mean + source_mean = source_mean.reshape((1, 1, 3)) + source_std = source_std.reshape((1, 1, 3)) + target_mean = target_mean.reshape((1, 1, 3)) + target_std = target_std.reshape((1, 1, 3)) - return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR) + # Avoid division by zero if source_std is zero + source_std[source_std == 0] = 1e-6 # A small epsilon instead of 1 to avoid large scaling if target_std is also small + + adjusted_lab = (source_lab - source_mean) * (target_std / source_std) + target_mean + adjusted_lab = np.clip(adjusted_lab, 0, 255) # Clip values to be within valid range for LAB + + result_bgr = cv2.cvtColor(adjusted_lab.astype("uint8"), cv2.COLOR_LAB2BGR) + except cv2.error as e: + logging.error(f"OpenCV error in apply_color_transfer: {e}", exc_info=True) + return source # Return original source on error + except Exception as e: + logging.error(f"Unexpected error in apply_color_transfer: {e}", exc_info=True) + return source # Return original source on error + + return result_bgr diff --git a/modules/ui.py b/modules/ui.py index ce599d6..3ed737a 100644 --- a/modules/ui.py +++ b/modules/ui.py @@ -19,6 +19,7 @@ from modules.face_analyser import ( ) from modules.capturer import get_video_frame, get_video_frame_total from modules.processors.frame.core import get_frame_processors_modules +from 
modules.processors.frame.face_swapper import reset_tracker_state # Added import from modules.utilities import ( is_image, is_video, @@ -105,6 +106,7 @@ def save_switch_states(): "show_fps": modules.globals.show_fps, "mouth_mask": modules.globals.mouth_mask, "show_mouth_mask_box": modules.globals.show_mouth_mask_box, + "enable_hair_swapping": modules.globals.enable_hair_swapping, } with open("switch_states.json", "w") as f: json.dump(switch_states, f) @@ -129,6 +131,9 @@ def load_switch_states(): modules.globals.show_mouth_mask_box = switch_states.get( "show_mouth_mask_box", False ) + modules.globals.enable_hair_swapping = switch_states.get( + "enable_hair_swapping", True # Default to True if not found + ) except FileNotFoundError: # If the file doesn't exist, use default values pass @@ -236,6 +241,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C command=lambda: ( setattr(modules.globals, "many_faces", many_faces_value.get()), save_switch_states(), + reset_tracker_state() # Added reset call ), ) many_faces_switch.place(relx=0.6, rely=0.65) @@ -253,10 +259,6 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C ) color_correction_switch.place(relx=0.6, rely=0.70) - # nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw_filter) - # nsfw_switch = ctk.CTkSwitch(root, text='NSFW filter', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw_filter', nsfw_value.get())) - # nsfw_switch.place(relx=0.6, rely=0.7) - map_faces = ctk.BooleanVar(value=modules.globals.map_faces) map_faces_switch = ctk.CTkSwitch( root, @@ -266,7 +268,8 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C command=lambda: ( setattr(modules.globals, "map_faces", map_faces.get()), save_switch_states(), - close_mapper_window() if not map_faces.get() else None + close_mapper_window() if not map_faces.get() else None, + reset_tracker_state() # Added reset call ), ) map_faces_switch.place(relx=0.1, rely=0.75) @@ -284,6 +287,19 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C ) show_fps_switch.place(relx=0.6, rely=0.75) + hair_swapping_value = ctk.BooleanVar(value=modules.globals.enable_hair_swapping) + hair_swapping_switch = ctk.CTkSwitch( + root, + text=_("Swap Hair"), + variable=hair_swapping_value, + cursor="hand2", + command=lambda: ( + setattr(modules.globals, "enable_hair_swapping", hair_swapping_value.get()), + save_switch_states(), + ) + ) + hair_swapping_switch.place(relx=0.6, rely=0.80) + mouth_mask_var = ctk.BooleanVar(value=modules.globals.mouth_mask) mouth_mask_switch = ctk.CTkSwitch( root, @@ -309,21 +325,20 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C start_button = ctk.CTkButton( root, text=_("Start"), cursor="hand2", command=lambda: analyze_target(start, root) ) - start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05) + start_button.place(relx=0.15, rely=0.85, relwidth=0.2, relheight=0.05) stop_button = ctk.CTkButton( root, text=_("Destroy"), cursor="hand2", command=lambda: destroy() ) - stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05) + stop_button.place(relx=0.4, rely=0.85, relwidth=0.2, relheight=0.05) preview_button = ctk.CTkButton( root, text=_("Preview"), cursor="hand2", command=lambda: toggle_preview() ) - preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05) + preview_button.place(relx=0.65, rely=0.85, relwidth=0.2, relheight=0.05) - # --- Camera 
Selection --- camera_label = ctk.CTkLabel(root, text=_("Select Camera:")) - camera_label.place(relx=0.1, rely=0.86, relwidth=0.2, relheight=0.05) + camera_label.place(relx=0.1, rely=0.91, relwidth=0.2, relheight=0.05) available_cameras = get_available_cameras() camera_indices, camera_names = available_cameras @@ -342,7 +357,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C root, variable=camera_variable, values=camera_names ) - camera_optionmenu.place(relx=0.35, rely=0.86, relwidth=0.25, relheight=0.05) + camera_optionmenu.place(relx=0.35, rely=0.91, relwidth=0.25, relheight=0.05) live_button = ctk.CTkButton( root, @@ -362,16 +377,15 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C else "disabled" ), ) - live_button.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05) - # --- End Camera Selection --- + live_button.place(relx=0.65, rely=0.91, relwidth=0.2, relheight=0.05) status_label = ctk.CTkLabel(root, text=None, justify="center") - status_label.place(relx=0.1, rely=0.9, relwidth=0.8) + status_label.place(relx=0.1, rely=0.96, relwidth=0.8) donate_label = ctk.CTkLabel( root, text="Deep Live Cam", justify="center", cursor="hand2" ) - donate_label.place(relx=0.1, rely=0.95, relwidth=0.8) + donate_label.place(relx=0.1, rely=0.99, relwidth=0.8) donate_label.configure( text_color=ctk.ThemeManager.theme.get("URL").get("text_color") ) @@ -593,9 +607,11 @@ def select_source_path() -> None: RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path) image = render_image_preview(modules.globals.source_path, (200, 200)) source_label.configure(image=image) + reset_tracker_state() # Added reset call else: modules.globals.source_path = None source_label.configure(image=None) + reset_tracker_state() # Added reset call even if source is cleared def swap_faces_paths() -> None: @@ -880,12 +896,96 @@ def create_webcam_preview(camera_index: int): PREVIEW.deiconify() frame_processors = get_frame_processors_modules(modules.globals.frame_processors) - source_image = None + + # --- Source Image Loading and Validation (Moved before the loop) --- + source_face_obj_for_cam = None + source_frame_full_for_cam = None + source_frame_full_for_cam_map_faces = None + + if not modules.globals.map_faces: + if not modules.globals.source_path: + update_status("Error: No source image selected for webcam mode.") + cap.release() + PREVIEW.withdraw() + while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists(): + ROOT.update_idletasks() + ROOT.update() + time.sleep(0.05) + return + if not os.path.exists(modules.globals.source_path): + update_status(f"Error: Source image not found at {modules.globals.source_path}") + cap.release() + PREVIEW.withdraw() + while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists(): + ROOT.update_idletasks() + ROOT.update() + time.sleep(0.05) + return + + source_frame_full_for_cam = cv2.imread(modules.globals.source_path) + if source_frame_full_for_cam is None: + update_status(f"Error: Could not read source image at {modules.globals.source_path}") + cap.release() + PREVIEW.withdraw() + while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists(): + ROOT.update_idletasks() + ROOT.update() + time.sleep(0.05) + return + + source_face_obj_for_cam = get_one_face(source_frame_full_for_cam) + if source_face_obj_for_cam is None: + update_status(f"Error: No face detected in source image {modules.globals.source_path}") + cap.release() + PREVIEW.withdraw() + while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists(): + 
ROOT.update_idletasks() + ROOT.update() + time.sleep(0.05) + return + else: # modules.globals.map_faces is True + if not modules.globals.source_path: + update_status("Error: No global source image selected (for hair/background in map_faces mode).") + cap.release() + PREVIEW.withdraw() + while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists(): + ROOT.update_idletasks() + ROOT.update() + time.sleep(0.05) + return + if not os.path.exists(modules.globals.source_path): + update_status(f"Error: Source image (for hair/background) not found at {modules.globals.source_path}") + cap.release() + PREVIEW.withdraw() + while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists(): + ROOT.update_idletasks() + ROOT.update() + time.sleep(0.05) + return + + source_frame_full_for_cam_map_faces = cv2.imread(modules.globals.source_path) + if source_frame_full_for_cam_map_faces is None: + update_status(f"Error: Could not read source image (for hair/background) at {modules.globals.source_path}") + cap.release() + PREVIEW.withdraw() + while PREVIEW.state() != "withdrawn" and ROOT.winfo_exists(): + ROOT.update_idletasks() + ROOT.update() + time.sleep(0.05) + return + + if not modules.globals.source_target_map and not modules.globals.simple_map: + update_status("Warning: No face map defined for map_faces mode. Swapper may not work as expected.") + + # --- End Source Image Loading --- + prev_time = time.time() fps_update_interval = 0.5 frame_count = 0 fps = 0 + reset_tracker_state() # Ensure tracker is reset before starting webcam loop + while True: ret, frame = cap.read() if not ret: @@ -900,32 +1000,32 @@ def create_webcam_preview(camera_index: int): temp_frame = fit_image_to_size( temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() ) - else: temp_frame = fit_image_to_size( temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() ) + # REMOVED: detection_frame_counter += 1 + # REMOVED: if detection_frame_counter % DETECTION_INTERVAL == 0: + # The following block is now unindented to run every frame if not modules.globals.map_faces: - if source_image is None and modules.globals.source_path: - source_image = get_one_face(cv2.imread(modules.globals.source_path)) - - for frame_processor in frame_processors: - if frame_processor.NAME == "DLC.FACE-ENHANCER": - if modules.globals.fp_ui["face_enhancer"]: - temp_frame = frame_processor.process_frame(None, temp_frame) - else: - temp_frame = frame_processor.process_frame(source_image, temp_frame) + if source_face_obj_for_cam is not None and source_frame_full_for_cam is not None: + for frame_processor in frame_processors: + if frame_processor.NAME == "DLC.FACE-ENHANCER": + if modules.globals.fp_ui["face_enhancer"]: + temp_frame = frame_processor.process_frame(None, temp_frame) + else: + temp_frame = frame_processor.process_frame(source_face_obj_for_cam, source_frame_full_for_cam, temp_frame) else: - modules.globals.target_path = None - for frame_processor in frame_processors: - if frame_processor.NAME == "DLC.FACE-ENHANCER": - if modules.globals.fp_ui["face_enhancer"]: - temp_frame = frame_processor.process_frame_v2(temp_frame) - else: - temp_frame = frame_processor.process_frame_v2(temp_frame) + if source_frame_full_for_cam_map_faces is not None: + modules.globals.target_path = None + for frame_processor in frame_processors: + if frame_processor.NAME == "DLC.FACE-ENHANCER": + if modules.globals.fp_ui["face_enhancer"]: + temp_frame = frame_processor.process_frame_v2(temp_frame) + else: + temp_frame = 
frame_processor.process_frame_v2(source_frame_full_for_cam_map_faces, temp_frame) - # Calculate and display FPS current_time = time.time() frame_count += 1 if current_time - prev_time >= fps_update_interval: diff --git a/modules/video_capture.py b/modules/video_capture.py index cab223d..cb101fc 100644 --- a/modules/video_capture.py +++ b/modules/video_capture.py @@ -50,7 +50,16 @@ class VideoCapturer: continue else: # Unix-like systems (Linux/Mac) capture method - self.cap = cv2.VideoCapture(self.device_index) + if platform.system() == "Darwin": # macOS + print(f"INFO: macOS detected. Attempting to use cv2.CAP_AVFOUNDATION exclusively for camera index {self.device_index}.") + self.cap = cv2.VideoCapture(self.device_index, cv2.CAP_AVFOUNDATION) + # The check 'if not self.cap or not self.cap.isOpened():' later in the function + # will now directly reflect the success or failure of AVFoundation. + if not self.cap or not self.cap.isOpened(): + print(f"ERROR: cv2.CAP_AVFOUNDATION failed to open camera index {self.device_index}. Capture will likely fail.") + # No fallback to default cv2.VideoCapture(self.device_index) here for macOS. + else: # Other Unix-like systems (e.g., Linux) + self.cap = cv2.VideoCapture(self.device_index) if not self.cap or not self.cap.isOpened(): raise RuntimeError("Failed to open camera") diff --git a/requirements.txt b/requirements.txt index 6d9f8b8..47f42bf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ numpy>=1.23.5,<2 typing-extensions>=4.8.0 -opencv-python==4.10.0.84 +opencv-contrib-python==4.10.0.84 cv2_enumerate_cameras==1.1.15 onnx==1.16.0 insightface==0.7.3 @@ -19,3 +19,4 @@ onnxruntime-gpu==1.17; sys_platform != 'darwin' tensorflow; sys_platform != 'darwin' opennsfw2==0.10.2 protobuf==4.23.2 +transformers>=4.0.0 diff --git a/run-cuda.bat b/run-cuda.bat index 93042a7..a638b99 100644 --- a/run-cuda.bat +++ b/run-cuda.bat @@ -1 +1,16 @@ -python run.py --execution-provider cuda +@echo off +set VENV_DIR=.venv + +:: Check if virtual environment exists +if not exist "%VENV_DIR%\Scripts\activate.bat" ( + echo Virtual environment '%VENV_DIR%' not found. + echo Please run setup_windows.bat first. + pause + exit /b 1 +) + +echo Activating virtual environment... +call "%VENV_DIR%\Scripts\activate.bat" + +echo Starting the application with CUDA execution provider... +python run.py --execution-provider cuda %* diff --git a/run-directml.bat b/run-directml.bat index 038e958..90a4b18 100644 --- a/run-directml.bat +++ b/run-directml.bat @@ -1 +1,16 @@ -python run.py --execution-provider dml +@echo off +set VENV_DIR=.venv + +:: Check if virtual environment exists +if not exist "%VENV_DIR%\Scripts\activate.bat" ( + echo Virtual environment '%VENV_DIR%' not found. + echo Please run setup_windows.bat first. + pause + exit /b 1 +) + +echo Activating virtual environment... +call "%VENV_DIR%\Scripts\activate.bat" + +echo Starting the application with DirectML execution provider... +python run.py --execution-provider dml %* diff --git a/run_mac.sh b/run_mac.sh new file mode 100644 index 0000000..8216b79 --- /dev/null +++ b/run_mac.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +VENV_DIR=".venv" + +# Check if virtual environment exists +if [ ! -d "$VENV_DIR" ]; then + echo "Virtual environment '$VENV_DIR' not found." + echo "Please run ./setup_mac.sh first to create the environment and install dependencies." + exit 1 +fi + +echo "Activating virtual environment..." +source "$VENV_DIR/bin/activate" + +echo "Starting the application with CPU execution provider..." 
+# Passes all arguments passed to this script (e.g., --source, --target) to run.py +python3 run.py --execution-provider cpu "$@" + +# Deactivate after script finishes (optional, as shell context closes) +# deactivate diff --git a/run_mac_coreml.sh b/run_mac_coreml.sh new file mode 100644 index 0000000..edea336 --- /dev/null +++ b/run_mac_coreml.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +VENV_DIR=".venv" + +if [ ! -d "$VENV_DIR" ]; then + echo "Virtual environment '$VENV_DIR' not found." + echo "Please run ./setup_mac.sh first." + exit 1 +fi + +source "$VENV_DIR/bin/activate" +echo "Starting the application with CoreML execution provider..." +python3 run.py --execution-provider coreml "$@" diff --git a/run_mac_cpu.sh b/run_mac_cpu.sh new file mode 100644 index 0000000..75c5107 --- /dev/null +++ b/run_mac_cpu.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +VENV_DIR=".venv" + +if [ ! -d "$VENV_DIR" ]; then + echo "Virtual environment '$VENV_DIR' not found." + echo "Please run ./setup_mac.sh first." + exit 1 +fi + +source "$VENV_DIR/bin/activate" +echo "Starting the application with CPU execution provider..." +python3 run.py --execution-provider cpu "$@" diff --git a/run_mac_mps.sh b/run_mac_mps.sh new file mode 100644 index 0000000..bc2b363 --- /dev/null +++ b/run_mac_mps.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +VENV_DIR=".venv" + +if [ ! -d "$VENV_DIR" ]; then + echo "Virtual environment '$VENV_DIR' not found." + echo "Please run ./setup_mac.sh first." + exit 1 +fi + +source "$VENV_DIR/bin/activate" +echo "Starting the application with MPS execution provider (for Apple Silicon)..." +python3 run.py --execution-provider mps "$@" diff --git a/run_windows.bat b/run_windows.bat new file mode 100644 index 0000000..3894226 --- /dev/null +++ b/run_windows.bat @@ -0,0 +1,20 @@ +@echo off +set VENV_DIR=.venv + +:: Check if virtual environment exists +if not exist "%VENV_DIR%\Scripts\activate.bat" ( + echo Virtual environment '%VENV_DIR%' not found. + echo Please run setup_windows.bat first to create the environment and install dependencies. + pause + exit /b 1 +) + +echo Activating virtual environment... +call "%VENV_DIR%\Scripts\activate.bat" + +echo Starting the application with CPU execution provider... +:: Passes all arguments passed to this script to run.py +python run.py --execution-provider cpu %* + +:: Optional: Deactivate after script finishes +:: call deactivate diff --git a/setup_mac.sh b/setup_mac.sh new file mode 100644 index 0000000..f9effb5 --- /dev/null +++ b/setup_mac.sh @@ -0,0 +1,81 @@ +#!/usr/bin/env bash + +# Exit immediately if a command exits with a non-zero status. +set -e + +echo "Starting macOS setup..." + +# 1. Check for Python 3 +echo "Checking for Python 3..." +if ! command -v python3 &> /dev/null +then + echo "Python 3 could not be found. Please install Python 3." + echo "You can often install it using Homebrew: brew install python" + exit 1 +fi + +# 2. Check Python version (>= 3.9) +echo "Checking Python 3 version..." +# Run the check inside 'if' so 'set -e' does not abort the script before the message below can print. +if ! python3 -c 'import sys; sys.exit(0 if sys.version_info >= (3,9) else 1)'; then + echo "Python 3.9 or higher is required." + echo "Your version is: $(python3 --version)" + echo "Please upgrade your Python version. Consider using pyenv or Homebrew to manage Python versions." + exit 1 +fi + echo "Python 3.9+ found: $(python3 --version)" + +# 3. Check for ffmpeg +echo "Checking for ffmpeg..." +if ! command -v ffmpeg &> /dev/null +then + echo "WARNING: ffmpeg could not be found. This program requires ffmpeg for video processing."
+ echo "You can install it using Homebrew: brew install ffmpeg" + echo "Continuing with setup, but video processing might fail later." +else + echo "ffmpeg found: $(ffmpeg -version | head -n 1)" +fi + +# 4. Define virtual environment directory +VENV_DIR=".venv" + +# 5. Create virtual environment +if [ -d "$VENV_DIR" ]; then + echo "Virtual environment '$VENV_DIR' already exists. Skipping creation." +else + echo "Creating virtual environment in '$VENV_DIR'..." + python3 -m venv "$VENV_DIR" +fi + +# 6. Activate virtual environment (for this script's session) +echo "Activating virtual environment..." +source "$VENV_DIR/bin/activate" + +# 7. Upgrade pip +echo "Upgrading pip..." +pip install --upgrade pip + +# 8. Install requirements +echo "Installing requirements from requirements.txt..." +if [ -f "requirements.txt" ]; then + pip install -r requirements.txt +else + echo "ERROR: requirements.txt not found. Cannot install dependencies." + # Deactivate on error if desired, or leave active for user to debug + # deactivate + exit 1 +fi + +echo "" +echo "Setup complete!" +echo "" +echo "To activate the virtual environment in your terminal, run:" +echo " source $VENV_DIR/bin/activate" +echo "" +echo "After activating, you can run the application using:" +echo " python3 run.py [arguments]" +echo "Or use one of the run_mac_*.sh scripts (e.g., ./run_mac_cpu.sh)." +echo "" + +# Deactivate at the end of the script's execution (optional, as script session ends) +# deactivate diff --git a/setup_windows.bat b/setup_windows.bat new file mode 100644 index 0000000..9dec25b --- /dev/null +++ b/setup_windows.bat @@ -0,0 +1,80 @@ +@echo off +echo Starting Windows setup... + +:: 1. Check for Python +echo Checking for Python... +python --version >nul 2>&1 +if errorlevel 1 ( + echo Python could not be found in your PATH. + echo Please install Python 3 ^(3.10 or higher recommended^) and ensure it's added to your PATH. + echo You can download Python from https://www.python.org/downloads/ + pause + exit /b 1 +) + +:: Optional: Check Python version (e.g., >= 3.9 or >=3.10). +:: This is a bit more complex in pure batch. For now, rely on user having a modern Python 3. +:: The README will recommend 3.10. +:: If we reach here, Python is found. +echo Python was found. Attempting to display version: +for /f "delims=" %%i in ('python --version 2^>^&1') do echo %%i + +:: 2. Check for ffmpeg (informational) +echo Checking for ffmpeg... +ffmpeg -version >nul 2>&1 +if errorlevel 1 ( + echo WARNING: ffmpeg could not be found in your PATH. This program requires ffmpeg for video processing. + echo Please download ffmpeg from https://ffmpeg.org/download.html and add it to your system's PATH. + echo ^(The README.md contains a link for a potentially easier ffmpeg install method using a PowerShell command^) + echo Continuing with setup, but video processing might fail later. + pause +) else ( + echo ffmpeg found. +) + +:: 3. Define virtual environment directory +set VENV_DIR=.venv + +:: 4. Create virtual environment +if exist "%VENV_DIR%\Scripts\activate.bat" ( + echo Virtual environment '%VENV_DIR%' already exists. Skipping creation. +) else ( + echo Creating virtual environment in '%VENV_DIR%'... + python -m venv "%VENV_DIR%" + if errorlevel 1 ( + echo Failed to create virtual environment. Please check your Python installation. + pause + exit /b 1 + ) +) + +:: 5. Activate virtual environment (for this script's session) +echo Activating virtual environment... +call "%VENV_DIR%\Scripts\activate.bat" + +:: 6. Upgrade pip +echo Upgrading pip...
+python -m pip install --upgrade pip + +:: 7. Install requirements +echo Installing requirements from requirements.txt... +if exist "requirements.txt" ( + python -m pip install -r requirements.txt +) else ( + echo ERROR: requirements.txt not found. Cannot install dependencies. + pause + exit /b 1 +) + +echo. +echo Setup complete! +echo. +echo To activate the virtual environment in your command prompt, run: +echo %VENV_DIR%\Scripts\activate.bat +echo. +echo After activating, you can run the application using: +echo python run.py [arguments] +echo Or use one of the run-*.bat scripts (e.g., run-cuda.bat, run_windows.bat). +echo. +pause +exit /b 0
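Note on `reset_tracker_state()`: the `modules/ui.py` hunk above imports this helper from `modules.processors.frame.face_swapper` and calls it whenever the source image or the `many_faces`/`map_faces` switches change, but its definition is not shown in this part of the diff. The sketch below is only an illustration of what such a reset is assumed to do (clear module-level tracking state so the next webcam frame runs a fresh detection); the variable names are hypothetical and the real implementation in `face_swapper.py` may track different data.

```python
# Illustrative sketch only -- not the implementation from this diff.
# Assumes the face swapper keeps simple module-level tracking state.
import logging
import threading

_tracker_lock = threading.Lock()
_last_tracked_face = None       # most recent tracked face (hypothetical name)
_frames_since_detection = 0     # frames processed since the last full detection (hypothetical name)


def reset_tracker_state() -> None:
    """Drop cached tracking data so the next frame triggers a full re-detection."""
    global _last_tracked_face, _frames_since_detection
    with _tracker_lock:
        _last_tracked_face = None
        _frames_since_detection = 0
    logging.debug("Face tracker state reset; next frame will re-detect faces.")
```

Clearing the cache at every source or mode change, as the UI hunks above do, is what prevents a stale track from the previous source image or mapping mode from leaking into the new session.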