104 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			104 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
| import cv2
 | |
| import numpy as np
 | |
| from typing import Optional, Tuple, Callable
 | |
| import platform
 | |
| import threading
 | |
| 
 | |
| # Only import Windows-specific library if on Windows
 | |
| if platform.system() == "Windows":
 | |
|     from pygrabber.dshow_graph import FilterGraph
 | |
| 
 | |
| 
 | |
class VideoCapturer:
    """Thin wrapper around ``cv2.VideoCapture`` with per-platform backend selection.

    Typical use: construct with a device index, call :meth:`start`, poll
    :meth:`read` for frames, and call :meth:`release` when done.

    Attributes:
        device_index: Camera index passed to ``cv2.VideoCapture``.
        frame_callback: Optional callable invoked by :meth:`read` with every
            successfully read frame.
        is_running: ``True`` between a successful :meth:`start` and the next
            :meth:`release`.
        cap: The underlying capture object, or ``None`` when nothing is open.
    """

    def __init__(self, device_index: int):
        """Store the device index and, on Windows, check that it exists.

        Args:
            device_index: Index of the camera to open.

        Raises:
            ValueError: On Windows, when ``device_index`` is not smaller than
                the number of input devices reported by ``FilterGraph``.
        """
        self.device_index = device_index
        self.frame_callback = None
        self._current_frame = None
        # Created here but not signalled by any method in this class.
        self._frame_ready = threading.Event()
        self.is_running = False
        self.cap = None

        # Initialize Windows-specific components if on Windows
        if platform.system() == "Windows":
            self.graph = FilterGraph()
            # Verify device exists
            devices = self.graph.get_input_devices()
            if self.device_index >= len(devices):
                raise ValueError(
                    f"Invalid device index {device_index}. Available devices: {len(devices)}"
                )

    def _open_capture(self) -> Optional["cv2.VideoCapture"]:
        """Open the camera with the backend(s) appropriate for this platform.

        Returns:
            The capture object that was created, or ``None`` when every
            Windows fallback failed. On macOS/Linux the returned object may
            not be opened; the caller is expected to check ``isOpened()``.
        """
        system = platform.system()

        if system == "Windows":
            capture_methods = [
                (self.device_index, cv2.CAP_DSHOW),  # Try DirectShow first
                (self.device_index, cv2.CAP_ANY),  # Then try default backend
                (-1, cv2.CAP_ANY),  # Try -1 as fallback
                (0, cv2.CAP_ANY),  # Finally try 0 without specific backend
            ]
            for dev_id, backend in capture_methods:
                # Best effort: any failure with one method just moves on to
                # the next candidate.
                try:
                    candidate = cv2.VideoCapture(dev_id, backend)
                    if candidate.isOpened():
                        return candidate
                    candidate.release()
                except Exception:
                    continue
            return None

        if system == "Darwin":  # macOS
            print(f"INFO: macOS detected. Attempting to use cv2.CAP_AVFOUNDATION exclusively for camera index {self.device_index}.")
            candidate = cv2.VideoCapture(self.device_index, cv2.CAP_AVFOUNDATION)
            if not candidate or not candidate.isOpened():
                print(f"ERROR: cv2.CAP_AVFOUNDATION failed to open camera index {self.device_index}. Capture will likely fail.")
            # Deliberately no fallback to the default backend on macOS.
            return candidate

        # Other Unix-like systems (e.g., Linux)
        return cv2.VideoCapture(self.device_index)

    def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
        """Open the camera and request the given capture format.

        Any capture left open by a previous ``start()`` is released first, so
        calling this method repeatedly does not leak device handles.

        Args:
            width: Requested frame width in pixels.
            height: Requested frame height in pixels.
            fps: Requested frame rate.

        Returns:
            ``True`` when the camera was opened; ``False`` when opening or
            configuring failed (the error is printed and state is reset).
        """
        try:
            # Drop any previous handle before opening a new one.
            self.release()

            self.cap = self._open_capture()
            if not self.cap or not self.cap.isOpened():
                raise RuntimeError("Failed to open camera")

            # Configure format. The return values of set() are not checked,
            # so a backend that rejects a property is not reported here.
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
            self.cap.set(cv2.CAP_PROP_FPS, fps)

            self.is_running = True
            return True

        except Exception as e:
            print(f"Failed to start capture: {str(e)}")
            # Leave the object in a clean "not running, nothing open" state.
            self.release()
            return False

    def read(self) -> Tuple[bool, Optional[np.ndarray]]:
        """Read a frame from the camera.

        On success the frame is stored as the current frame and passed to
        ``frame_callback`` when one is set.

        Returns:
            ``(True, frame)`` on success; ``(False, None)`` when the capturer
            is not running or the underlying read failed.
        """
        if not self.is_running or self.cap is None:
            return False, None

        ret, frame = self.cap.read()
        if not ret:
            return False, None

        self._current_frame = frame
        if self.frame_callback:
            self.frame_callback(frame)
        return True, frame

    def release(self) -> None:
        """Stop capture and release resources.

        Safe to call at any time, including when nothing is open. State is
        reset before the handle is released so the object ends up "not
        running" even if the underlying ``release()`` raises.
        """
        handle, self.cap = self.cap, None
        self.is_running = False
        if handle is not None:
            handle.release()

    def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
        """Set the callable that :meth:`read` invokes with each new frame."""
        self.frame_callback = callback