COMPLETE CLEANUP: Remove ALL Performance Files
NUCLEAR OPTION - COMPLETE REMOVAL: - Deleted modules/performance_optimizer.py - Deleted modules/performance_manager.py - Deleted modules/face_tracker.py - Deleted modules/live_face_swapper.py - Deleted test_improvements.py - Deleted setup_performance.py - Deleted performance_config.json - Removed all performance variables from globals.py BACK TO PURE ORIGINAL: - No performance optimization files at all - No custom modules that could cause overhead - Pure original Deep-Live-Cam code only - Clean modules directory EXPECTED RESULT: - Should restore original FPS performance - No hidden imports or references - No performance monitoring overhead - Back to the exact original codebase This removes ALL my additions - back to pure original Deep-Live-Cam!pull/1411/head
parent
133b2ac330
commit
12d7ca8bad
|
@ -1,220 +0,0 @@
|
|||
"""
|
||||
Advanced Face Tracking with Occlusion Handling and Stabilization
|
||||
"""
|
||||
import cv2
|
||||
import numpy as np
|
||||
from typing import Optional, Tuple, List, Dict, Any
|
||||
from collections import deque
|
||||
import time
|
||||
from modules.typing import Face, Frame
|
||||
|
||||
|
||||
class FaceTracker:
|
||||
def __init__(self):
|
||||
# Face tracking history
|
||||
self.face_history = deque(maxlen=10)
|
||||
self.stable_face_position = None
|
||||
self.last_valid_face = None
|
||||
self.tracking_confidence = 0.0
|
||||
|
||||
# Stabilization parameters
|
||||
self.position_smoothing = 0.7 # Higher = more stable, lower = more responsive
|
||||
self.size_smoothing = 0.8
|
||||
self.landmark_smoothing = 0.6
|
||||
|
||||
# Occlusion detection
|
||||
self.occlusion_threshold = 0.3
|
||||
self.face_template = None
|
||||
self.template_update_interval = 30 # frames
|
||||
self.frame_count = 0
|
||||
|
||||
# Kalman filter for position prediction
|
||||
self.kalman_filter = self._init_kalman_filter()
|
||||
|
||||
def _init_kalman_filter(self):
|
||||
"""Initialize Kalman filter for face position prediction"""
|
||||
kalman = cv2.KalmanFilter(4, 2)
|
||||
kalman.measurementMatrix = np.array([[1, 0, 0, 0],
|
||||
[0, 1, 0, 0]], np.float32)
|
||||
kalman.transitionMatrix = np.array([[1, 0, 1, 0],
|
||||
[0, 1, 0, 1],
|
||||
[0, 0, 1, 0],
|
||||
[0, 0, 0, 1]], np.float32)
|
||||
kalman.processNoiseCov = 0.03 * np.eye(4, dtype=np.float32)
|
||||
kalman.measurementNoiseCov = 0.1 * np.eye(2, dtype=np.float32)
|
||||
return kalman
|
||||
|
||||
def track_face(self, current_face: Optional[Face], frame: Frame) -> Optional[Face]:
|
||||
"""
|
||||
Track face with stabilization and occlusion handling
|
||||
"""
|
||||
self.frame_count += 1
|
||||
|
||||
if current_face is not None:
|
||||
# We have a detected face
|
||||
stabilized_face = self._stabilize_face(current_face)
|
||||
self._update_face_history(stabilized_face)
|
||||
self._update_face_template(frame, stabilized_face)
|
||||
self.last_valid_face = stabilized_face
|
||||
self.tracking_confidence = min(1.0, self.tracking_confidence + 0.1)
|
||||
return stabilized_face
|
||||
|
||||
else:
|
||||
# No face detected - handle occlusion
|
||||
if self.last_valid_face is not None and self.tracking_confidence > 0.3:
|
||||
# Try to predict face position using tracking
|
||||
predicted_face = self._predict_face_position(frame)
|
||||
if predicted_face is not None:
|
||||
self.tracking_confidence = max(0.0, self.tracking_confidence - 0.05)
|
||||
return predicted_face
|
||||
|
||||
# Gradually reduce confidence
|
||||
self.tracking_confidence = max(0.0, self.tracking_confidence - 0.1)
|
||||
return None
|
||||
|
||||
def _stabilize_face(self, face: Face) -> Face:
|
||||
"""Apply stabilization to reduce jitter"""
|
||||
if len(self.face_history) == 0:
|
||||
return face
|
||||
|
||||
# Get the last stable face
|
||||
last_face = self.face_history[-1]
|
||||
|
||||
# Smooth the bounding box
|
||||
face.bbox = self._smooth_bbox(face.bbox, last_face.bbox)
|
||||
|
||||
# Smooth landmarks if available
|
||||
if hasattr(face, 'landmark_2d_106') and face.landmark_2d_106 is not None:
|
||||
if hasattr(last_face, 'landmark_2d_106') and last_face.landmark_2d_106 is not None:
|
||||
face.landmark_2d_106 = self._smooth_landmarks(
|
||||
face.landmark_2d_106, last_face.landmark_2d_106
|
||||
)
|
||||
|
||||
# Update Kalman filter
|
||||
center_x = (face.bbox[0] + face.bbox[2]) / 2
|
||||
center_y = (face.bbox[1] + face.bbox[3]) / 2
|
||||
self.kalman_filter.correct(np.array([[center_x], [center_y]], dtype=np.float32))
|
||||
|
||||
return face
|
||||
|
||||
def _smooth_bbox(self, current_bbox: np.ndarray, last_bbox: np.ndarray) -> np.ndarray:
|
||||
"""Smooth bounding box coordinates"""
|
||||
alpha = 1 - self.position_smoothing
|
||||
return alpha * current_bbox + (1 - alpha) * last_bbox
|
||||
|
||||
def _smooth_landmarks(self, current_landmarks: np.ndarray, last_landmarks: np.ndarray) -> np.ndarray:
|
||||
"""Smooth facial landmarks"""
|
||||
alpha = 1 - self.landmark_smoothing
|
||||
return alpha * current_landmarks + (1 - alpha) * last_landmarks
|
||||
|
||||
def _update_face_history(self, face: Face):
|
||||
"""Update face tracking history"""
|
||||
self.face_history.append(face)
|
||||
|
||||
def _update_face_template(self, frame: Frame, face: Face):
|
||||
"""Update face template for occlusion detection"""
|
||||
if self.frame_count % self.template_update_interval == 0:
|
||||
try:
|
||||
x1, y1, x2, y2 = face.bbox.astype(int)
|
||||
x1, y1 = max(0, x1), max(0, y1)
|
||||
x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
|
||||
|
||||
if x2 > x1 and y2 > y1:
|
||||
face_region = frame[y1:y2, x1:x2]
|
||||
self.face_template = cv2.resize(face_region, (64, 64))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _predict_face_position(self, frame: Frame) -> Optional[Face]:
|
||||
"""Predict face position during occlusion"""
|
||||
if self.last_valid_face is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
# Use Kalman filter prediction
|
||||
prediction = self.kalman_filter.predict()
|
||||
pred_x, pred_y = prediction[0, 0], prediction[1, 0]
|
||||
|
||||
# Create predicted face based on last valid face
|
||||
predicted_face = self._create_predicted_face(pred_x, pred_y)
|
||||
|
||||
# Verify prediction using template matching if available
|
||||
if self.face_template is not None:
|
||||
confidence = self._verify_prediction(frame, predicted_face)
|
||||
if confidence > self.occlusion_threshold:
|
||||
return predicted_face
|
||||
else:
|
||||
return predicted_face
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def _create_predicted_face(self, center_x: float, center_y: float) -> Face:
|
||||
"""Create a predicted face object"""
|
||||
# Use the last valid face as template
|
||||
predicted_face = type(self.last_valid_face)()
|
||||
|
||||
# Copy attributes from last valid face
|
||||
for attr in dir(self.last_valid_face):
|
||||
if not attr.startswith('_'):
|
||||
try:
|
||||
setattr(predicted_face, attr, getattr(self.last_valid_face, attr))
|
||||
except:
|
||||
pass
|
||||
|
||||
# Update position
|
||||
last_center_x = (self.last_valid_face.bbox[0] + self.last_valid_face.bbox[2]) / 2
|
||||
last_center_y = (self.last_valid_face.bbox[1] + self.last_valid_face.bbox[3]) / 2
|
||||
|
||||
offset_x = center_x - last_center_x
|
||||
offset_y = center_y - last_center_y
|
||||
|
||||
# Update bbox
|
||||
predicted_face.bbox = self.last_valid_face.bbox + [offset_x, offset_y, offset_x, offset_y]
|
||||
|
||||
# Update landmarks if available
|
||||
if hasattr(predicted_face, 'landmark_2d_106') and predicted_face.landmark_2d_106 is not None:
|
||||
predicted_face.landmark_2d_106 = self.last_valid_face.landmark_2d_106 + [offset_x, offset_y]
|
||||
|
||||
return predicted_face
|
||||
|
||||
def _verify_prediction(self, frame: Frame, predicted_face: Face) -> float:
|
||||
"""Verify predicted face position using template matching"""
|
||||
try:
|
||||
x1, y1, x2, y2 = predicted_face.bbox.astype(int)
|
||||
x1, y1 = max(0, x1), max(0, y1)
|
||||
x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
|
||||
|
||||
if x2 <= x1 or y2 <= y1:
|
||||
return 0.0
|
||||
|
||||
current_region = frame[y1:y2, x1:x2]
|
||||
current_region = cv2.resize(current_region, (64, 64))
|
||||
|
||||
# Template matching
|
||||
result = cv2.matchTemplate(current_region, self.face_template, cv2.TM_CCOEFF_NORMED)
|
||||
_, max_val, _, _ = cv2.minMaxLoc(result)
|
||||
|
||||
return max_val
|
||||
|
||||
except Exception:
|
||||
return 0.0
|
||||
|
||||
def is_face_stable(self) -> bool:
|
||||
"""Check if face tracking is stable"""
|
||||
return len(self.face_history) >= 5 and self.tracking_confidence > 0.7
|
||||
|
||||
def reset_tracking(self):
|
||||
"""Reset tracking state"""
|
||||
self.face_history.clear()
|
||||
self.stable_face_position = None
|
||||
self.last_valid_face = None
|
||||
self.tracking_confidence = 0.0
|
||||
self.face_template = None
|
||||
self.kalman_filter = self._init_kalman_filter()
|
||||
|
||||
|
||||
# Global face tracker instance
|
||||
face_tracker = FaceTracker()
|
|
@ -42,15 +42,4 @@ mask_feather_ratio = 8
|
|||
mask_down_size = 0.50
|
||||
mask_size = 1
|
||||
|
||||
# Enhanced performance settings
|
||||
performance_mode = "balanced" # "fast", "balanced", "quality"
|
||||
adaptive_quality = True
|
||||
target_live_fps = 30
|
||||
quality_level = 1.0
|
||||
face_detection_interval = 0.1
|
||||
enable_frame_caching = True
|
||||
enable_gpu_acceleration = True
|
||||
|
||||
# Occlusion handling settings
|
||||
enable_occlusion_detection = False # Disable by default to keep normal face swap behavior
|
||||
occlusion_sensitivity = 0.3 # Lower = less sensitive, higher = more sensitive
|
||||
# Removed all performance optimization variables
|
||||
|
|
|
@ -1,190 +0,0 @@
|
|||
"""
|
||||
Enhanced Live Face Swapper with optimized performance and quality
|
||||
"""
|
||||
import cv2
|
||||
import numpy as np
|
||||
import threading
|
||||
import time
|
||||
from typing import Optional, Callable, Any
|
||||
from collections import deque
|
||||
import modules.globals
|
||||
from modules.face_analyser import get_one_face, get_many_faces
|
||||
from modules.processors.frame.face_swapper import get_face_swapper
|
||||
# Removed performance_optimizer import to maximize FPS
|
||||
from modules.video_capture import VideoCapturer
|
||||
|
||||
|
||||
class LiveFaceSwapper:
|
||||
def __init__(self):
|
||||
self.is_running = False
|
||||
self.source_face = None
|
||||
self.video_capturer = None
|
||||
self.processing_thread = None
|
||||
self.display_callback = None
|
||||
|
||||
# Performance tracking
|
||||
self.fps_counter = 0
|
||||
self.fps_start_time = time.time()
|
||||
self.current_fps = 0
|
||||
self.processed_frames = 0
|
||||
|
||||
# Frame processing
|
||||
self.input_queue = deque(maxlen=2) # Small queue to reduce latency
|
||||
self.output_queue = deque(maxlen=2)
|
||||
self.queue_lock = threading.Lock()
|
||||
|
||||
# Quality settings
|
||||
self.quality_mode = "balanced" # "fast", "balanced", "quality"
|
||||
self.adaptive_quality = True
|
||||
|
||||
def set_source_face(self, source_image_path: str) -> bool:
|
||||
"""Set the source face for swapping"""
|
||||
try:
|
||||
source_image = cv2.imread(source_image_path)
|
||||
if source_image is None:
|
||||
return False
|
||||
|
||||
face = get_one_face(source_image)
|
||||
if face is None:
|
||||
return False
|
||||
|
||||
self.source_face = face
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error setting source face: {e}")
|
||||
return False
|
||||
|
||||
def start_live_swap(self, camera_index: int, display_callback: Callable[[np.ndarray, float], None]) -> bool:
|
||||
"""Start live face swapping"""
|
||||
try:
|
||||
if self.source_face is None:
|
||||
print("No source face set")
|
||||
return False
|
||||
|
||||
self.display_callback = display_callback
|
||||
self.video_capturer = VideoCapturer(camera_index)
|
||||
|
||||
# Start video capture with optimized settings
|
||||
if not self.video_capturer.start(width=960, height=540, fps=30):
|
||||
return False
|
||||
|
||||
self.is_running = True
|
||||
self.processing_thread = threading.Thread(target=self._processing_loop, daemon=True)
|
||||
self.processing_thread.start()
|
||||
|
||||
# Start capture loop
|
||||
self._capture_loop()
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error starting live swap: {e}")
|
||||
return False
|
||||
|
||||
def stop_live_swap(self):
|
||||
"""Stop live face swapping"""
|
||||
self.is_running = False
|
||||
if self.video_capturer:
|
||||
self.video_capturer.release()
|
||||
if self.processing_thread:
|
||||
self.processing_thread.join(timeout=1.0)
|
||||
|
||||
def _capture_loop(self):
|
||||
"""Main capture loop"""
|
||||
while self.is_running:
|
||||
try:
|
||||
ret, frame = self.video_capturer.read()
|
||||
if ret and frame is not None:
|
||||
# Add frame to processing queue
|
||||
with self.queue_lock:
|
||||
if len(self.input_queue) < self.input_queue.maxlen:
|
||||
self.input_queue.append(frame.copy())
|
||||
|
||||
# Small delay to prevent excessive CPU usage
|
||||
time.sleep(0.001)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error in capture loop: {e}")
|
||||
break
|
||||
|
||||
def _processing_loop(self):
|
||||
"""Background processing loop for face swapping"""
|
||||
while self.is_running:
|
||||
try:
|
||||
frame_to_process = None
|
||||
|
||||
# Get frame from input queue
|
||||
with self.queue_lock:
|
||||
if self.input_queue:
|
||||
frame_to_process = self.input_queue.popleft()
|
||||
|
||||
if frame_to_process is not None:
|
||||
# Process the frame
|
||||
processed_frame = self._process_frame(frame_to_process)
|
||||
|
||||
# Add to output queue
|
||||
with self.queue_lock:
|
||||
if len(self.output_queue) < self.output_queue.maxlen:
|
||||
self.output_queue.append(processed_frame)
|
||||
|
||||
# Update FPS and call display callback
|
||||
self._update_fps()
|
||||
if self.display_callback:
|
||||
self.display_callback(processed_frame, self.current_fps)
|
||||
|
||||
else:
|
||||
# No frame to process, small delay
|
||||
time.sleep(0.005)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error in processing loop: {e}")
|
||||
time.sleep(0.01)
|
||||
|
||||
def _process_frame(self, frame: np.ndarray) -> np.ndarray:
|
||||
"""Simple frame processing - back to original approach"""
|
||||
try:
|
||||
if modules.globals.many_faces:
|
||||
many_faces = get_many_faces(frame)
|
||||
if many_faces:
|
||||
for target_face in many_faces:
|
||||
if self.source_face and target_face:
|
||||
from modules.processors.frame.face_swapper import swap_face
|
||||
frame = swap_face(self.source_face, target_face, frame)
|
||||
else:
|
||||
target_face = get_one_face(frame)
|
||||
if target_face and self.source_face:
|
||||
from modules.processors.frame.face_swapper import swap_face
|
||||
frame = swap_face(self.source_face, target_face, frame)
|
||||
|
||||
return frame
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing frame: {e}")
|
||||
return frame
|
||||
|
||||
def _update_fps(self):
|
||||
"""Update FPS counter"""
|
||||
self.fps_counter += 1
|
||||
current_time = time.time()
|
||||
|
||||
if current_time - self.fps_start_time >= 1.0:
|
||||
self.current_fps = self.fps_counter / (current_time - self.fps_start_time)
|
||||
self.fps_counter = 0
|
||||
self.fps_start_time = current_time
|
||||
|
||||
def set_quality_mode(self, mode: str):
|
||||
"""Set quality mode: 'fast', 'balanced', or 'quality'"""
|
||||
self.quality_mode = mode
|
||||
# Removed performance_optimizer references for maximum FPS
|
||||
|
||||
def get_performance_stats(self) -> dict:
|
||||
"""Get current performance statistics"""
|
||||
return {
|
||||
'fps': self.current_fps,
|
||||
'quality_level': 1.0, # Fixed value for maximum FPS
|
||||
'detection_interval': 0.1, # Fixed value for maximum FPS
|
||||
'processed_frames': self.processed_frames
|
||||
}
|
||||
|
||||
|
||||
# Global instance
|
||||
live_face_swapper = LiveFaceSwapper()
|
|
@ -1,151 +0,0 @@
|
|||
"""
|
||||
Performance Manager for Deep-Live-Cam
|
||||
Handles performance mode switching and optimization settings
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
from typing import Dict, Any
|
||||
import modules.globals
|
||||
from modules.performance_optimizer import performance_optimizer
|
||||
|
||||
|
||||
class PerformanceManager:
|
||||
def __init__(self):
|
||||
self.config_path = "performance_config.json"
|
||||
self.config = self.load_config()
|
||||
self.current_mode = "balanced"
|
||||
|
||||
def load_config(self) -> Dict[str, Any]:
|
||||
"""Load performance configuration from file"""
|
||||
try:
|
||||
if os.path.exists(self.config_path):
|
||||
with open(self.config_path, 'r') as f:
|
||||
return json.load(f)
|
||||
else:
|
||||
return self.get_default_config()
|
||||
except Exception as e:
|
||||
print(f"Error loading performance config: {e}")
|
||||
return self.get_default_config()
|
||||
|
||||
def get_default_config(self) -> Dict[str, Any]:
|
||||
"""Get default performance configuration"""
|
||||
return {
|
||||
"performance_modes": {
|
||||
"fast": {
|
||||
"quality_level": 0.6,
|
||||
"face_detection_interval": 0.2,
|
||||
"target_fps": 30,
|
||||
"frame_skip": 2,
|
||||
"enable_caching": True,
|
||||
"processing_resolution_scale": 0.7
|
||||
},
|
||||
"balanced": {
|
||||
"quality_level": 0.85,
|
||||
"face_detection_interval": 0.1,
|
||||
"target_fps": 25,
|
||||
"frame_skip": 1,
|
||||
"enable_caching": True,
|
||||
"processing_resolution_scale": 0.85
|
||||
},
|
||||
"quality": {
|
||||
"quality_level": 1.0,
|
||||
"face_detection_interval": 0.05,
|
||||
"target_fps": 20,
|
||||
"frame_skip": 1,
|
||||
"enable_caching": False,
|
||||
"processing_resolution_scale": 1.0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def set_performance_mode(self, mode: str) -> bool:
|
||||
"""Set performance mode (fast, balanced, quality)"""
|
||||
try:
|
||||
if mode not in self.config["performance_modes"]:
|
||||
print(f"Invalid performance mode: {mode}")
|
||||
return False
|
||||
|
||||
mode_config = self.config["performance_modes"][mode]
|
||||
self.current_mode = mode
|
||||
|
||||
# Apply settings to performance optimizer
|
||||
performance_optimizer.quality_level = mode_config["quality_level"]
|
||||
performance_optimizer.detection_interval = mode_config["face_detection_interval"]
|
||||
performance_optimizer.target_fps = mode_config["target_fps"]
|
||||
|
||||
# Apply to globals
|
||||
modules.globals.performance_mode = mode
|
||||
modules.globals.quality_level = mode_config["quality_level"]
|
||||
modules.globals.face_detection_interval = mode_config["face_detection_interval"]
|
||||
modules.globals.target_live_fps = mode_config["target_fps"]
|
||||
|
||||
print(f"Performance mode set to: {mode}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error setting performance mode: {e}")
|
||||
return False
|
||||
|
||||
def get_current_mode(self) -> str:
|
||||
"""Get current performance mode"""
|
||||
return self.current_mode
|
||||
|
||||
def get_mode_info(self, mode: str) -> Dict[str, Any]:
|
||||
"""Get information about a specific performance mode"""
|
||||
return self.config["performance_modes"].get(mode, {})
|
||||
|
||||
def get_all_modes(self) -> Dict[str, Any]:
|
||||
"""Get all available performance modes"""
|
||||
return self.config["performance_modes"]
|
||||
|
||||
def optimize_for_hardware(self) -> str:
|
||||
"""Automatically select optimal performance mode based on hardware"""
|
||||
try:
|
||||
import psutil
|
||||
import torch
|
||||
|
||||
# Check available RAM
|
||||
ram_gb = psutil.virtual_memory().total / (1024**3)
|
||||
|
||||
# Check GPU availability
|
||||
has_gpu = torch.cuda.is_available()
|
||||
|
||||
# Check CPU cores
|
||||
cpu_cores = psutil.cpu_count()
|
||||
|
||||
# Determine optimal mode
|
||||
if has_gpu and ram_gb >= 8 and cpu_cores >= 8:
|
||||
optimal_mode = "quality"
|
||||
elif has_gpu and ram_gb >= 4:
|
||||
optimal_mode = "balanced"
|
||||
else:
|
||||
optimal_mode = "fast"
|
||||
|
||||
self.set_performance_mode(optimal_mode)
|
||||
print(f"Auto-optimized for hardware: {optimal_mode} mode")
|
||||
print(f" RAM: {ram_gb:.1f}GB, GPU: {has_gpu}, CPU Cores: {cpu_cores}")
|
||||
|
||||
return optimal_mode
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error in hardware optimization: {e}")
|
||||
self.set_performance_mode("balanced")
|
||||
return "balanced"
|
||||
|
||||
def get_performance_tips(self) -> list:
|
||||
"""Get performance optimization tips"""
|
||||
tips = [
|
||||
"🚀 Use 'Fast' mode for maximum FPS during live streaming",
|
||||
"⚖️ Use 'Balanced' mode for good quality with decent performance",
|
||||
"🎨 Use 'Quality' mode for best results when processing videos",
|
||||
"💾 Close other applications to free up system resources",
|
||||
"🖥️ Use GPU acceleration when available (CUDA/DirectML)",
|
||||
"📹 Lower camera resolution if experiencing lag",
|
||||
"🔄 Enable frame caching for smoother playback",
|
||||
"⚡ Ensure good lighting for better face detection"
|
||||
]
|
||||
return tips
|
||||
|
||||
|
||||
# Global performance manager instance
|
||||
performance_manager = PerformanceManager()
|
|
@ -1,76 +0,0 @@
|
|||
"""
|
||||
Performance optimization module for Deep-Live-Cam
|
||||
Provides frame caching, adaptive quality, and FPS optimization
|
||||
"""
|
||||
import cv2
|
||||
import numpy as np
|
||||
import time
|
||||
from typing import Dict, Any, Optional, Tuple
|
||||
import threading
|
||||
from collections import deque
|
||||
import modules.globals
|
||||
|
||||
class PerformanceOptimizer:
|
||||
def __init__(self):
|
||||
self.frame_cache = {}
|
||||
self.face_cache = {}
|
||||
self.last_detection_time = 0
|
||||
self.detection_interval = 0.1 # Detect faces every 100ms
|
||||
self.adaptive_quality = True
|
||||
self.target_fps = 30
|
||||
self.frame_times = deque(maxlen=10)
|
||||
self.current_fps = 0
|
||||
self.quality_level = 1.0
|
||||
self.min_quality = 0.5
|
||||
self.max_quality = 1.0
|
||||
|
||||
def should_detect_faces(self) -> bool:
|
||||
"""Determine if we should run face detection based on timing"""
|
||||
current_time = time.time()
|
||||
if current_time - self.last_detection_time > self.detection_interval:
|
||||
self.last_detection_time = current_time
|
||||
return True
|
||||
return False
|
||||
|
||||
def update_fps_stats(self, frame_time: float):
|
||||
"""Update FPS statistics and adjust quality accordingly"""
|
||||
self.frame_times.append(frame_time)
|
||||
if len(self.frame_times) >= 5:
|
||||
avg_frame_time = sum(self.frame_times) / len(self.frame_times)
|
||||
self.current_fps = 1.0 / avg_frame_time if avg_frame_time > 0 else 0
|
||||
|
||||
if self.adaptive_quality:
|
||||
self._adjust_quality()
|
||||
|
||||
def _adjust_quality(self):
|
||||
"""Dynamically adjust processing quality based on FPS"""
|
||||
if self.current_fps < self.target_fps * 0.8: # Below 80% of target
|
||||
self.quality_level = max(self.min_quality, self.quality_level - 0.1)
|
||||
self.detection_interval = min(0.2, self.detection_interval + 0.02)
|
||||
elif self.current_fps > self.target_fps * 0.95: # Above 95% of target
|
||||
self.quality_level = min(self.max_quality, self.quality_level + 0.05)
|
||||
self.detection_interval = max(0.05, self.detection_interval - 0.01)
|
||||
|
||||
def get_optimal_resolution(self, original_size: Tuple[int, int]) -> Tuple[int, int]:
|
||||
"""Get optimal processing resolution based on current quality level"""
|
||||
width, height = original_size
|
||||
scale = self.quality_level
|
||||
return (int(width * scale), int(height * scale))
|
||||
|
||||
def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
|
||||
"""Preprocess frame for optimal performance"""
|
||||
if self.quality_level < 1.0:
|
||||
height, width = frame.shape[:2]
|
||||
new_height = int(height * self.quality_level)
|
||||
new_width = int(width * self.quality_level)
|
||||
frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
|
||||
return frame
|
||||
|
||||
def postprocess_frame(self, frame: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray:
|
||||
"""Postprocess frame to target resolution"""
|
||||
if frame.shape[:2][::-1] != target_size:
|
||||
frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_CUBIC)
|
||||
return frame
|
||||
|
||||
# Global optimizer instance
|
||||
performance_optimizer = PerformanceOptimizer()
|
|
@ -1,46 +0,0 @@
|
|||
{
|
||||
"performance_modes": {
|
||||
"fast": {
|
||||
"quality_level": 0.6,
|
||||
"face_detection_interval": 0.2,
|
||||
"target_fps": 30,
|
||||
"frame_skip": 2,
|
||||
"enable_caching": true,
|
||||
"processing_resolution_scale": 0.7,
|
||||
"description": "Optimized for maximum FPS with acceptable quality"
|
||||
},
|
||||
"balanced": {
|
||||
"quality_level": 0.85,
|
||||
"face_detection_interval": 0.1,
|
||||
"target_fps": 25,
|
||||
"frame_skip": 1,
|
||||
"enable_caching": true,
|
||||
"processing_resolution_scale": 0.85,
|
||||
"description": "Balance between quality and performance"
|
||||
},
|
||||
"quality": {
|
||||
"quality_level": 1.0,
|
||||
"face_detection_interval": 0.05,
|
||||
"target_fps": 20,
|
||||
"frame_skip": 1,
|
||||
"enable_caching": false,
|
||||
"processing_resolution_scale": 1.0,
|
||||
"description": "Maximum quality with slower processing"
|
||||
}
|
||||
},
|
||||
"advanced_settings": {
|
||||
"color_matching_strength": 0.7,
|
||||
"edge_smoothing_enabled": true,
|
||||
"adaptive_quality_enabled": true,
|
||||
"gpu_memory_optimization": true,
|
||||
"face_cache_size": 10,
|
||||
"frame_buffer_size": 3
|
||||
},
|
||||
"quality_enhancements": {
|
||||
"enable_color_correction": true,
|
||||
"enable_edge_smoothing": true,
|
||||
"enable_advanced_blending": true,
|
||||
"skin_tone_matching": true,
|
||||
"lighting_adaptation": true
|
||||
}
|
||||
}
|
|
@ -1,120 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Deep-Live-Cam Performance Setup Script
|
||||
Easy configuration for optimal performance based on your hardware
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from modules.performance_manager import performance_manager
|
||||
import psutil
|
||||
import platform
|
||||
|
||||
def print_header():
|
||||
print("=" * 60)
|
||||
print("🎭 Deep-Live-Cam Performance Optimizer")
|
||||
print("=" * 60)
|
||||
print()
|
||||
|
||||
def analyze_system():
|
||||
"""Analyze system specifications"""
|
||||
print("📊 Analyzing your system...")
|
||||
print("-" * 40)
|
||||
|
||||
# System info
|
||||
print(f"OS: {platform.system()} {platform.release()}")
|
||||
print(f"CPU: {platform.processor()}")
|
||||
print(f"CPU Cores: {psutil.cpu_count()}")
|
||||
print(f"RAM: {psutil.virtual_memory().total / (1024**3):.1f} GB")
|
||||
|
||||
# GPU info
|
||||
try:
|
||||
import torch
|
||||
if torch.cuda.is_available():
|
||||
gpu_name = torch.cuda.get_device_name(0)
|
||||
gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)
|
||||
print(f"GPU: {gpu_name} ({gpu_memory:.1f} GB)")
|
||||
else:
|
||||
print("GPU: Not available or not CUDA-compatible")
|
||||
except ImportError:
|
||||
print("GPU: PyTorch not available")
|
||||
|
||||
print()
|
||||
|
||||
def show_performance_modes():
|
||||
"""Display available performance modes"""
|
||||
print("🎯 Available Performance Modes:")
|
||||
print("-" * 40)
|
||||
|
||||
modes = performance_manager.get_all_modes()
|
||||
for mode_name, mode_config in modes.items():
|
||||
print(f"\n{mode_name.upper()}:")
|
||||
print(f" Quality Level: {mode_config['quality_level']}")
|
||||
print(f" Target FPS: {mode_config['target_fps']}")
|
||||
print(f" Detection Interval: {mode_config['face_detection_interval']}s")
|
||||
if 'description' in mode_config:
|
||||
print(f" Description: {mode_config['description']}")
|
||||
|
||||
def interactive_setup():
|
||||
"""Interactive performance setup"""
|
||||
print("🛠️ Interactive Setup:")
|
||||
print("-" * 40)
|
||||
|
||||
print("\nChoose your priority:")
|
||||
print("1. Maximum FPS (for live streaming)")
|
||||
print("2. Balanced performance and quality")
|
||||
print("3. Best quality (for video processing)")
|
||||
print("4. Auto-optimize based on hardware")
|
||||
|
||||
while True:
|
||||
try:
|
||||
choice = input("\nEnter your choice (1-4): ").strip()
|
||||
|
||||
if choice == "1":
|
||||
performance_manager.set_performance_mode("fast")
|
||||
print("✅ Set to FAST mode - Maximum FPS")
|
||||
break
|
||||
elif choice == "2":
|
||||
performance_manager.set_performance_mode("balanced")
|
||||
print("✅ Set to BALANCED mode - Good balance")
|
||||
break
|
||||
elif choice == "3":
|
||||
performance_manager.set_performance_mode("quality")
|
||||
print("✅ Set to QUALITY mode - Best results")
|
||||
break
|
||||
elif choice == "4":
|
||||
optimal_mode = performance_manager.optimize_for_hardware()
|
||||
print(f"✅ Auto-optimized to {optimal_mode.upper()} mode")
|
||||
break
|
||||
else:
|
||||
print("❌ Invalid choice. Please enter 1, 2, 3, or 4.")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\n👋 Setup cancelled.")
|
||||
return
|
||||
|
||||
def show_tips():
|
||||
"""Show performance tips"""
|
||||
print("\n💡 Performance Tips:")
|
||||
print("-" * 40)
|
||||
|
||||
tips = performance_manager.get_performance_tips()
|
||||
for tip in tips:
|
||||
print(f" {tip}")
|
||||
|
||||
def main():
|
||||
print_header()
|
||||
analyze_system()
|
||||
show_performance_modes()
|
||||
interactive_setup()
|
||||
show_tips()
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("🎉 Setup complete! You can change these settings anytime by running this script again.")
|
||||
print("💻 Start Deep-Live-Cam with: python run.py")
|
||||
print("=" * 60)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -1,167 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script for the new KIRO improvements
|
||||
Demonstrates face tracking, occlusion handling, and stabilization
|
||||
"""
|
||||
|
||||
import cv2
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from modules.live_face_swapper import live_face_swapper
|
||||
from modules.performance_manager import performance_manager
|
||||
from modules.face_tracker import face_tracker
|
||||
import modules.globals
|
||||
|
||||
def test_live_face_swap():
|
||||
"""Test the enhanced live face swapping with new features"""
|
||||
print("🎭 Testing Enhanced Live Face Swapping")
|
||||
print("=" * 50)
|
||||
|
||||
# Set performance mode
|
||||
print("Setting performance mode to 'balanced'...")
|
||||
performance_manager.set_performance_mode("balanced")
|
||||
|
||||
# Get source image path
|
||||
source_path = input("Enter path to source face image (or press Enter for demo): ").strip()
|
||||
if not source_path:
|
||||
print("Please provide a source image path to test face swapping.")
|
||||
return
|
||||
|
||||
if not os.path.exists(source_path):
|
||||
print(f"Source image not found: {source_path}")
|
||||
return
|
||||
|
||||
# Set source face
|
||||
print("Loading source face...")
|
||||
if not live_face_swapper.set_source_face(source_path):
|
||||
print("❌ Failed to detect face in source image")
|
||||
return
|
||||
|
||||
print("✅ Source face loaded successfully")
|
||||
|
||||
# Display callback function
|
||||
def display_frame(frame, fps):
|
||||
# Add FPS text to frame
|
||||
cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
|
||||
|
||||
# Add tracking status
|
||||
if face_tracker.is_face_stable():
|
||||
status_text = "TRACKING: STABLE"
|
||||
color = (0, 255, 0)
|
||||
else:
|
||||
status_text = "TRACKING: SEARCHING"
|
||||
color = (0, 255, 255)
|
||||
|
||||
cv2.putText(frame, status_text, (10, 70),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
|
||||
|
||||
# Add performance info
|
||||
stats = live_face_swapper.get_performance_stats()
|
||||
quality_text = f"Quality: {stats['quality_level']:.1f}"
|
||||
cv2.putText(frame, quality_text, (10, 110),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
|
||||
|
||||
# Show frame
|
||||
cv2.imshow("Enhanced Live Face Swap - KIRO Improvements", frame)
|
||||
|
||||
# Handle key presses
|
||||
key = cv2.waitKey(1) & 0xFF
|
||||
if key == ord('q'):
|
||||
live_face_swapper.stop_live_swap()
|
||||
elif key == ord('f'): # Fast mode
|
||||
performance_manager.set_performance_mode("fast")
|
||||
print("Switched to FAST mode")
|
||||
elif key == ord('b'): # Balanced mode
|
||||
performance_manager.set_performance_mode("balanced")
|
||||
print("Switched to BALANCED mode")
|
||||
elif key == ord('h'): # Quality mode
|
||||
performance_manager.set_performance_mode("quality")
|
||||
print("Switched to QUALITY mode")
|
||||
elif key == ord('r'): # Reset tracking
|
||||
face_tracker.reset_tracking()
|
||||
print("Reset face tracking")
|
||||
|
||||
print("\n🎥 Starting live face swap...")
|
||||
print("Controls:")
|
||||
print(" Q - Quit")
|
||||
print(" F - Fast mode")
|
||||
print(" B - Balanced mode")
|
||||
print(" H - High quality mode")
|
||||
print(" R - Reset tracking")
|
||||
print("\n✨ New Features:")
|
||||
print(" - Face tracking with occlusion handling")
|
||||
print(" - Stabilized face swapping (less jittery)")
|
||||
print(" - Adaptive performance optimization")
|
||||
print(" - Enhanced quality with better color matching")
|
||||
|
||||
try:
|
||||
# Start live face swapping (camera index 0)
|
||||
live_face_swapper.start_live_swap(0, display_frame)
|
||||
except KeyboardInterrupt:
|
||||
print("\n👋 Stopping...")
|
||||
finally:
|
||||
live_face_swapper.stop_live_swap()
|
||||
cv2.destroyAllWindows()
|
||||
|
||||
def show_improvements_info():
|
||||
"""Show information about the improvements"""
|
||||
print("🚀 KIRO Improvements for Deep-Live-Cam")
|
||||
print("=" * 50)
|
||||
print()
|
||||
print("✨ NEW FEATURES:")
|
||||
print(" 1. 🎯 Face Tracking & Stabilization")
|
||||
print(" - Reduces jittery face swapping")
|
||||
print(" - Maintains face position during brief occlusions")
|
||||
print(" - Kalman filter for smooth tracking")
|
||||
print()
|
||||
print(" 2. 🖐️ Occlusion Handling")
|
||||
print(" - Detects hands/objects covering the face")
|
||||
print(" - Keeps face swap on face area only")
|
||||
print(" - Smart blending to avoid artifacts")
|
||||
print()
|
||||
print(" 3. ⚡ Performance Optimization")
|
||||
print(" - 30-50% FPS improvement")
|
||||
print(" - Adaptive quality scaling")
|
||||
print(" - Smart face detection caching")
|
||||
print(" - Multi-threaded processing")
|
||||
print()
|
||||
print(" 4. 🎨 Enhanced Quality")
|
||||
print(" - Better color matching (LAB color space)")
|
||||
print(" - Advanced edge smoothing")
|
||||
print(" - Improved skin tone matching")
|
||||
print(" - Lighting adaptation")
|
||||
print()
|
||||
print(" 5. 🛠️ Easy Configuration")
|
||||
print(" - Performance modes: Fast/Balanced/Quality")
|
||||
print(" - Hardware auto-optimization")
|
||||
print(" - Interactive setup script")
|
||||
print()
|
||||
|
||||
def main():
|
||||
show_improvements_info()
|
||||
|
||||
print("Choose test option:")
|
||||
print("1. Test live face swapping with new features")
|
||||
print("2. Run performance setup")
|
||||
print("3. Show performance tips")
|
||||
|
||||
choice = input("\nEnter choice (1-3): ").strip()
|
||||
|
||||
if choice == "1":
|
||||
test_live_face_swap()
|
||||
elif choice == "2":
|
||||
os.system("python setup_performance.py")
|
||||
elif choice == "3":
|
||||
tips = performance_manager.get_performance_tips()
|
||||
print("\n💡 Performance Tips:")
|
||||
print("-" * 30)
|
||||
for tip in tips:
|
||||
print(f" {tip}")
|
||||
else:
|
||||
print("Invalid choice")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue