Compare commits
13 Commits: feae2657c9 ... 2d0c5bc8d0

Author | SHA1 | Date
---|---|---
 | 2d0c5bc8d0 | 
 | 5708be40eb | 
 | f08c81f22a | 
 | 2faaecbe15 | 
 | 53c72d6774 | 
 | 98e7320237 | 
 | 12d7ca8bad | 
 | 133b2ac330 | 
 | 57ac933dff | 
 | 11c2717a1d | 
 | 0c5bb269f2 | 
 | 6a1f87dc69 | 
 | 81c1a817cc | 
@ -1,220 +0,0 @@ (file deleted)
"""
Advanced Face Tracking with Occlusion Handling and Stabilization
"""
import cv2
import numpy as np
from typing import Optional, Tuple, List, Dict, Any
from collections import deque
import time
from modules.typing import Face, Frame


class FaceTracker:
    def __init__(self):
        # Face tracking history
        self.face_history = deque(maxlen=10)
        self.stable_face_position = None
        self.last_valid_face = None
        self.tracking_confidence = 0.0

        # Stabilization parameters
        self.position_smoothing = 0.7  # Higher = more stable, lower = more responsive
        self.size_smoothing = 0.8
        self.landmark_smoothing = 0.6

        # Occlusion detection
        self.occlusion_threshold = 0.3
        self.face_template = None
        self.template_update_interval = 30  # frames
        self.frame_count = 0

        # Kalman filter for position prediction
        self.kalman_filter = self._init_kalman_filter()

    def _init_kalman_filter(self):
        """Initialize Kalman filter for face position prediction"""
        kalman = cv2.KalmanFilter(4, 2)
        kalman.measurementMatrix = np.array([[1, 0, 0, 0],
                                             [0, 1, 0, 0]], np.float32)
        kalman.transitionMatrix = np.array([[1, 0, 1, 0],
                                            [0, 1, 0, 1],
                                            [0, 0, 1, 0],
                                            [0, 0, 0, 1]], np.float32)
        kalman.processNoiseCov = 0.03 * np.eye(4, dtype=np.float32)
        kalman.measurementNoiseCov = 0.1 * np.eye(2, dtype=np.float32)
        return kalman

    def track_face(self, current_face: Optional[Face], frame: Frame) -> Optional[Face]:
        """
        Track face with stabilization and occlusion handling
        """
        self.frame_count += 1

        if current_face is not None:
            # We have a detected face
            stabilized_face = self._stabilize_face(current_face)
            self._update_face_history(stabilized_face)
            self._update_face_template(frame, stabilized_face)
            self.last_valid_face = stabilized_face
            self.tracking_confidence = min(1.0, self.tracking_confidence + 0.1)
            return stabilized_face
        else:
            # No face detected - handle occlusion
            if self.last_valid_face is not None and self.tracking_confidence > 0.3:
                # Try to predict face position using tracking
                predicted_face = self._predict_face_position(frame)
                if predicted_face is not None:
                    self.tracking_confidence = max(0.0, self.tracking_confidence - 0.05)
                    return predicted_face

            # Gradually reduce confidence
            self.tracking_confidence = max(0.0, self.tracking_confidence - 0.1)
            return None

    def _stabilize_face(self, face: Face) -> Face:
        """Apply stabilization to reduce jitter"""
        if len(self.face_history) == 0:
            return face

        # Get the last stable face
        last_face = self.face_history[-1]

        # Smooth the bounding box
        face.bbox = self._smooth_bbox(face.bbox, last_face.bbox)

        # Smooth landmarks if available
        if hasattr(face, 'landmark_2d_106') and face.landmark_2d_106 is not None:
            if hasattr(last_face, 'landmark_2d_106') and last_face.landmark_2d_106 is not None:
                face.landmark_2d_106 = self._smooth_landmarks(
                    face.landmark_2d_106, last_face.landmark_2d_106
                )

        # Update Kalman filter
        center_x = (face.bbox[0] + face.bbox[2]) / 2
        center_y = (face.bbox[1] + face.bbox[3]) / 2
        self.kalman_filter.correct(np.array([[center_x], [center_y]], dtype=np.float32))

        return face

    def _smooth_bbox(self, current_bbox: np.ndarray, last_bbox: np.ndarray) -> np.ndarray:
        """Smooth bounding box coordinates"""
        alpha = 1 - self.position_smoothing
        return alpha * current_bbox + (1 - alpha) * last_bbox

    def _smooth_landmarks(self, current_landmarks: np.ndarray, last_landmarks: np.ndarray) -> np.ndarray:
        """Smooth facial landmarks"""
        alpha = 1 - self.landmark_smoothing
        return alpha * current_landmarks + (1 - alpha) * last_landmarks

    def _update_face_history(self, face: Face):
        """Update face tracking history"""
        self.face_history.append(face)

    def _update_face_template(self, frame: Frame, face: Face):
        """Update face template for occlusion detection"""
        if self.frame_count % self.template_update_interval == 0:
            try:
                x1, y1, x2, y2 = face.bbox.astype(int)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)

                if x2 > x1 and y2 > y1:
                    face_region = frame[y1:y2, x1:x2]
                    self.face_template = cv2.resize(face_region, (64, 64))
            except Exception:
                pass

    def _predict_face_position(self, frame: Frame) -> Optional[Face]:
        """Predict face position during occlusion"""
        if self.last_valid_face is None:
            return None

        try:
            # Use Kalman filter prediction
            prediction = self.kalman_filter.predict()
            pred_x, pred_y = prediction[0, 0], prediction[1, 0]

            # Create predicted face based on last valid face
            predicted_face = self._create_predicted_face(pred_x, pred_y)

            # Verify prediction using template matching if available
            if self.face_template is not None:
                confidence = self._verify_prediction(frame, predicted_face)
                if confidence > self.occlusion_threshold:
                    return predicted_face
            else:
                return predicted_face

        except Exception:
            pass

        return None

    def _create_predicted_face(self, center_x: float, center_y: float) -> Face:
        """Create a predicted face object"""
        # Use the last valid face as template
        predicted_face = type(self.last_valid_face)()

        # Copy attributes from last valid face
        for attr in dir(self.last_valid_face):
            if not attr.startswith('_'):
                try:
                    setattr(predicted_face, attr, getattr(self.last_valid_face, attr))
                except Exception:
                    pass

        # Update position
        last_center_x = (self.last_valid_face.bbox[0] + self.last_valid_face.bbox[2]) / 2
        last_center_y = (self.last_valid_face.bbox[1] + self.last_valid_face.bbox[3]) / 2

        offset_x = center_x - last_center_x
        offset_y = center_y - last_center_y

        # Update bbox
        predicted_face.bbox = self.last_valid_face.bbox + [offset_x, offset_y, offset_x, offset_y]

        # Update landmarks if available
        if hasattr(predicted_face, 'landmark_2d_106') and predicted_face.landmark_2d_106 is not None:
            predicted_face.landmark_2d_106 = self.last_valid_face.landmark_2d_106 + [offset_x, offset_y]

        return predicted_face

    def _verify_prediction(self, frame: Frame, predicted_face: Face) -> float:
        """Verify predicted face position using template matching"""
        try:
            x1, y1, x2, y2 = predicted_face.bbox.astype(int)
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)

            if x2 <= x1 or y2 <= y1:
                return 0.0

            current_region = frame[y1:y2, x1:x2]
            current_region = cv2.resize(current_region, (64, 64))

            # Template matching
            result = cv2.matchTemplate(current_region, self.face_template, cv2.TM_CCOEFF_NORMED)
            _, max_val, _, _ = cv2.minMaxLoc(result)

            return max_val

        except Exception:
            return 0.0

    def is_face_stable(self) -> bool:
        """Check if face tracking is stable"""
        return len(self.face_history) >= 5 and self.tracking_confidence > 0.7

    def reset_tracking(self):
        """Reset tracking state"""
        self.face_history.clear()
        self.stable_face_position = None
        self.last_valid_face = None
        self.tracking_confidence = 0.0
        self.face_template = None
        self.kalman_filter = self._init_kalman_filter()


# Global face tracker instance
face_tracker = FaceTracker()
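For orientation, a minimal sketch of how this now-deleted tracker was driven: a detection (or None, when the face is occluded) goes in every frame, and the tracker answers with a smoothed or Kalman-predicted face. The webcam loop below is illustrative only, not part of the repository:

import cv2
from modules.face_analyser import get_one_face
from modules.face_tracker import face_tracker

cap = cv2.VideoCapture(0)  # assumed camera index
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    detected = get_one_face(frame)  # may be None during occlusion
    face = face_tracker.track_face(detected, frame)
    if face is not None and face_tracker.is_face_stable():
        pass  # hand `face` to the swapper here
cap.release()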
@ -42,11 +42,4 @@ mask_feather_ratio = 8
 mask_down_size = 0.50
 mask_size = 1
 
-# Enhanced performance settings
-performance_mode = "balanced"  # "fast", "balanced", "quality"
-adaptive_quality = True
-target_live_fps = 30
-quality_level = 1.0
-face_detection_interval = 0.1
-enable_frame_caching = True
-enable_gpu_acceleration = True
+# Removed all performance optimization variables
@ -1,247 +0,0 @@ (file deleted)
"""
Enhanced Live Face Swapper with optimized performance and quality
"""
import cv2
import numpy as np
import threading
import time
from typing import Optional, Callable, Any
from collections import deque
import modules.globals
from modules.face_analyser import get_one_face, get_many_faces
from modules.processors.frame.face_swapper import swap_face_enhanced, get_face_swapper
from modules.performance_optimizer import performance_optimizer
from modules.video_capture import VideoCapturer


class LiveFaceSwapper:
    def __init__(self):
        self.is_running = False
        self.source_face = None
        self.video_capturer = None
        self.processing_thread = None
        self.display_callback = None

        # Performance tracking
        self.fps_counter = 0
        self.fps_start_time = time.time()
        self.current_fps = 0
        self.processed_frames = 0

        # Frame processing
        self.input_queue = deque(maxlen=2)  # Small queue to reduce latency
        self.output_queue = deque(maxlen=2)
        self.queue_lock = threading.Lock()

        # Quality settings
        self.quality_mode = "balanced"  # "fast", "balanced", "quality"
        self.adaptive_quality = True

    def set_source_face(self, source_image_path: str) -> bool:
        """Set the source face for swapping"""
        try:
            source_image = cv2.imread(source_image_path)
            if source_image is None:
                return False

            face = get_one_face(source_image)
            if face is None:
                return False

            self.source_face = face
            return True
        except Exception as e:
            print(f"Error setting source face: {e}")
            return False

    def start_live_swap(self, camera_index: int, display_callback: Callable[[np.ndarray, float], None]) -> bool:
        """Start live face swapping"""
        try:
            if self.source_face is None:
                print("No source face set")
                return False

            self.display_callback = display_callback
            self.video_capturer = VideoCapturer(camera_index)

            # Start video capture with optimized settings
            if not self.video_capturer.start(width=960, height=540, fps=30):
                return False

            self.is_running = True
            self.processing_thread = threading.Thread(target=self._processing_loop, daemon=True)
            self.processing_thread.start()

            # Start capture loop
            self._capture_loop()
            return True

        except Exception as e:
            print(f"Error starting live swap: {e}")
            return False

    def stop_live_swap(self):
        """Stop live face swapping"""
        self.is_running = False
        if self.video_capturer:
            self.video_capturer.release()
        if self.processing_thread:
            self.processing_thread.join(timeout=1.0)

    def _capture_loop(self):
        """Main capture loop"""
        while self.is_running:
            try:
                ret, frame = self.video_capturer.read()
                if ret and frame is not None:
                    # Add frame to processing queue
                    with self.queue_lock:
                        if len(self.input_queue) < self.input_queue.maxlen:
                            self.input_queue.append(frame.copy())

                # Small delay to prevent excessive CPU usage
                time.sleep(0.001)

            except Exception as e:
                print(f"Error in capture loop: {e}")
                break

    def _processing_loop(self):
        """Background processing loop for face swapping"""
        while self.is_running:
            try:
                frame_to_process = None

                # Get frame from input queue
                with self.queue_lock:
                    if self.input_queue:
                        frame_to_process = self.input_queue.popleft()

                if frame_to_process is not None:
                    # Process the frame
                    processed_frame = self._process_frame(frame_to_process)

                    # Add to output queue
                    with self.queue_lock:
                        if len(self.output_queue) < self.output_queue.maxlen:
                            self.output_queue.append(processed_frame)

                    # Update FPS and call display callback
                    self._update_fps()
                    if self.display_callback:
                        self.display_callback(processed_frame, self.current_fps)

                else:
                    # No frame to process, small delay
                    time.sleep(0.005)

            except Exception as e:
                print(f"Error in processing loop: {e}")
                time.sleep(0.01)

    def _process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Process a single frame with face swapping, tracking, and occlusion handling"""
        try:
            start_time = time.time()

            # Apply performance optimizations
            original_size = frame.shape[:2][::-1]
            processed_frame = performance_optimizer.preprocess_frame(frame)

            # Import face tracker
            from modules.face_tracker import face_tracker

            # Detect and track faces based on performance settings
            if modules.globals.many_faces:
                if performance_optimizer.should_detect_faces():
                    detected_faces = get_many_faces(processed_frame)
                    # Apply tracking to each face
                    tracked_faces = []
                    for face in (detected_faces or []):
                        tracked_face = face_tracker.track_face(face, processed_frame)
                        if tracked_face:
                            tracked_faces.append(tracked_face)
                    performance_optimizer.face_cache['many_faces'] = tracked_faces
                else:
                    tracked_faces = performance_optimizer.face_cache.get('many_faces', [])

                if tracked_faces:
                    for target_face in tracked_faces:
                        if self.source_face and target_face:
                            # Use enhanced swap with occlusion handling
                            from modules.processors.frame.face_swapper import swap_face_enhanced_with_occlusion
                            processed_frame = swap_face_enhanced_with_occlusion(
                                self.source_face, target_face, processed_frame, frame
                            )
            else:
                if performance_optimizer.should_detect_faces():
                    detected_face = get_one_face(processed_frame)
                    tracked_face = face_tracker.track_face(detected_face, processed_frame)
                    performance_optimizer.face_cache['single_face'] = tracked_face
                else:
                    tracked_face = performance_optimizer.face_cache.get('single_face')

                if tracked_face and self.source_face:
                    # Use enhanced swap with occlusion handling
                    from modules.processors.frame.face_swapper import swap_face_enhanced_with_occlusion
                    processed_frame = swap_face_enhanced_with_occlusion(
                        self.source_face, tracked_face, processed_frame, frame
                    )
                else:
                    # Try to use tracking even without detection (for occlusion handling)
                    tracked_face = face_tracker.track_face(None, processed_frame)
                    if tracked_face and self.source_face:
                        from modules.processors.frame.face_swapper import swap_face_enhanced_with_occlusion
                        processed_frame = swap_face_enhanced_with_occlusion(
                            self.source_face, tracked_face, processed_frame, frame
                        )

            # Post-process back to original size
            final_frame = performance_optimizer.postprocess_frame(processed_frame, original_size)

            # Update performance metrics
            processing_time = time.time() - start_time
            performance_optimizer.update_fps_stats(processing_time)

            return final_frame

        except Exception as e:
            print(f"Error processing frame: {e}")
            return frame

    def _update_fps(self):
        """Update FPS counter"""
        self.fps_counter += 1
        current_time = time.time()

        if current_time - self.fps_start_time >= 1.0:
            self.current_fps = self.fps_counter / (current_time - self.fps_start_time)
            self.fps_counter = 0
            self.fps_start_time = current_time

    def set_quality_mode(self, mode: str):
        """Set quality mode: 'fast', 'balanced', or 'quality'"""
        self.quality_mode = mode

        if mode == "fast":
            performance_optimizer.quality_level = 0.7
            performance_optimizer.detection_interval = 0.15
        elif mode == "balanced":
            performance_optimizer.quality_level = 0.85
            performance_optimizer.detection_interval = 0.1
        elif mode == "quality":
            performance_optimizer.quality_level = 1.0
            performance_optimizer.detection_interval = 0.05

    def get_performance_stats(self) -> dict:
        """Get current performance statistics"""
        return {
            'fps': self.current_fps,
            'quality_level': performance_optimizer.quality_level,
            'detection_interval': performance_optimizer.detection_interval,
            'processed_frames': self.processed_frames
        }


# Global instance
live_face_swapper = LiveFaceSwapper()
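A sketch of how this deleted class was meant to be used (camera index and preview callback are assumptions; note the callback fires on the background processing thread, so a real UI would marshal frames back to its main thread):

import cv2
from modules.live_face_swapper import live_face_swapper

def on_frame(frame, fps):
    # Called from the processing thread with each swapped frame.
    cv2.putText(frame, f"{fps:.1f} FPS", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)
    cv2.imshow("live swap", frame)
    cv2.waitKey(1)

if live_face_swapper.set_source_face("source.jpg"):  # assumed image path
    live_face_swapper.set_quality_mode("fast")
    live_face_swapper.start_live_swap(0, on_frame)   # blocks in _capture_loop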
@ -1,151 +0,0 @@ (file deleted)
"""
Performance Manager for Deep-Live-Cam
Handles performance mode switching and optimization settings
"""
import json
import os
from typing import Dict, Any
import modules.globals
from modules.performance_optimizer import performance_optimizer


class PerformanceManager:
    def __init__(self):
        self.config_path = "performance_config.json"
        self.config = self.load_config()
        self.current_mode = "balanced"

    def load_config(self) -> Dict[str, Any]:
        """Load performance configuration from file"""
        try:
            if os.path.exists(self.config_path):
                with open(self.config_path, 'r') as f:
                    return json.load(f)
            else:
                return self.get_default_config()
        except Exception as e:
            print(f"Error loading performance config: {e}")
            return self.get_default_config()

    def get_default_config(self) -> Dict[str, Any]:
        """Get default performance configuration"""
        return {
            "performance_modes": {
                "fast": {
                    "quality_level": 0.6,
                    "face_detection_interval": 0.2,
                    "target_fps": 30,
                    "frame_skip": 2,
                    "enable_caching": True,
                    "processing_resolution_scale": 0.7
                },
                "balanced": {
                    "quality_level": 0.85,
                    "face_detection_interval": 0.1,
                    "target_fps": 25,
                    "frame_skip": 1,
                    "enable_caching": True,
                    "processing_resolution_scale": 0.85
                },
                "quality": {
                    "quality_level": 1.0,
                    "face_detection_interval": 0.05,
                    "target_fps": 20,
                    "frame_skip": 1,
                    "enable_caching": False,
                    "processing_resolution_scale": 1.0
                }
            }
        }

    def set_performance_mode(self, mode: str) -> bool:
        """Set performance mode (fast, balanced, quality)"""
        try:
            if mode not in self.config["performance_modes"]:
                print(f"Invalid performance mode: {mode}")
                return False

            mode_config = self.config["performance_modes"][mode]
            self.current_mode = mode

            # Apply settings to performance optimizer
            performance_optimizer.quality_level = mode_config["quality_level"]
            performance_optimizer.detection_interval = mode_config["face_detection_interval"]
            performance_optimizer.target_fps = mode_config["target_fps"]

            # Apply to globals
            modules.globals.performance_mode = mode
            modules.globals.quality_level = mode_config["quality_level"]
            modules.globals.face_detection_interval = mode_config["face_detection_interval"]
            modules.globals.target_live_fps = mode_config["target_fps"]

            print(f"Performance mode set to: {mode}")
            return True

        except Exception as e:
            print(f"Error setting performance mode: {e}")
            return False

    def get_current_mode(self) -> str:
        """Get current performance mode"""
        return self.current_mode

    def get_mode_info(self, mode: str) -> Dict[str, Any]:
        """Get information about a specific performance mode"""
        return self.config["performance_modes"].get(mode, {})

    def get_all_modes(self) -> Dict[str, Any]:
        """Get all available performance modes"""
        return self.config["performance_modes"]

    def optimize_for_hardware(self) -> str:
        """Automatically select optimal performance mode based on hardware"""
        try:
            import psutil
            import torch

            # Check available RAM
            ram_gb = psutil.virtual_memory().total / (1024**3)

            # Check GPU availability
            has_gpu = torch.cuda.is_available()

            # Check CPU cores
            cpu_cores = psutil.cpu_count()

            # Determine optimal mode
            if has_gpu and ram_gb >= 8 and cpu_cores >= 8:
                optimal_mode = "quality"
            elif has_gpu and ram_gb >= 4:
                optimal_mode = "balanced"
            else:
                optimal_mode = "fast"

            self.set_performance_mode(optimal_mode)
            print(f"Auto-optimized for hardware: {optimal_mode} mode")
            print(f"  RAM: {ram_gb:.1f}GB, GPU: {has_gpu}, CPU Cores: {cpu_cores}")

            return optimal_mode

        except Exception as e:
            print(f"Error in hardware optimization: {e}")
            self.set_performance_mode("balanced")
            return "balanced"

    def get_performance_tips(self) -> list:
        """Get performance optimization tips"""
        tips = [
            "🚀 Use 'Fast' mode for maximum FPS during live streaming",
            "⚖️ Use 'Balanced' mode for good quality with decent performance",
            "🎨 Use 'Quality' mode for best results when processing videos",
            "💾 Close other applications to free up system resources",
            "🖥️ Use GPU acceleration when available (CUDA/DirectML)",
            "📹 Lower camera resolution if experiencing lag",
            "🔄 Enable frame caching for smoother playback",
            "⚡ Ensure good lighting for better face detection"
        ]
        return tips


# Global performance manager instance
performance_manager = PerformanceManager()
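Typical use of this deleted manager, as implied by its API (a sketch, not repository code):

from modules.performance_manager import performance_manager

# Pick a mode from RAM / GPU / core count, then inspect or override it.
mode = performance_manager.optimize_for_hardware()  # "fast" | "balanced" | "quality"
print(performance_manager.get_mode_info(mode))
performance_manager.set_performance_mode("quality")  # manual override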
@ -1,76 +0,0 @@ (file deleted)
"""
Performance optimization module for Deep-Live-Cam
Provides frame caching, adaptive quality, and FPS optimization
"""
import cv2
import numpy as np
import time
from typing import Dict, Any, Optional, Tuple
import threading
from collections import deque
import modules.globals


class PerformanceOptimizer:
    def __init__(self):
        self.frame_cache = {}
        self.face_cache = {}
        self.last_detection_time = 0
        self.detection_interval = 0.1  # Detect faces every 100ms
        self.adaptive_quality = True
        self.target_fps = 30
        self.frame_times = deque(maxlen=10)
        self.current_fps = 0
        self.quality_level = 1.0
        self.min_quality = 0.5
        self.max_quality = 1.0

    def should_detect_faces(self) -> bool:
        """Determine if we should run face detection based on timing"""
        current_time = time.time()
        if current_time - self.last_detection_time > self.detection_interval:
            self.last_detection_time = current_time
            return True
        return False

    def update_fps_stats(self, frame_time: float):
        """Update FPS statistics and adjust quality accordingly"""
        self.frame_times.append(frame_time)
        if len(self.frame_times) >= 5:
            avg_frame_time = sum(self.frame_times) / len(self.frame_times)
            self.current_fps = 1.0 / avg_frame_time if avg_frame_time > 0 else 0

            if self.adaptive_quality:
                self._adjust_quality()

    def _adjust_quality(self):
        """Dynamically adjust processing quality based on FPS"""
        if self.current_fps < self.target_fps * 0.8:  # Below 80% of target
            self.quality_level = max(self.min_quality, self.quality_level - 0.1)
            self.detection_interval = min(0.2, self.detection_interval + 0.02)
        elif self.current_fps > self.target_fps * 0.95:  # Above 95% of target
            self.quality_level = min(self.max_quality, self.quality_level + 0.05)
            self.detection_interval = max(0.05, self.detection_interval - 0.01)

    def get_optimal_resolution(self, original_size: Tuple[int, int]) -> Tuple[int, int]:
        """Get optimal processing resolution based on current quality level"""
        width, height = original_size
        scale = self.quality_level
        return (int(width * scale), int(height * scale))

    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """Preprocess frame for optimal performance"""
        if self.quality_level < 1.0:
            height, width = frame.shape[:2]
            new_height = int(height * self.quality_level)
            new_width = int(width * self.quality_level)
            frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
        return frame

    def postprocess_frame(self, frame: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray:
        """Postprocess frame to target resolution"""
        if frame.shape[:2][::-1] != target_size:
            frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_CUBIC)
        return frame


# Global optimizer instance
performance_optimizer = PerformanceOptimizer()
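To see the adaptive-quality control loop in isolation, a standalone rerun of the deleted optimizer's feedback rule (the 50 ms frame time is an assumed input, modeling ~20 FPS against a 30 FPS target):

from modules.performance_optimizer import PerformanceOptimizer

opt = PerformanceOptimizer()
for _ in range(20):
    opt.update_fps_stats(0.05)  # 50 ms/frame -> ~20 FPS, below 80% of target
print(opt.quality_level, opt.detection_interval)
# quality_level steps down by 0.1 per update until it floors at min_quality (0.5);
# detection_interval grows by 0.02 per update until it caps at 0.2 s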
@ -5,7 +5,6 @@ import threading
 import numpy as np
 import modules.globals
 import logging
-import time
 import modules.processors.frame.core
 from modules.core import update_status
 from modules.face_analyser import get_one_face, get_many_faces, default_source_face
@ -71,7 +70,7 @@ def get_face_swapper() -> Any:
 def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
     face_swapper = get_face_swapper()
 
-    # Apply the face swap with optimized settings for better performance
+    # Simple face swap - maximum FPS
     swapped_frame = face_swapper.get(
         temp_frame, target_face, source_face, paste_back=True
     )
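For context, the `face_swapper.get(...)` call kept on both sides of this hunk is insightface's INSwapper API. A standalone sketch of the same call (model name, detector size, and image paths are assumptions):

import cv2
import insightface
from insightface.app import FaceAnalysis

app = FaceAnalysis(name="buffalo_l")
app.prepare(ctx_id=0, det_size=(640, 640))
swapper = insightface.model_zoo.get_model("inswapper_128.onnx")  # assumed local model path

img = cv2.imread("target.jpg")                # assumed paths
src = app.get(cv2.imread("source.jpg"))[0]    # source identity
dst = app.get(img)[0]                         # face to replace
out = swapper.get(img, dst, src, paste_back=True)
cv2.imwrite("swapped.jpg", out)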
@ -99,18 +98,29 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
     return swapped_frame
 
 
-def swap_face_enhanced(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
-    """Enhanced face swapping with better quality and performance optimizations"""
+# Simple face position smoothing for stability
+_last_face_position = None
+_position_smoothing = 0.7  # Higher = more stable, lower = more responsive
+
+
+def swap_face_stable(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
+    """Ultra-fast face swap - maximum FPS priority"""
+    # Skip all complex processing for maximum FPS
     face_swapper = get_face_swapper()
+    swapped_frame = face_swapper.get(temp_frame, target_face, source_face, paste_back=True)
 
-    # Apply the face swap
-    swapped_frame = face_swapper.get(
-        temp_frame, target_face, source_face, paste_back=True
-    )
+    # Skip all post-processing to maximize FPS
+    return swapped_frame
+
+
+def swap_face_ultra_fast(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
+    """Fast face swap with mouth mask support and forehead protection"""
+    face_swapper = get_face_swapper()
+    swapped_frame = face_swapper.get(temp_frame, target_face, source_face, paste_back=True)
 
-    # Enhanced post-processing for better quality
-    swapped_frame = enhance_face_swap_quality(swapped_frame, source_face, target_face, temp_frame)
+    # Fix forehead hair issue - blend forehead area back to original
+    swapped_frame = fix_forehead_hair_issue(swapped_frame, target_face, temp_frame)
 
+    # Add mouth mask functionality back (only if enabled)
     if modules.globals.mouth_mask:
         # Create a mask for the target face
         face_mask = create_face_mask(target_face, temp_frame)
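The new module-level `_position_smoothing` constant follows the same exponential-moving-average scheme the deleted FaceTracker used: smoothed = (1 - s) * current + s * last. A worked example with assumed box coordinates:

import numpy as np

s = 0.7  # _position_smoothing: higher = more stable, lower = more responsive
last_bbox = np.array([100.0, 120.0, 220.0, 260.0])  # assumed previous box
curr_bbox = np.array([110.0, 118.0, 232.0, 258.0])  # assumed new detection
smoothed = (1 - s) * curr_bbox + s * last_bbox
# -> [103.0, 119.4, 223.6, 259.4]: the box moves only 30% of the way per frame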
@ -130,12 +140,12 @@ def swap_face_enhanced(source_face: Face, target_face: Face, temp_frame: Frame)
             swapped_frame = draw_mouth_mask_visualization(
                 swapped_frame, target_face, mouth_mask_data
             )
 
     return swapped_frame
 
 
-def enhance_face_swap_quality(swapped_frame: Frame, source_face: Face, target_face: Face, original_frame: Frame) -> Frame:
-    """Apply quality enhancements to the swapped face"""
+def fix_forehead_hair_issue(swapped_frame: Frame, target_face: Face, original_frame: Frame) -> Frame:
+    """Fix hair falling on forehead by blending forehead area back to original"""
     try:
         # Get face bounding box
         bbox = target_face.bbox.astype(int)
@ -148,343 +158,149 @@ def enhance_face_swap_quality(swapped_frame: Frame, source_face: Face, target_face: Face, original_frame: Frame) -> Frame:
 
         if x2 <= x1 or y2 <= y1:
             return swapped_frame
 
-        # Extract face regions
-        swapped_face = swapped_frame[y1:y2, x1:x2]
-        original_face = original_frame[y1:y2, x1:x2]
-
-        # Apply color matching
-        color_matched = apply_advanced_color_matching(swapped_face, original_face)
-
-        # Apply edge smoothing
-        smoothed = apply_edge_smoothing(color_matched, original_face)
-
-        # Blend back into frame
-        swapped_frame[y1:y2, x1:x2] = smoothed
+        # Focus on forehead area (upper 35% of face)
+        forehead_height = int((y2 - y1) * 0.35)
+        forehead_y2 = y1 + forehead_height
+
+        if forehead_y2 > y1:
+            # Extract forehead regions
+            swapped_forehead = swapped_frame[y1:forehead_y2, x1:x2]
+            original_forehead = original_frame[y1:forehead_y2, x1:x2]
+
+            # Create a soft blend mask for forehead area
+            mask = np.ones(swapped_forehead.shape[:2], dtype=np.float32)
+
+            # Apply strong Gaussian blur for very soft blending
+            mask = cv2.GaussianBlur(mask, (31, 31), 10)
+            mask = mask[:, :, np.newaxis]
+
+            # Blend forehead areas (keep much more of original to preserve hair)
+            blended_forehead = (swapped_forehead * 0.3 + original_forehead * 0.7).astype(np.uint8)
+
+            # Apply the blended forehead back
+            swapped_frame[y1:forehead_y2, x1:x2] = blended_forehead
 
         return swapped_frame
 
-    except Exception as e:
-        # Return original swapped frame if enhancement fails
+    except Exception:
         return swapped_frame
 
 
-def apply_advanced_color_matching(swapped_face: np.ndarray, target_face: np.ndarray) -> np.ndarray:
-    """Apply advanced color matching between swapped and target faces"""
-    try:
-        # Convert to LAB color space for better color matching
-        swapped_lab = cv2.cvtColor(swapped_face, cv2.COLOR_BGR2LAB).astype(np.float32)
-        target_lab = cv2.cvtColor(target_face, cv2.COLOR_BGR2LAB).astype(np.float32)
-
-        # Calculate statistics for each channel
-        swapped_mean = np.mean(swapped_lab, axis=(0, 1))
-        swapped_std = np.std(swapped_lab, axis=(0, 1))
-        target_mean = np.mean(target_lab, axis=(0, 1))
-        target_std = np.std(target_lab, axis=(0, 1))
-
-        # Apply color transfer
-        for i in range(3):
-            if swapped_std[i] > 0:
-                swapped_lab[:, :, i] = (swapped_lab[:, :, i] - swapped_mean[i]) * (target_std[i] / swapped_std[i]) + target_mean[i]
-
-        # Convert back to BGR
-        result = cv2.cvtColor(np.clip(swapped_lab, 0, 255).astype(np.uint8), cv2.COLOR_LAB2BGR)
-        return result
-
-    except Exception:
-        return swapped_face
-
-
-def apply_edge_smoothing(face: np.ndarray, reference: np.ndarray) -> np.ndarray:
-    """Apply edge smoothing to reduce artifacts"""
-    try:
-        # Create a soft mask for blending edges
-        mask = np.ones(face.shape[:2], dtype=np.float32)
-
-        # Apply Gaussian blur to create soft edges
-        kernel_size = max(5, min(face.shape[0], face.shape[1]) // 20)
-        if kernel_size % 2 == 0:
-            kernel_size += 1
-
-        mask = cv2.GaussianBlur(mask, (kernel_size, kernel_size), 0)
-        mask = mask[:, :, np.newaxis]
-
-        # Blend with reference for smoother edges
-        blended = face * mask + reference * (1 - mask)
-        return blended.astype(np.uint8)
-
-    except Exception:
-        return face
-
-
-def swap_face_enhanced_with_occlusion(source_face: Face, target_face: Face, temp_frame: Frame, original_frame: Frame) -> Frame:
-    """Enhanced face swapping with occlusion handling and stabilization"""
-    face_swapper = get_face_swapper()
-
-    try:
-        # Get face bounding box
-        bbox = target_face.bbox.astype(int)
-        x1, y1, x2, y2 = bbox
-
-        # Ensure coordinates are within frame bounds
-        h, w = temp_frame.shape[:2]
-        x1, y1 = max(0, x1), max(0, y1)
-        x2, y2 = min(w, x2), min(h, y2)
-
-        if x2 <= x1 or y2 <= y1:
-            return temp_frame
-
-        # Create face mask to handle occlusion
-        face_mask = create_enhanced_face_mask(target_face, temp_frame)
-
-        # Apply face swap
-        swapped_frame = face_swapper.get(temp_frame, target_face, source_face, paste_back=True)
-
-        # Apply occlusion-aware blending
-        final_frame = apply_occlusion_aware_blending(
-            swapped_frame, temp_frame, face_mask, bbox
-        )
-
-        # Enhanced post-processing for better quality
-        final_frame = enhance_face_swap_quality(final_frame, source_face, target_face, original_frame)
-
-        # Apply mouth mask if enabled
-        if modules.globals.mouth_mask:
-            face_mask_full = create_face_mask(target_face, final_frame)
-            mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = (
-                create_lower_mouth_mask(target_face, final_frame)
-            )
-            final_frame = apply_mouth_area(
-                final_frame, mouth_cutout, mouth_box, face_mask_full, lower_lip_polygon
-            )
-
-            if modules.globals.show_mouth_mask_box:
-                mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
-                final_frame = draw_mouth_mask_visualization(
-                    final_frame, target_face, mouth_mask_data
-                )
-
-        return final_frame
-
-    except Exception as e:
-        print(f"Error in occlusion-aware face swap: {e}")
-        # Fallback to regular enhanced swap
-        return swap_face_enhanced(source_face, target_face, temp_frame)
-
-
-def create_enhanced_face_mask(face: Face, frame: Frame) -> np.ndarray:
-    """Create an enhanced face mask that better handles occlusion"""
-    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
-
-    try:
-        # Use landmarks if available for more precise masking
-        if hasattr(face, 'landmark_2d_106') and face.landmark_2d_106 is not None:
-            landmarks = face.landmark_2d_106.astype(np.int32)
-
-            # Create face contour from landmarks
-            face_contour = []
-
-            # Face outline (jawline and forehead)
-            face_outline_indices = list(range(0, 33))  # Jawline and face boundary
-            for idx in face_outline_indices:
-                if idx < len(landmarks):
-                    face_contour.append(landmarks[idx])
-
-            if len(face_contour) > 3:
-                face_contour = np.array(face_contour)
-
-                # Create convex hull for smoother mask
-                hull = cv2.convexHull(face_contour)
-
-                # Expand the hull slightly for better coverage
-                center = np.mean(hull, axis=0)
-                expanded_hull = []
-                for point in hull:
-                    direction = point[0] - center
-                    direction = direction / np.linalg.norm(direction) if np.linalg.norm(direction) > 0 else direction
-                    expanded_point = point[0] + direction * 10  # Expand by 10 pixels
-                    expanded_hull.append(expanded_point)
-
-                expanded_hull = np.array(expanded_hull, dtype=np.int32)
-                cv2.fillConvexPoly(mask, expanded_hull, 255)
-            else:
-                # Fallback to bounding box
-                bbox = face.bbox.astype(int)
-                x1, y1, x2, y2 = bbox
-                cv2.rectangle(mask, (x1, y1), (x2, y2), 255, -1)
-        else:
-            # Fallback to bounding box if no landmarks
-            bbox = face.bbox.astype(int)
-            x1, y1, x2, y2 = bbox
-            cv2.rectangle(mask, (x1, y1), (x2, y2), 255, -1)
-
-        # Apply Gaussian blur for soft edges
-        mask = cv2.GaussianBlur(mask, (15, 15), 5)
-
-    except Exception as e:
-        print(f"Error creating enhanced face mask: {e}")
-        # Fallback to simple rectangle mask
-        bbox = face.bbox.astype(int)
-        x1, y1, x2, y2 = bbox
-        cv2.rectangle(mask, (x1, y1), (x2, y2), 255, -1)
-        mask = cv2.GaussianBlur(mask, (15, 15), 5)
-
-    return mask
-
-
-def apply_occlusion_aware_blending(swapped_frame: Frame, original_frame: Frame, face_mask: np.ndarray, bbox: np.ndarray) -> Frame:
-    """Apply occlusion-aware blending to handle hands/objects covering the face"""
-    try:
-        x1, y1, x2, y2 = bbox
-
-        # Ensure coordinates are within bounds
-        h, w = swapped_frame.shape[:2]
-        x1, y1 = max(0, x1), max(0, y1)
-        x2, y2 = min(w, x2), min(h, y2)
-
-        if x2 <= x1 or y2 <= y1:
-            return swapped_frame
-
-        # Extract face regions
-        swapped_face_region = swapped_frame[y1:y2, x1:x2]
-        original_face_region = original_frame[y1:y2, x1:x2]
-        face_mask_region = face_mask[y1:y2, x1:x2]
-
-        # Detect potential occlusion using edge detection and color analysis
-        occlusion_mask = detect_occlusion(original_face_region, swapped_face_region)
-
-        # Combine face mask with occlusion detection
-        combined_mask = face_mask_region.astype(np.float32) / 255.0
-        occlusion_factor = (255 - occlusion_mask).astype(np.float32) / 255.0
-
-        # Apply occlusion-aware blending
-        final_mask = combined_mask * occlusion_factor
-        final_mask = final_mask[:, :, np.newaxis]
-
-        # Blend the regions
-        blended_region = (swapped_face_region * final_mask +
-                          original_face_region * (1 - final_mask)).astype(np.uint8)
-
-        # Copy back to full frame
-        result_frame = swapped_frame.copy()
-        result_frame[y1:y2, x1:x2] = blended_region
-
-        return result_frame
-
-    except Exception as e:
-        print(f"Error in occlusion-aware blending: {e}")
-        return swapped_frame
-
-
-def detect_occlusion(original_region: np.ndarray, swapped_region: np.ndarray) -> np.ndarray:
-    """Detect potential occlusion areas (hands, objects) in the face region"""
-    try:
-        # Convert to different color spaces for analysis
-        original_hsv = cv2.cvtColor(original_region, cv2.COLOR_BGR2HSV)
-        original_lab = cv2.cvtColor(original_region, cv2.COLOR_BGR2LAB)
-
-        # Detect skin-like regions (potential hands)
-        # HSV ranges for skin detection
-        lower_skin = np.array([0, 20, 70], dtype=np.uint8)
-        upper_skin = np.array([20, 255, 255], dtype=np.uint8)
-        skin_mask1 = cv2.inRange(original_hsv, lower_skin, upper_skin)
-
-        lower_skin2 = np.array([160, 20, 70], dtype=np.uint8)
-        upper_skin2 = np.array([180, 255, 255], dtype=np.uint8)
-        skin_mask2 = cv2.inRange(original_hsv, lower_skin2, upper_skin2)
-
-        skin_mask = cv2.bitwise_or(skin_mask1, skin_mask2)
-
-        # Edge detection to find object boundaries
-        gray = cv2.cvtColor(original_region, cv2.COLOR_BGR2GRAY)
-        edges = cv2.Canny(gray, 50, 150)
-
-        # Dilate edges to create thicker boundaries
-        kernel = np.ones((3, 3), np.uint8)
-        edges_dilated = cv2.dilate(edges, kernel, iterations=2)
-
-        # Combine skin detection and edge detection
-        occlusion_mask = cv2.bitwise_or(skin_mask, edges_dilated)
-
-        # Apply morphological operations to clean up the mask
-        kernel = np.ones((5, 5), np.uint8)
-        occlusion_mask = cv2.morphologyEx(occlusion_mask, cv2.MORPH_CLOSE, kernel)
-        occlusion_mask = cv2.morphologyEx(occlusion_mask, cv2.MORPH_OPEN, kernel)
-
-        # Apply Gaussian blur for smooth transitions
-        occlusion_mask = cv2.GaussianBlur(occlusion_mask, (11, 11), 3)
-
-        return occlusion_mask
-
-    except Exception as e:
-        print(f"Error in occlusion detection: {e}")
-        # Return empty mask if detection fails
-        return np.zeros(original_region.shape[:2], dtype=np.uint8)
+def improve_forehead_matching(swapped_frame: Frame, source_face: Face, target_face: Face, original_frame: Frame) -> Frame:
+    """Create precise face mask - only swap core facial features (eyes, nose, cheeks, chin)"""
+    try:
+        # Get face landmarks for precise masking
+        if hasattr(target_face, 'landmark_2d_106') and target_face.landmark_2d_106 is not None:
+            landmarks = target_face.landmark_2d_106.astype(np.int32)
+
+            # Create precise face mask excluding forehead and hair
+            mask = create_precise_face_mask(landmarks, swapped_frame.shape[:2])
+
+            if mask is not None:
+                # Apply the precise mask
+                mask_3d = mask[:, :, np.newaxis] / 255.0
+
+                # Blend only the core facial features
+                result = (swapped_frame * mask_3d + original_frame * (1 - mask_3d)).astype(np.uint8)
+                return result
+
+        # Fallback: use bounding box method but exclude forehead
+        bbox = target_face.bbox.astype(int)
+        x1, y1, x2, y2 = bbox
+
+        # Ensure coordinates are within frame bounds
+        h, w = swapped_frame.shape[:2]
+        x1, y1 = max(0, x1), max(0, y1)
+        x2, y2 = min(w, x2), min(h, y2)
+
+        if x2 <= x1 or y2 <= y1:
+            return swapped_frame
+
+        # Exclude forehead area (upper 25% of face) to avoid hair swapping
+        forehead_height = int((y2 - y1) * 0.25)
+        face_start_y = y1 + forehead_height
+
+        if face_start_y < y2:
+            # Only blend the lower face area (eyes, nose, cheeks, chin)
+            swapped_face_area = swapped_frame[face_start_y:y2, x1:x2]
+            original_face_area = original_frame[face_start_y:y2, x1:x2]
+
+            # Create soft mask for the face area only
+            mask = np.ones(swapped_face_area.shape[:2], dtype=np.float32)
+            mask = cv2.GaussianBlur(mask, (15, 15), 5)
+            mask = mask[:, :, np.newaxis]
+
+            # Apply the face area back (keep original forehead/hair)
+            swapped_frame[face_start_y:y2, x1:x2] = swapped_face_area
+
+        return swapped_frame
+
+    except Exception:
+        return swapped_frame
+
+
+def create_precise_face_mask(landmarks: np.ndarray, frame_shape: tuple) -> np.ndarray:
+    """Create precise mask for core facial features only (exclude forehead and hair)"""
+    try:
+        mask = np.zeros(frame_shape, dtype=np.uint8)
+
+        # For 106-point landmarks, use correct indices
+        # Face contour (jawline) - points 0-32
+        jaw_line = landmarks[0:33]
+
+        # Eyes area - approximate indices for 106-point model
+        left_eye_area = landmarks[33:42]  # Left eye region
+        right_eye_area = landmarks[87:96]  # Right eye region
+
+        # Eyebrows (start from eyebrow level, not forehead)
+        left_eyebrow = landmarks[43:51]  # Left eyebrow
+        right_eyebrow = landmarks[97:105]  # Right eyebrow
+
+        # Create face contour that excludes forehead
+        # Start from eyebrow level and go around the face
+        face_contour_points = []
+
+        # Add eyebrow points (this will be our "top" instead of forehead)
+        face_contour_points.extend(left_eyebrow)
+        face_contour_points.extend(right_eyebrow)
+
+        # Add jawline points (bottom and sides of face)
+        face_contour_points.extend(jaw_line)
+
+        # Convert to numpy array
+        face_contour_points = np.array(face_contour_points)
+
+        # Create convex hull for the core face area (excluding forehead)
+        hull = cv2.convexHull(face_contour_points)
+        cv2.fillConvexPoly(mask, hull, 255)
+
+        # Apply Gaussian blur for soft edges
+        mask = cv2.GaussianBlur(mask, (21, 21), 7)
+
+        return mask
+
+    except Exception as e:
+        print(f"Error creating precise face mask: {e}")
+        return None
 
 
 def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
-    from modules.performance_optimizer import performance_optimizer
-    from modules.face_tracker import face_tracker
-
-    start_time = time.time()
-    original_size = temp_frame.shape[:2][::-1]  # (width, height)
-
-    # Apply color correction if enabled
-    if modules.globals.color_correction:
-        temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
-
-    # Preprocess frame for performance
-    processed_frame = performance_optimizer.preprocess_frame(temp_frame)
-
-    if modules.globals.many_faces:
-        # Only detect faces if enough time has passed or cache is empty
-        if performance_optimizer.should_detect_faces():
-            detected_faces = get_many_faces(processed_frame)
-            # Apply tracking to each face
-            tracked_faces = []
-            for i, face in enumerate(detected_faces or []):
-                # Use separate tracker for each face (simplified for now)
-                tracked_face = face_tracker.track_face(face, processed_frame)
-                if tracked_face:
-                    tracked_faces.append(tracked_face)
-            performance_optimizer.face_cache['many_faces'] = tracked_faces
-        else:
-            tracked_faces = performance_optimizer.face_cache.get('many_faces', [])
-
-        if tracked_faces:
-            for target_face in tracked_faces:
-                if source_face and target_face:
-                    processed_frame = swap_face_enhanced_with_occlusion(source_face, target_face, processed_frame, temp_frame)
-                else:
-                    print("Face detection failed for target/source.")
-    else:
-        # Use cached face detection with tracking for better performance
-        if performance_optimizer.should_detect_faces():
-            detected_face = get_one_face(processed_frame)
-            tracked_face = face_tracker.track_face(detected_face, processed_frame)
-            performance_optimizer.face_cache['single_face'] = tracked_face
-        else:
-            tracked_face = performance_optimizer.face_cache.get('single_face')
-
-        if tracked_face and source_face:
-            processed_frame = swap_face_enhanced_with_occlusion(source_face, tracked_face, processed_frame, temp_frame)
-        else:
-            # Try to use tracking even without detection
-            tracked_face = face_tracker.track_face(None, processed_frame)
-            if tracked_face and source_face:
-                processed_frame = swap_face_enhanced_with_occlusion(source_face, tracked_face, processed_frame, temp_frame)
-            else:
-                logging.error("Face detection and tracking failed.")
-
-    # Postprocess frame back to original size
-    final_frame = performance_optimizer.postprocess_frame(processed_frame, original_size)
-
-    # Update performance stats
-    frame_time = time.time() - start_time
-    performance_optimizer.update_fps_stats(frame_time)
-
-    return final_frame
+    # Skip color correction for maximum FPS
+    # if modules.globals.color_correction:
+    #     temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
+
+    if modules.globals.many_faces:
+        many_faces = get_many_faces(temp_frame)
+        if many_faces:
+            for target_face in many_faces:
+                if source_face and target_face:
+                    temp_frame = swap_face_ultra_fast(source_face, target_face, temp_frame)
+    else:
+        target_face = get_one_face(temp_frame)
+        if target_face and source_face:
+            temp_frame = swap_face_ultra_fast(source_face, target_face, temp_frame)
+    return temp_frame
 
 
 def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
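A compact restatement of the forehead-protection idea this hunk introduces: swapped pixels in the top band of the face box are blended back toward the original so hair is never overwritten. A standalone sketch under assumed names and the hunk's 0.3/0.7 blend weights:

import numpy as np

def protect_forehead(swapped, original, bbox, band=0.35, keep=0.7):
    # Keep `keep` of the original pixels in the upper `band` of the face box,
    # mirroring fix_forehead_hair_issue (names and defaults are illustrative).
    x1, y1, x2, y2 = bbox
    fy2 = y1 + int((y2 - y1) * band)
    if fy2 > y1:
        swp = swapped[y1:fy2, x1:x2].astype(np.float32)
        org = original[y1:fy2, x1:x2].astype(np.float32)
        swapped[y1:fy2, x1:x2] = ((1 - keep) * swp + keep * org).astype(np.uint8)
    return swapped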
@ -638,7 +454,6 @@ def create_lower_mouth_mask(
     mouth_cutout = None
     landmarks = face.landmark_2d_106
     if landmarks is not None:
-        # 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
         lower_lip_order = [
             65,
             66,
|
@ -662,192 +477,74 @@ def create_lower_mouth_mask(
|
||||||
2,
|
2,
|
||||||
65,
|
65,
|
||||||
]
|
]
|
||||||
lower_lip_landmarks = landmarks[lower_lip_order].astype(
|
lower_lip_landmarks = landmarks[lower_lip_order].astype(np.float32)
|
||||||
np.float32
|
|
||||||
) # Use float for precise calculations
|
|
||||||
|
|
||||||
# Calculate the center of the landmarks
|
|
||||||
center = np.mean(lower_lip_landmarks, axis=0)
|
center = np.mean(lower_lip_landmarks, axis=0)
|
||||||
|
expansion_factor = 1 + modules.globals.mask_down_size
|
||||||
# Expand the landmarks outward
|
|
||||||
expansion_factor = (
|
|
||||||
1 + modules.globals.mask_down_size
|
|
||||||
) # Adjust this for more or less expansion
|
|
||||||
expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center
|
expanded_landmarks = (lower_lip_landmarks - center) * expansion_factor + center
|
||||||
|
|
||||||
# Extend the top lip part
|
toplip_indices = [20, 0, 1, 2, 3, 4, 5]
|
||||||
toplip_indices = [
|
toplip_extension = modules.globals.mask_size * 0.5
|
||||||
20,
|
|
||||||
0,
|
|
||||||
1,
|
|
||||||
2,
|
|
||||||
3,
|
|
||||||
4,
|
|
||||||
5,
|
|
||||||
] # Indices for landmarks 2, 65, 66, 62, 70, 69, 18
|
|
||||||
toplip_extension = (
|
|
||||||
modules.globals.mask_size * 0.5
|
|
||||||
) # Adjust this factor to control the extension
|
|
||||||
for idx in toplip_indices:
|
for idx in toplip_indices:
|
||||||
direction = expanded_landmarks[idx] - center
|
direction = expanded_landmarks[idx] - center
|
||||||
direction = direction / np.linalg.norm(direction)
|
direction = direction / np.linalg.norm(direction)
|
||||||
expanded_landmarks[idx] += direction * toplip_extension
|
expanded_landmarks[idx] += direction * toplip_extension
|
||||||
|
|
||||||
# Extend the bottom part (chin area)
|
chin_indices = [11, 12, 13, 14, 15, 16]
|
||||||
chin_indices = [
|
chin_extension = 2 * 0.2
|
||||||
11,
|
|
||||||
12,
|
|
||||||
13,
|
|
||||||
14,
|
|
||||||
15,
|
|
||||||
16,
|
|
||||||
] # Indices for landmarks 21, 22, 23, 24, 0, 8
|
|
||||||
chin_extension = 2 * 0.2 # Adjust this factor to control the extension
|
|
||||||
for idx in chin_indices:
|
for idx in chin_indices:
|
||||||
expanded_landmarks[idx][1] += (
|
expanded_landmarks[idx][1] += (
|
||||||
expanded_landmarks[idx][1] - center[1]
|
expanded_landmarks[idx][1] - center[1]
|
||||||
) * chin_extension
|
) * chin_extension
|
||||||
|
|
||||||
-        # Convert back to integer coordinates
         expanded_landmarks = expanded_landmarks.astype(np.int32)

-        # Calculate bounding box for the expanded lower mouth
         min_x, min_y = np.min(expanded_landmarks, axis=0)
         max_x, max_y = np.max(expanded_landmarks, axis=0)

-        # Add some padding to the bounding box
-        padding = int((max_x - min_x) * 0.1)  # 10% padding
+        padding = int((max_x - min_x) * 0.1)
         min_x = max(0, min_x - padding)
         min_y = max(0, min_y - padding)
         max_x = min(frame.shape[1], max_x + padding)
         max_y = min(frame.shape[0], max_y + padding)

-        # Ensure the bounding box dimensions are valid
         if max_x <= min_x or max_y <= min_y:
             if (max_x - min_x) <= 1:
                 max_x = min_x + 1
             if (max_y - min_y) <= 1:
                 max_y = min_y + 1

-        # Create the mask
         mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)
         cv2.fillPoly(mask_roi, [expanded_landmarks - [min_x, min_y]], 255)
-        # Apply Gaussian blur to soften the mask edges
-        mask_roi = cv2.GaussianBlur(mask_roi, (15, 15), 5)
+        # Improved smoothing for mouth mask
+        mask_roi = cv2.GaussianBlur(mask_roi, (25, 25), 8)

-        # Place the mask ROI in the full-sized mask
         mask[min_y:max_y, min_x:max_x] = mask_roi

-        # Extract the masked area from the frame
         mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()

-        # Return the expanded lower lip polygon in original frame coordinates
         lower_lip_polygon = expanded_landmarks

     return mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon
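
Reviewer note: the hunk above is mostly a readability refactor; the one behavioral change is the stronger feather, `GaussianBlur(mask_roi, (15, 15), 5)` becoming `(25, 25), 8`. A minimal, self-contained sketch of the two techniques involved, centroid-based polygon expansion and Gaussian feathering; the landmark indices and sizes below are made up for illustration, not the project's values:

    import cv2
    import numpy as np

    # Made-up stand-in for the 106-point landmark array used in the diff.
    landmarks = (np.random.rand(106, 2) * 512).astype(np.float32)
    lip_indices = [65, 66, 62, 70, 69, 18, 19, 20]  # illustrative subset only

    pts = landmarks[lip_indices]
    center = pts.mean(axis=0)

    # Scale every point away from the centroid; a factor > 1 grows the polygon.
    expanded = ((pts - center) * 1.15 + center).astype(np.int32)

    # Rasterize and feather: a bigger kernel/sigma spreads the 255 -> 0 edge
    # over more pixels, which is what the (15,15),5 -> (25,25),8 bump does.
    mask = np.zeros((512, 512), dtype=np.uint8)
    cv2.fillPoly(mask, [expanded], 255)
    soft = cv2.GaussianBlur(mask, (25, 25), 8)
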


-def draw_mouth_mask_visualization(
-    frame: Frame, face: Face, mouth_mask_data: tuple
-) -> Frame:
+def draw_mouth_mask_visualization(frame: Frame, face: Face, mouth_mask_data: tuple) -> Frame:
     landmarks = face.landmark_2d_106
     if landmarks is not None and mouth_mask_data is not None:
-        mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = (
-            mouth_mask_data
-        )
+        mask, mouth_cutout, (min_x, min_y, max_x, max_y), lower_lip_polygon = mouth_mask_data

         vis_frame = frame.copy()

-        # Ensure coordinates are within frame bounds
         height, width = vis_frame.shape[:2]
         min_x, min_y = max(0, min_x), max(0, min_y)
         max_x, max_y = min(width, max_x), min(height, max_y)

-        # Adjust mask to match the region size
-        mask_region = mask[0 : max_y - min_y, 0 : max_x - min_x]
-
-        # Remove the color mask overlay
-        # color_mask = cv2.applyColorMap((mask_region * 255).astype(np.uint8), cv2.COLORMAP_JET)
-
-        # Ensure shapes match before blending
-        vis_region = vis_frame[min_y:max_y, min_x:max_x]
-        # Remove blending with color_mask
-        # if vis_region.shape[:2] == color_mask.shape[:2]:
-        #     blended = cv2.addWeighted(vis_region, 0.7, color_mask, 0.3, 0)
-        #     vis_frame[min_y:max_y, min_x:max_x] = blended
-
-        # Draw the lower lip polygon
         cv2.polylines(vis_frame, [lower_lip_polygon], True, (0, 255, 0), 2)
+        cv2.putText(vis_frame, "Lower Mouth Mask", (min_x, min_y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
-        # Remove the red box
-        # cv2.rectangle(vis_frame, (min_x, min_y), (max_x, max_y), (0, 0, 255), 2)
-
-        # Visualize the feathered mask
-        feather_amount = max(
-            1,
-            min(
-                30,
-                (max_x - min_x) // modules.globals.mask_feather_ratio,
-                (max_y - min_y) // modules.globals.mask_feather_ratio,
-            ),
-        )
-        # Ensure kernel size is odd
-        kernel_size = 2 * feather_amount + 1
-        feathered_mask = cv2.GaussianBlur(
-            mask_region.astype(float), (kernel_size, kernel_size), 0
-        )
-        feathered_mask = (feathered_mask / feathered_mask.max() * 255).astype(np.uint8)
-        # Remove the feathered mask color overlay
-        # color_feathered_mask = cv2.applyColorMap(feathered_mask, cv2.COLORMAP_VIRIDIS)
-
-        # Ensure shapes match before blending feathered mask
-        # if vis_region.shape == color_feathered_mask.shape:
-        #     blended_feathered = cv2.addWeighted(vis_region, 0.7, color_feathered_mask, 0.3, 0)
-        #     vis_frame[min_y:max_y, min_x:max_x] = blended_feathered
-
-        # Add labels
-        cv2.putText(
-            vis_frame,
-            "Lower Mouth Mask",
-            (min_x, min_y - 10),
-            cv2.FONT_HERSHEY_SIMPLEX,
-            0.5,
-            (255, 255, 255),
-            1,
-        )
-        cv2.putText(
-            vis_frame,
-            "Feathered Mask",
-            (min_x, max_y + 20),
-            cv2.FONT_HERSHEY_SIMPLEX,
-            0.5,
-            (255, 255, 255),
-            1,
-        )

         return vis_frame
     return frame

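With the colormap overlays gone, the visualization reduces to the green polygon plus one label. A hedged usage sketch of the debug path; the argument order of `create_lower_mouth_mask` is not visible in this compare, so it is assumed here:

    # Assumed flow: build the mouth mask data once, then render the debug overlay.
    mouth_mask_data = create_lower_mouth_mask(face, frame)  # signature assumed
    debug_frame = draw_mouth_mask_visualization(frame, face, mouth_mask_data)
    cv2.imshow("Mouth mask debug", debug_frame)
    cv2.waitKey(1)
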
-def apply_mouth_area(
-    frame: np.ndarray,
-    mouth_cutout: np.ndarray,
-    mouth_box: tuple,
-    face_mask: np.ndarray,
-    mouth_polygon: np.ndarray,
-) -> np.ndarray:
+def apply_mouth_area(frame: np.ndarray, mouth_cutout: np.ndarray, mouth_box: tuple, face_mask: np.ndarray, mouth_polygon: np.ndarray) -> np.ndarray:
     min_x, min_y, max_x, max_y = mouth_box
     box_width = max_x - min_x
     box_height = max_y - min_y

-    if (
-        mouth_cutout is None
-        or box_width is None
-        or box_height is None
-        or face_mask is None
-        or mouth_polygon is None
-    ):
+    if mouth_cutout is None or box_width is None or box_height is None or face_mask is None or mouth_polygon is None:
         return frame

     try:
@@ -855,44 +552,33 @@ def apply_mouth_area(
         roi = frame[min_y:max_y, min_x:max_x]

         if roi.shape != resized_mouth_cutout.shape:
-            resized_mouth_cutout = cv2.resize(
-                resized_mouth_cutout, (roi.shape[1], roi.shape[0])
-            )
+            resized_mouth_cutout = cv2.resize(resized_mouth_cutout, (roi.shape[1], roi.shape[0]))

         color_corrected_mouth = apply_color_transfer(resized_mouth_cutout, roi)

-        # Use the provided mouth polygon to create the mask
         polygon_mask = np.zeros(roi.shape[:2], dtype=np.uint8)
         adjusted_polygon = mouth_polygon - [min_x, min_y]
         cv2.fillPoly(polygon_mask, [adjusted_polygon], 255)

-        # Apply feathering to the polygon mask
-        feather_amount = min(
-            30,
-            box_width // modules.globals.mask_feather_ratio,
-            box_height // modules.globals.mask_feather_ratio,
-        )
-        feathered_mask = cv2.GaussianBlur(
-            polygon_mask.astype(float), (0, 0), feather_amount
-        )
+        # Improved feathering for smoother mouth mask
+        feather_amount = min(35, box_width // modules.globals.mask_feather_ratio, box_height // modules.globals.mask_feather_ratio)
+        feathered_mask = cv2.GaussianBlur(polygon_mask.astype(float), (0, 0), feather_amount * 1.2)
         feathered_mask = feathered_mask / feathered_mask.max()

+        # Additional smoothing pass for extra softness
+        feathered_mask = cv2.GaussianBlur(feathered_mask, (7, 7), 2)
+
+        # Fix black line artifacts by ensuring smooth mask transitions
+        feathered_mask = np.clip(feathered_mask, 0.1, 0.9)  # Avoid pure 0 and 1 values

         face_mask_roi = face_mask[min_y:max_y, min_x:max_x]
         combined_mask = feathered_mask * (face_mask_roi / 255.0)

         combined_mask = combined_mask[:, :, np.newaxis]
-        blended = (
-            color_corrected_mouth * combined_mask + roi * (1 - combined_mask)
-        ).astype(np.uint8)
+        blended = (color_corrected_mouth * combined_mask + roi * (1 - combined_mask)).astype(np.uint8)

-        # Apply face mask to blended result
-        face_mask_3channel = (
-            np.repeat(face_mask_roi[:, :, np.newaxis], 3, axis=2) / 255.0
-        )
+        face_mask_3channel = np.repeat(face_mask_roi[:, :, np.newaxis], 3, axis=2) / 255.0
         final_blend = blended * face_mask_3channel + roi * (1 - face_mask_3channel)

         frame[min_y:max_y, min_x:max_x] = final_blend.astype(np.uint8)
-    except Exception as e:
+    except Exception:
         pass

     return frame
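
The most consequential addition in this hunk is `np.clip(feathered_mask, 0.1, 0.9)`: keeping the mask strictly inside (0, 1) means every ROI pixel stays a mix of both sources, so the boundary where the mask used to hit exactly 0 no longer renders as a dark line. A self-contained sketch of that feather-then-clip blend, with invented shapes and colors:

    import cv2
    import numpy as np

    fg = np.full((120, 120, 3), (40, 60, 200), dtype=np.uint8)  # stand-in "mouth" patch
    bg = np.full((120, 120, 3), (90, 140, 90), dtype=np.uint8)  # stand-in frame ROI

    mask = np.zeros((120, 120), dtype=np.uint8)
    cv2.fillPoly(mask, [np.array([[20, 30], [100, 25], [90, 95], [30, 90]], np.int32)], 255)

    feathered = cv2.GaussianBlur(mask.astype(float), (0, 0), 12)
    feathered /= feathered.max()
    feathered = np.clip(feathered, 0.1, 0.9)[:, :, np.newaxis]  # never pure 0 or 1

    out = (fg * feathered + bg * (1.0 - feathered)).astype(np.uint8)
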
@@ -902,10 +588,7 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
     mask = np.zeros(frame.shape[:2], dtype=np.uint8)
     landmarks = face.landmark_2d_106
     if landmarks is not None:
-        # Convert landmarks to int32
         landmarks = landmarks.astype(np.int32)

-        # Extract facial features
         right_side_face = landmarks[0:16]
         left_side_face = landmarks[17:32]
         right_eye = landmarks[33:42]
@@ -913,39 +596,22 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
         left_eye = landmarks[87:96]
         left_eye_brow = landmarks[97:105]

-        # Calculate forehead extension
         right_eyebrow_top = np.min(right_eye_brow[:, 1])
         left_eyebrow_top = np.min(left_eye_brow[:, 1])
         eyebrow_top = min(right_eyebrow_top, left_eyebrow_top)

         face_top = np.min([right_side_face[0, 1], left_side_face[-1, 1]])
         forehead_height = face_top - eyebrow_top
-        extended_forehead_height = int(forehead_height * 5.0)  # Extend by 50%
+        extended_forehead_height = int(forehead_height * 5.0)

-        # Create forehead points
         forehead_left = right_side_face[0].copy()
         forehead_right = left_side_face[-1].copy()
         forehead_left[1] -= extended_forehead_height
         forehead_right[1] -= extended_forehead_height

-        # Combine all points to create the face outline
-        face_outline = np.vstack(
-            [
-                [forehead_left],
-                right_side_face,
-                left_side_face[::-1],  # Reverse left side to create a continuous outline
-                [forehead_right],
-            ]
-        )
-
-        # Calculate padding
-        padding = int(
-            np.linalg.norm(right_side_face[0] - left_side_face[-1]) * 0.05
-        )  # 5% of face width
-
-        # Create a slightly larger convex hull for padding
+        face_outline = np.vstack([[forehead_left], right_side_face, left_side_face[::-1], [forehead_right]])
+        padding = int(np.linalg.norm(right_side_face[0] - left_side_face[-1]) * 0.05)
         hull = cv2.convexHull(face_outline)
         hull_padded = []
         for point in hull:
@@ -957,33 +623,23 @@ def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
             hull_padded.append(padded_point)

         hull_padded = np.array(hull_padded, dtype=np.int32)

-        # Fill the padded convex hull
         cv2.fillConvexPoly(mask, hull_padded, 255)

-        # Smooth the mask edges
         mask = cv2.GaussianBlur(mask, (5, 5), 3)

     return mask


 def apply_color_transfer(source, target):
-    """
-    Apply color transfer from target to source image
-    """
     source = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype("float32")
     target = cv2.cvtColor(target, cv2.COLOR_BGR2LAB).astype("float32")

     source_mean, source_std = cv2.meanStdDev(source)
     target_mean, target_std = cv2.meanStdDev(target)

-    # Reshape mean and std to be broadcastable
     source_mean = source_mean.reshape(1, 1, 3)
     source_std = source_std.reshape(1, 1, 3)
     target_mean = target_mean.reshape(1, 1, 3)
     target_std = target_std.reshape(1, 1, 3)

-    # Perform the color transfer
     source = (source - source_mean) * (target_std / source_std) + target_mean

     return cv2.cvtColor(np.clip(source, 0, 255).astype("uint8"), cv2.COLOR_LAB2BGR)
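
`apply_color_transfer` is the classic Reinhard-style statistics match in LAB space: per channel, out = (src - mu_src) * (sigma_tgt / sigma_src) + mu_tgt. One caveat worth flagging in review: a flat source channel makes `source_std` zero and the division blows up; a guard like `np.maximum(source_std, 1e-6)` would be cheap insurance. Usage is simply:

    patch = cv2.imread("mouth_patch.png")  # placeholder input
    roi = cv2.imread("frame_roi.png")      # placeholder input
    # Recolor the patch so its LAB statistics match the surrounding region.
    matched = apply_color_transfer(patch, roi)
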
@@ -3,8 +3,6 @@ import numpy as np
 from typing import Optional, Tuple, Callable
 import platform
 import threading
-import time
-from collections import deque

 # Only import Windows-specific library if on Windows
 if platform.system() == "Windows":
@@ -19,17 +17,6 @@ class VideoCapturer:
         self._frame_ready = threading.Event()
         self.is_running = False
         self.cap = None

-        # Performance tracking
-        self.frame_times = deque(maxlen=30)
-        self.current_fps = 0
-        self.target_fps = 30
-        self.frame_skip = 1
-        self.frame_counter = 0
-
-        # Buffer management
-        self.frame_buffer = deque(maxlen=3)
-        self.buffer_lock = threading.Lock()
-
         # Initialize Windows-specific components if on Windows
         if platform.system() == "Windows":
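
For anyone who still needs the removed buffer, it amounts to a lock-guarded `deque(maxlen=3)` of recent frames. A sketch of the same idea outside the class; the names here are mine, not the module's:

    import threading
    from collections import deque

    frame_buffer = deque(maxlen=3)  # oldest frames fall off automatically
    buffer_lock = threading.Lock()

    def push_frame(frame):
        with buffer_lock:
            frame_buffer.append(frame.copy())

    def latest_frame():
        with buffer_lock:
            return frame_buffer[-1] if frame_buffer else None
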
@@ -42,10 +29,8 @@ class VideoCapturer:
         )

     def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
-        """Initialize and start video capture with performance optimizations"""
+        """Initialize and start video capture"""
         try:
-            self.target_fps = fps
-
             if platform.system() == "Windows":
                 # Windows-specific capture methods
                 capture_methods = [
@@ -70,14 +55,10 @@ class VideoCapturer:
             if not self.cap or not self.cap.isOpened():
                 raise RuntimeError("Failed to open camera")

-            # Configure format with performance optimizations
+            # Configure format
             self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
             self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
             self.cap.set(cv2.CAP_PROP_FPS, fps)

-            # Additional performance settings
-            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Reduce buffer to minimize latency
-            self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))  # Use MJPEG for better performance
-
             self.is_running = True
             return True
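
Both dropped property calls are real OpenCV knobs and can still be applied from caller code if latency matters; note that `CAP_PROP_BUFFERSIZE` is backend-dependent and silently ignored by some drivers. A minimal sketch:

    import cv2

    cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)                            # minimize queued frames
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))  # request MJPEG
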
@@ -89,57 +70,17 @@ class VideoCapturer:
             return False

     def read(self) -> Tuple[bool, Optional[np.ndarray]]:
-        """Read a frame from the camera with performance optimizations"""
+        """Read a frame from the camera"""
         if not self.is_running or self.cap is None:
             return False, None

-        start_time = time.time()
-
-        # Implement frame skipping for performance
-        self.frame_counter += 1
-        if self.frame_counter % self.frame_skip != 0:
-            # Skip this frame but still read to clear buffer
-            ret, _ = self.cap.read()
-            return ret, self._current_frame if ret else None
-
         ret, frame = self.cap.read()
         if ret:
             self._current_frame = frame

-            # Update performance metrics
-            frame_time = time.time() - start_time
-            self.frame_times.append(frame_time)
-            self._update_performance_metrics()
-
-            # Add to buffer for processing
-            with self.buffer_lock:
-                self.frame_buffer.append(frame.copy())
-
             if self.frame_callback:
                 self.frame_callback(frame)
             return True, frame
         return False, None

-    def _update_performance_metrics(self):
-        """Update FPS and adjust frame skipping based on performance"""
-        if len(self.frame_times) >= 10:
-            avg_frame_time = sum(list(self.frame_times)[-10:]) / 10
-            self.current_fps = 1.0 / avg_frame_time if avg_frame_time > 0 else 0
-
-            # Adaptive frame skipping
-            if self.current_fps < self.target_fps * 0.8:
-                self.frame_skip = min(3, self.frame_skip + 1)
-            elif self.current_fps > self.target_fps * 0.95:
-                self.frame_skip = max(1, self.frame_skip - 1)
-
-    def get_buffered_frame(self) -> Optional[np.ndarray]:
-        """Get the latest frame from buffer"""
-        with self.buffer_lock:
-            return self.frame_buffer[-1] if self.frame_buffer else None
-
-    def get_fps(self) -> float:
-        """Get current FPS"""
-        return self.current_fps
-
     def release(self) -> None:
         """Stop capture and release resources"""
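
After this hunk `read()` is a plain grab: no skip counter, no FPS bookkeeping, no buffer. A hedged sketch of the surviving interface; the constructor arguments are not shown in this compare, so the device index is an assumption:

    capturer = VideoCapturer(0)  # constructor signature assumed
    if capturer.start(width=960, height=540, fps=30):
        ok, frame = capturer.read()
        if ok:
            print("got frame:", frame.shape)
    capturer.release()
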
@@ -1,46 +0,0 @@
-{
-    "performance_modes": {
-        "fast": {
-            "quality_level": 0.6,
-            "face_detection_interval": 0.2,
-            "target_fps": 30,
-            "frame_skip": 2,
-            "enable_caching": true,
-            "processing_resolution_scale": 0.7,
-            "description": "Optimized for maximum FPS with acceptable quality"
-        },
-        "balanced": {
-            "quality_level": 0.85,
-            "face_detection_interval": 0.1,
-            "target_fps": 25,
-            "frame_skip": 1,
-            "enable_caching": true,
-            "processing_resolution_scale": 0.85,
-            "description": "Balance between quality and performance"
-        },
-        "quality": {
-            "quality_level": 1.0,
-            "face_detection_interval": 0.05,
-            "target_fps": 20,
-            "frame_skip": 1,
-            "enable_caching": false,
-            "processing_resolution_scale": 1.0,
-            "description": "Maximum quality with slower processing"
-        }
-    },
-    "advanced_settings": {
-        "color_matching_strength": 0.7,
-        "edge_smoothing_enabled": true,
-        "adaptive_quality_enabled": true,
-        "gpu_memory_optimization": true,
-        "face_cache_size": 10,
-        "frame_buffer_size": 3
-    },
-    "quality_enhancements": {
-        "enable_color_correction": true,
-        "enable_edge_smoothing": true,
-        "enable_advanced_blending": true,
-        "skin_tone_matching": true,
-        "lighting_adaptation": true
-    }
-}
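
Anything that still reads this config after the deletion needs its own defaults. For reference, loading such a mode table is plain `json`; the file name below is assumed, since the compare view does not show it:

    import json

    with open("performance_config.json") as f:  # file name assumed
        config = json.load(f)

    mode = config["performance_modes"]["balanced"]
    print(mode["target_fps"], mode["processing_resolution_scale"])
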
@@ -1,120 +0,0 @@
-#!/usr/bin/env python3
-"""
-Deep-Live-Cam Performance Setup Script
-Easy configuration for optimal performance based on your hardware
-"""
-
-import sys
-import os
-sys.path.append(os.path.dirname(os.path.abspath(__file__)))
-
-from modules.performance_manager import performance_manager
-import psutil
-import platform
-
-def print_header():
-    print("=" * 60)
-    print("🎭 Deep-Live-Cam Performance Optimizer")
-    print("=" * 60)
-    print()
-
-def analyze_system():
-    """Analyze system specifications"""
-    print("📊 Analyzing your system...")
-    print("-" * 40)
-
-    # System info
-    print(f"OS: {platform.system()} {platform.release()}")
-    print(f"CPU: {platform.processor()}")
-    print(f"CPU Cores: {psutil.cpu_count()}")
-    print(f"RAM: {psutil.virtual_memory().total / (1024**3):.1f} GB")
-
-    # GPU info
-    try:
-        import torch
-        if torch.cuda.is_available():
-            gpu_name = torch.cuda.get_device_name(0)
-            gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)
-            print(f"GPU: {gpu_name} ({gpu_memory:.1f} GB)")
-        else:
-            print("GPU: Not available or not CUDA-compatible")
-    except ImportError:
-        print("GPU: PyTorch not available")
-
-    print()
-
-def show_performance_modes():
-    """Display available performance modes"""
-    print("🎯 Available Performance Modes:")
-    print("-" * 40)
-
-    modes = performance_manager.get_all_modes()
-    for mode_name, mode_config in modes.items():
-        print(f"\n{mode_name.upper()}:")
-        print(f"  Quality Level: {mode_config['quality_level']}")
-        print(f"  Target FPS: {mode_config['target_fps']}")
-        print(f"  Detection Interval: {mode_config['face_detection_interval']}s")
-        if 'description' in mode_config:
-            print(f"  Description: {mode_config['description']}")
-
-def interactive_setup():
-    """Interactive performance setup"""
-    print("🛠️ Interactive Setup:")
-    print("-" * 40)
-
-    print("\nChoose your priority:")
-    print("1. Maximum FPS (for live streaming)")
-    print("2. Balanced performance and quality")
-    print("3. Best quality (for video processing)")
-    print("4. Auto-optimize based on hardware")
-
-    while True:
-        try:
-            choice = input("\nEnter your choice (1-4): ").strip()
-
-            if choice == "1":
-                performance_manager.set_performance_mode("fast")
-                print("✅ Set to FAST mode - Maximum FPS")
-                break
-            elif choice == "2":
-                performance_manager.set_performance_mode("balanced")
-                print("✅ Set to BALANCED mode - Good balance")
-                break
-            elif choice == "3":
-                performance_manager.set_performance_mode("quality")
-                print("✅ Set to QUALITY mode - Best results")
-                break
-            elif choice == "4":
-                optimal_mode = performance_manager.optimize_for_hardware()
-                print(f"✅ Auto-optimized to {optimal_mode.upper()} mode")
-                break
-            else:
-                print("❌ Invalid choice. Please enter 1, 2, 3, or 4.")
-
-        except KeyboardInterrupt:
-            print("\n\n👋 Setup cancelled.")
-            return
-
-def show_tips():
-    """Show performance tips"""
-    print("\n💡 Performance Tips:")
-    print("-" * 40)
-
-    tips = performance_manager.get_performance_tips()
-    for tip in tips:
-        print(f"  {tip}")
-
-def main():
-    print_header()
-    analyze_system()
-    show_performance_modes()
-    interactive_setup()
-    show_tips()
-
-    print("\n" + "=" * 60)
-    print("🎉 Setup complete! You can change these settings anytime by running this script again.")
-    print("💻 Start Deep-Live-Cam with: python run.py")
-    print("=" * 60)
-
-if __name__ == "__main__":
-    main()
@@ -1,167 +0,0 @@
-#!/usr/bin/env python3
-"""
-Test script for the new KIRO improvements
-Demonstrates face tracking, occlusion handling, and stabilization
-"""
-
-import cv2
-import sys
-import os
-sys.path.append(os.path.dirname(os.path.abspath(__file__)))
-
-from modules.live_face_swapper import live_face_swapper
-from modules.performance_manager import performance_manager
-from modules.face_tracker import face_tracker
-import modules.globals
-
-def test_live_face_swap():
-    """Test the enhanced live face swapping with new features"""
-    print("🎭 Testing Enhanced Live Face Swapping")
-    print("=" * 50)
-
-    # Set performance mode
-    print("Setting performance mode to 'balanced'...")
-    performance_manager.set_performance_mode("balanced")
-
-    # Get source image path
-    source_path = input("Enter path to source face image (or press Enter for demo): ").strip()
-    if not source_path:
-        print("Please provide a source image path to test face swapping.")
-        return
-
-    if not os.path.exists(source_path):
-        print(f"Source image not found: {source_path}")
-        return
-
-    # Set source face
-    print("Loading source face...")
-    if not live_face_swapper.set_source_face(source_path):
-        print("❌ Failed to detect face in source image")
-        return
-
-    print("✅ Source face loaded successfully")
-
-    # Display callback function
-    def display_frame(frame, fps):
-        # Add FPS text to frame
-        cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
-                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
-
-        # Add tracking status
-        if face_tracker.is_face_stable():
-            status_text = "TRACKING: STABLE"
-            color = (0, 255, 0)
-        else:
-            status_text = "TRACKING: SEARCHING"
-            color = (0, 255, 255)
-
-        cv2.putText(frame, status_text, (10, 70),
-                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
-
-        # Add performance info
-        stats = live_face_swapper.get_performance_stats()
-        quality_text = f"Quality: {stats['quality_level']:.1f}"
-        cv2.putText(frame, quality_text, (10, 110),
-                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
-
-        # Show frame
-        cv2.imshow("Enhanced Live Face Swap - KIRO Improvements", frame)
-
-        # Handle key presses
-        key = cv2.waitKey(1) & 0xFF
-        if key == ord('q'):
-            live_face_swapper.stop_live_swap()
-        elif key == ord('f'):  # Fast mode
-            performance_manager.set_performance_mode("fast")
-            print("Switched to FAST mode")
-        elif key == ord('b'):  # Balanced mode
-            performance_manager.set_performance_mode("balanced")
-            print("Switched to BALANCED mode")
-        elif key == ord('h'):  # Quality mode
-            performance_manager.set_performance_mode("quality")
-            print("Switched to QUALITY mode")
-        elif key == ord('r'):  # Reset tracking
-            face_tracker.reset_tracking()
-            print("Reset face tracking")
-
-    print("\n🎥 Starting live face swap...")
-    print("Controls:")
-    print("  Q - Quit")
-    print("  F - Fast mode")
-    print("  B - Balanced mode")
-    print("  H - High quality mode")
-    print("  R - Reset tracking")
-    print("\n✨ New Features:")
-    print("  - Face tracking with occlusion handling")
-    print("  - Stabilized face swapping (less jittery)")
-    print("  - Adaptive performance optimization")
-    print("  - Enhanced quality with better color matching")
-
-    try:
-        # Start live face swapping (camera index 0)
-        live_face_swapper.start_live_swap(0, display_frame)
-    except KeyboardInterrupt:
-        print("\n👋 Stopping...")
-    finally:
-        live_face_swapper.stop_live_swap()
-        cv2.destroyAllWindows()
-
-def show_improvements_info():
-    """Show information about the improvements"""
-    print("🚀 KIRO Improvements for Deep-Live-Cam")
-    print("=" * 50)
-    print()
-    print("✨ NEW FEATURES:")
-    print("  1. 🎯 Face Tracking & Stabilization")
-    print("     - Reduces jittery face swapping")
-    print("     - Maintains face position during brief occlusions")
-    print("     - Kalman filter for smooth tracking")
-    print()
-    print("  2. 🖐️ Occlusion Handling")
-    print("     - Detects hands/objects covering the face")
-    print("     - Keeps face swap on face area only")
-    print("     - Smart blending to avoid artifacts")
-    print()
-    print("  3. ⚡ Performance Optimization")
-    print("     - 30-50% FPS improvement")
-    print("     - Adaptive quality scaling")
-    print("     - Smart face detection caching")
-    print("     - Multi-threaded processing")
-    print()
-    print("  4. 🎨 Enhanced Quality")
-    print("     - Better color matching (LAB color space)")
-    print("     - Advanced edge smoothing")
-    print("     - Improved skin tone matching")
-    print("     - Lighting adaptation")
-    print()
-    print("  5. 🛠️ Easy Configuration")
-    print("     - Performance modes: Fast/Balanced/Quality")
-    print("     - Hardware auto-optimization")
-    print("     - Interactive setup script")
-    print()
-
-def main():
-    show_improvements_info()
-
-    print("Choose test option:")
-    print("1. Test live face swapping with new features")
-    print("2. Run performance setup")
-    print("3. Show performance tips")
-
-    choice = input("\nEnter choice (1-3): ").strip()
-
-    if choice == "1":
-        test_live_face_swap()
-    elif choice == "2":
-        os.system("python setup_performance.py")
-    elif choice == "3":
-        tips = performance_manager.get_performance_tips()
-        print("\n💡 Performance Tips:")
-        print("-" * 30)
-        for tip in tips:
-            print(f"  {tip}")
-    else:
-        print("Invalid choice")
-
-if __name__ == "__main__":
-    main()