Compare commits
	
		
			7 Commits 
		
	
	
		
			39e5bf7f00
			...
			c08ffa88c5
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | c08ffa88c5 | |
|  | 5db23597e9 | |
|  | 84ae5810bf | |
|  | ebc30b1cac | |
|  | 0e6d821102 | |
|  | 6f635ab7c4 | |
|  | 74ce8569f5 | 
|  | @ -2,6 +2,7 @@ import os | ||||||
| import shutil | import shutil | ||||||
| from typing import Any | from typing import Any | ||||||
| import insightface | import insightface | ||||||
|  | import logging # Added logging import | ||||||
| 
 | 
 | ||||||
| import cv2 | import cv2 | ||||||
| import numpy as np | import numpy as np | ||||||
|  | @ -25,18 +26,27 @@ def get_face_analyser() -> Any: | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_one_face(frame: Frame) -> Any: | def get_one_face(frame: Frame) -> Any: | ||||||
|     face = get_face_analyser().get(frame) |     faces = get_face_analyser().get(frame) | ||||||
|  |     if not faces: | ||||||
|  |         logging.debug("Face_analyser: get_one_face: No faces found by insightface.") | ||||||
|  |         return None | ||||||
|     try: |     try: | ||||||
|         return min(face, key=lambda x: x.bbox[0]) |         return min(faces, key=lambda x: x.bbox[0]) | ||||||
|     except ValueError: |     except ValueError: | ||||||
|  |         logging.debug("Face_analyser: get_one_face: ValueError, likely no faces after all.") | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_many_faces(frame: Frame) -> Any: | def get_many_faces(frame: Frame) -> Any: | ||||||
|     try: |     faces = get_face_analyser().get(frame) | ||||||
|         return get_face_analyser().get(frame) |     if not faces: # Check if faces is None or an empty list | ||||||
|     except IndexError: |         logging.debug("Face_analyser: get_many_faces: No faces found by insightface.") | ||||||
|         return None |         # Depending on what insightface returns for no faces, | ||||||
|  |         # you might return None or an empty list. | ||||||
|  |         # If .get() returns an empty list for no faces, this check is sufficient. | ||||||
|  |         # If .get() returns None, this is also fine. | ||||||
|  |         return faces # Return original (None or empty list) | ||||||
|  |     return faces | ||||||
| 
 | 
 | ||||||
| def has_valid_map() -> bool: | def has_valid_map() -> bool: | ||||||
|     for map in modules.globals.source_target_map: |     for map in modules.globals.source_target_map: | ||||||
|  |  | ||||||
|  | @ -82,7 +82,8 @@ def get_face_enhancer() -> Any: | ||||||
|                 selected_device = torch.device("cpu") |                 selected_device = torch.device("cpu") | ||||||
|                 device_priority.append("CPU") |                 device_priority.append("CPU") | ||||||
|              |              | ||||||
|             FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1, device=selected_device) |             upscale_factor = getattr(modules.globals, 'gfpgan_upscale_factor', 2) | ||||||
|  |             FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=upscale_factor, device=selected_device) | ||||||
| 
 | 
 | ||||||
|             # for debug: |             # for debug: | ||||||
|             print(f"Selected device: {selected_device} and device priority: {device_priority}") |             print(f"Selected device: {selected_device} and device priority: {device_priority}") | ||||||
|  |  | ||||||
|  | @ -21,6 +21,16 @@ FACE_SWAPPER = None | ||||||
| THREAD_LOCK = threading.Lock() | THREAD_LOCK = threading.Lock() | ||||||
| NAME = "DLC.FACE-SWAPPER" | NAME = "DLC.FACE-SWAPPER" | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | def _validate_kernel_size(kernel_tuple, default_kernel_tuple): | ||||||
|  |     if isinstance(kernel_tuple, tuple) and len(kernel_tuple) == 2 and \ | ||||||
|  |        isinstance(kernel_tuple[0], int) and kernel_tuple[0] > 0 and kernel_tuple[0] % 2 == 1 and \ | ||||||
|  |        isinstance(kernel_tuple[1], int) and kernel_tuple[1] > 0 and kernel_tuple[1] % 2 == 1: | ||||||
|  |         return kernel_tuple | ||||||
|  |     else: | ||||||
|  |         logging.warning(f"Invalid kernel size {kernel_tuple} received. Must be a tuple of two positive odd integers. Falling back to default {default_kernel_tuple}.") | ||||||
|  |         return default_kernel_tuple | ||||||
|  | 
 | ||||||
| abs_dir = os.path.dirname(os.path.abspath(__file__)) | abs_dir = os.path.dirname(os.path.abspath(__file__)) | ||||||
| models_dir = os.path.join( | models_dir = os.path.join( | ||||||
|     os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models" |     os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models" | ||||||
|  | @ -32,7 +42,7 @@ def pre_check() -> bool: | ||||||
|     conditional_download( |     conditional_download( | ||||||
|         download_directory_path, |         download_directory_path, | ||||||
|         [ |         [ | ||||||
|             "https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx" |             "https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128.onnx" | ||||||
|         ], |         ], | ||||||
|     ) |     ) | ||||||
|     return True |     return True | ||||||
|  | @ -60,7 +70,7 @@ def get_face_swapper() -> Any: | ||||||
| 
 | 
 | ||||||
|     with THREAD_LOCK: |     with THREAD_LOCK: | ||||||
|         if FACE_SWAPPER is None: |         if FACE_SWAPPER is None: | ||||||
|             model_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") |             model_path = os.path.join(models_dir, "inswapper_128.onnx") | ||||||
|             FACE_SWAPPER = insightface.model_zoo.get_model( |             FACE_SWAPPER = insightface.model_zoo.get_model( | ||||||
|                 model_path, providers=modules.globals.execution_providers |                 model_path, providers=modules.globals.execution_providers | ||||||
|             ) |             ) | ||||||
|  | @ -70,18 +80,38 @@ def get_face_swapper() -> Any: | ||||||
| def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: | def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: | ||||||
|     face_swapper = get_face_swapper() |     face_swapper = get_face_swapper() | ||||||
| 
 | 
 | ||||||
|     # Apply the face swap |     # Statistical color correction | ||||||
|     swapped_frame = face_swapper.get( |     if getattr(modules.globals, 'statistical_color_correction', True) and target_face.bbox is not None: | ||||||
|         temp_frame, target_face, source_face, paste_back=True |         x1, y1, x2, y2 = target_face.bbox.astype(int) | ||||||
|     ) |         original_target_face_roi = temp_frame[y1:y2, x1:x2].copy() | ||||||
|  | 
 | ||||||
|  |         # Apply the face swap | ||||||
|  |         swapped_frame = face_swapper.get( | ||||||
|  |             temp_frame, target_face, source_face, paste_back=True | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         if original_target_face_roi.size > 0: | ||||||
|  |             swapped_face_roi = swapped_frame[y1:y2, x1:x2].copy() | ||||||
|  |             if swapped_face_roi.size > 0: | ||||||
|  |                 try: | ||||||
|  |                     corrected_swapped_face_roi = apply_color_transfer(swapped_face_roi, original_target_face_roi) | ||||||
|  |                     swapped_frame[y1:y2, x1:x2] = corrected_swapped_face_roi | ||||||
|  |                 except Exception as e: | ||||||
|  |                     logging.error(f"Failed to apply statistical color transfer: {e}. Using original swapped ROI.") | ||||||
|  |                     # swapped_frame already contains the uncorrected swapped_face_roi in this region | ||||||
|  |     else: | ||||||
|  |         # Apply the face swap without statistical color correction | ||||||
|  |         swapped_frame = face_swapper.get( | ||||||
|  |             temp_frame, target_face, source_face, paste_back=True | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|     if modules.globals.mouth_mask: |     if modules.globals.mouth_mask: | ||||||
|         # Create a mask for the target face |         # Create a mask for the target face | ||||||
|         face_mask = create_face_mask(target_face, temp_frame) |         face_mask = create_face_mask(target_face, swapped_frame) # Use swapped_frame here | ||||||
| 
 | 
 | ||||||
|         # Create the mouth mask |         # Create the mouth mask | ||||||
|         mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = ( |         mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = ( | ||||||
|             create_lower_mouth_mask(target_face, temp_frame) |             create_lower_mouth_mask(target_face, swapped_frame) # Use swapped_frame here | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         # Apply the mouth area |         # Apply the mouth area | ||||||
|  | @ -106,16 +136,26 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: | ||||||
|         many_faces = get_many_faces(temp_frame) |         many_faces = get_many_faces(temp_frame) | ||||||
|         if many_faces: |         if many_faces: | ||||||
|             for target_face in many_faces: |             for target_face in many_faces: | ||||||
|                 if source_face and target_face: |                 if source_face and target_face: # target_face from many_faces will always be valid here | ||||||
|                     temp_frame = swap_face(source_face, target_face, temp_frame) |                     temp_frame = swap_face(source_face, target_face, temp_frame) | ||||||
|                 else: |                 elif not source_face: # Check source_face specifically | ||||||
|                     print("Face detection failed for target/source.") |                     logging.error("Source face is not available or no face detected in source image. Skipping swap for this target face.") | ||||||
|  |                     # Optionally `continue` or `break` if source_face is essential for all | ||||||
|  |         elif not source_face : # if many_faces is empty AND source_face is also an issue | ||||||
|  |              logging.error("Source face is not available AND no faces detected in target frame.") | ||||||
|  |         else: # many_faces is empty, but source_face is ok | ||||||
|  |              logging.info(f"No faces detected in the current target frame for 'many_faces' mode.") | ||||||
|     else: |     else: | ||||||
|         target_face = get_one_face(temp_frame) |         target_face = get_one_face(temp_frame) | ||||||
|         if target_face and source_face: |         if target_face and source_face: | ||||||
|             temp_frame = swap_face(source_face, target_face, temp_frame) |             temp_frame = swap_face(source_face, target_face, temp_frame) | ||||||
|         else: |         else: | ||||||
|             logging.error("Face detection failed for target or source.") |             if not source_face: | ||||||
|  |                 logging.error("Source face is not available or no face detected in source image.") | ||||||
|  |             elif not target_face: | ||||||
|  |                 logging.error(f"No face detected in the current target frame.") | ||||||
|  |             else: # Should not happen if logic is right, but as a fallback | ||||||
|  |                 logging.error("Face detection failed for an unknown reason concerning target or source.") | ||||||
|     return temp_frame |     return temp_frame | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -367,7 +407,12 @@ def create_lower_mouth_mask( | ||||||
|         cv2.fillPoly(mask_roi, [expanded_landmarks - [min_x, min_y]], 255) |         cv2.fillPoly(mask_roi, [expanded_landmarks - [min_x, min_y]], 255) | ||||||
| 
 | 
 | ||||||
|         # Apply Gaussian blur to soften the mask edges |         # Apply Gaussian blur to soften the mask edges | ||||||
|         mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5) |         # Default kernel size for mouth mask blur is (9,9) as a balance between performance and smoothing. | ||||||
|  |         # Larger values (e.g., (15,15) - the previous hardcoded value) provide more smoothing but are slower. | ||||||
|  |         # This is configurable via modules.globals.mouth_mask_blur_kernel_size. | ||||||
|  |         kernel_size_mouth_config = getattr(modules.globals, 'mouth_mask_blur_kernel_size', (9, 9)) | ||||||
|  |         valid_kernel_mouth = _validate_kernel_size(kernel_size_mouth_config, (9, 9)) | ||||||
|  |         mask_roi = cv2.GaussianBlur(mask_roi, valid_kernel_mouth, 0) | ||||||
| 
 | 
 | ||||||
|         # Place the mask ROI in the full-sized mask |         # Place the mask ROI in the full-sized mask | ||||||
|         mask[min_y:max_y, min_x:max_x] = mask_roi |         mask[min_y:max_y, min_x:max_x] = mask_roi | ||||||
|  | @ -508,7 +553,13 @@ def apply_mouth_area( | ||||||
|         feathered_mask = cv2.GaussianBlur( |         feathered_mask = cv2.GaussianBlur( | ||||||
|             polygon_mask.astype(float), (0, 0), feather_amount |             polygon_mask.astype(float), (0, 0), feather_amount | ||||||
|         ) |         ) | ||||||
|         feathered_mask = feathered_mask / feathered_mask.max() | 
 | ||||||
|  |         mask_max_value = feathered_mask.max() | ||||||
|  |         if mask_max_value < 1e-6:  # Check if max is effectively zero | ||||||
|  |             logging.warning("Mouth mask's feathered_mask is all zeros or near-zeros after blur. Resulting mask will be black.") | ||||||
|  |             feathered_mask = np.zeros_like(polygon_mask, dtype=np.uint8) | ||||||
|  |         else: | ||||||
|  |             feathered_mask = (feathered_mask / mask_max_value * 255).astype(np.uint8) | ||||||
| 
 | 
 | ||||||
|         face_mask_roi = face_mask[min_y:max_y, min_x:max_x] |         face_mask_roi = face_mask[min_y:max_y, min_x:max_x] | ||||||
|         combined_mask = feathered_mask * (face_mask_roi / 255.0) |         combined_mask = feathered_mask * (face_mask_roi / 255.0) | ||||||
|  | @ -553,7 +604,8 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray: | ||||||
| 
 | 
 | ||||||
|         face_top = np.min([right_side_face[0, 1], left_side_face[-1, 1]]) |         face_top = np.min([right_side_face[0, 1], left_side_face[-1, 1]]) | ||||||
|         forehead_height = face_top - eyebrow_top |         forehead_height = face_top - eyebrow_top | ||||||
|         extended_forehead_height = int(forehead_height * 5.0)  # Extend by 50% |         forehead_factor = getattr(modules.globals, 'forehead_extension_factor', 2.5) | ||||||
|  |         extended_forehead_height = int(forehead_height * forehead_factor) | ||||||
| 
 | 
 | ||||||
|         # Create forehead points |         # Create forehead points | ||||||
|         forehead_left = right_side_face[0].copy() |         forehead_left = right_side_face[0].copy() | ||||||
|  | @ -595,7 +647,9 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray: | ||||||
|         cv2.fillConvexPoly(mask, hull_padded, 255) |         cv2.fillConvexPoly(mask, hull_padded, 255) | ||||||
| 
 | 
 | ||||||
|         # Smooth the mask edges |         # Smooth the mask edges | ||||||
|         mask = cv2.GaussianBlur(mask, (5, 5), 3) |         kernel_size_face_config = getattr(modules.globals, 'face_mask_blur_kernel_size', (5, 5)) | ||||||
|  |         valid_kernel_face = _validate_kernel_size(kernel_size_face_config, (5, 5)) | ||||||
|  |         mask = cv2.GaussianBlur(mask, valid_kernel_face, 0) | ||||||
| 
 | 
 | ||||||
|     return mask |     return mask | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue