Merge 130f096cd5 into 87d982e6f8
				
					
				
			
						commit
						ce2bf5d989
					
				|  | @ -1,12 +1,129 @@ | |||
| import sys | ||||
| import importlib | ||||
| from typing import Set, Optional | ||||
| 
 | ||||
| # Define a whitelist of allowed modules that can be dynamically imported | ||||
| ALLOWED_MODULES: Set[str] = { | ||||
|     'modules.processors.frame.blur', | ||||
|     'modules.processors.frame.censor', | ||||
|     'modules.processors.frame.crop', | ||||
|     'modules.processors.frame.resize', | ||||
|     'modules.processors.frame.rotate', | ||||
|     # Add all legitimate processor modules that should be importable | ||||
| } | ||||
| 
 | ||||
| def safe_import_module(module_name: str) -> Optional[object]: | ||||
|     """ | ||||
|     Safely import a module by checking against a whitelist. | ||||
|      | ||||
|     Args: | ||||
|         module_name: The name of the module to import | ||||
|          | ||||
|     Returns: | ||||
|         The imported module or None if the module is not in the whitelist | ||||
|     """ | ||||
|     if module_name in ALLOWED_MODULES: | ||||
|         return safe_import_module(module_name) | ||||
|     else: | ||||
|         # Log the attempt to import a non-whitelisted module | ||||
|         print(f"Warning: Attempted to import non-whitelisted module: {module_name}") | ||||
|         return None | ||||
| 
 | ||||
| import importlib | ||||
| from typing import Set, Optional | ||||
| 
 | ||||
| # Define a whitelist of allowed modules that can be dynamically imported | ||||
| ALLOWED_MODULES: Set[str] = { | ||||
|     'modules.processors.frame.blur', | ||||
|     'modules.processors.frame.censor', | ||||
|     'modules.processors.frame.crop', | ||||
|     'modules.processors.frame.resize', | ||||
|     'modules.processors.frame.rotate', | ||||
|     # Add all legitimate processor modules that should be importable | ||||
| } | ||||
| 
 | ||||
| def safe_import_module(module_name: str) -> Optional[object]: | ||||
|     """ | ||||
|     Safely import a module by checking against a whitelist. | ||||
|      | ||||
|     Args: | ||||
|         module_name: The name of the module to import | ||||
|          | ||||
|     Returns: | ||||
|         The imported module or None if the module is not in the whitelist | ||||
|     """ | ||||
|     if module_name in ALLOWED_MODULES: | ||||
|         return safe_import_module(module_name) | ||||
|     else: | ||||
|         # Log the attempt to import a non-whitelisted module | ||||
|         print(f"Warning: Attempted to import non-whitelisted module: {module_name}") | ||||
|         return None | ||||
| 
 | ||||
| from concurrent.futures import ThreadPoolExecutor | ||||
| from types import ModuleType | ||||
| from typing import Any, List, Callable | ||||
| from tqdm import tqdm | ||||
| 
 | ||||
| import modules | ||||
| import modules.globals                    | ||||
| import importlib | ||||
| from typing import Set, Optional | ||||
| 
 | ||||
| # Define a whitelist of allowed modules that can be dynamically imported | ||||
| ALLOWED_MODULES: Set[str] = { | ||||
|     'modules.processors.frame.blur', | ||||
|     'modules.processors.frame.censor', | ||||
|     'modules.processors.frame.crop', | ||||
|     'modules.processors.frame.resize', | ||||
|     'modules.processors.frame.rotate', | ||||
|     # Add all legitimate processor modules that should be importable | ||||
| } | ||||
| 
 | ||||
| def safe_import_module(module_name: str) -> Optional[object]: | ||||
|     """ | ||||
|     Safely import a module by checking against a whitelist. | ||||
|      | ||||
|     Args: | ||||
|         module_name: The name of the module to import | ||||
|          | ||||
|     Returns: | ||||
|         The imported module or None if the module is not in the whitelist | ||||
|     """ | ||||
|     if module_name in ALLOWED_MODULES: | ||||
|         return safe_import_module(module_name) | ||||
|     else: | ||||
|         # Log the attempt to import a non-whitelisted module | ||||
|         print(f"Warning: Attempted to import non-whitelisted module: {module_name}") | ||||
|         return None | ||||
| 
 | ||||
| import importlib | ||||
| from typing import Set, Optional | ||||
| 
 | ||||
| # Define a whitelist of allowed modules that can be dynamically imported | ||||
| ALLOWED_MODULES: Set[str] = { | ||||
|     'modules.processors.frame.blur', | ||||
|     'modules.processors.frame.censor', | ||||
|     'modules.processors.frame.crop', | ||||
|     'modules.processors.frame.resize', | ||||
|     'modules.processors.frame.rotate', | ||||
|     # Add all legitimate processor modules that should be importable | ||||
| } | ||||
| 
 | ||||
| def safe_import_module(module_name: str) -> Optional[object]: | ||||
|     """ | ||||
|     Safely import a module by checking against a whitelist. | ||||
|      | ||||
|     Args: | ||||
|         module_name: The name of the module to import | ||||
|          | ||||
|     Returns: | ||||
|         The imported module or None if the module is not in the whitelist | ||||
|     """ | ||||
|     if module_name in ALLOWED_MODULES: | ||||
|         return safe_import_module(module_name) | ||||
|     else: | ||||
|         # Log the attempt to import a non-whitelisted module | ||||
|         print(f"Warning: Attempted to import non-whitelisted module: {module_name}") | ||||
|         return None | ||||
| 
 | ||||
| 
 | ||||
| FRAME_PROCESSORS_MODULES: List[ModuleType] = [] | ||||
| FRAME_PROCESSORS_INTERFACE = [ | ||||
|  | @ -20,7 +137,7 @@ FRAME_PROCESSORS_INTERFACE = [ | |||
| 
 | ||||
| def load_frame_processor_module(frame_processor: str) -> Any: | ||||
|     try: | ||||
|         frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}') | ||||
|         frame_processor_module = safe_import_module(f'modules.processors.frame.{frame_processor}') | ||||
|         for method_name in FRAME_PROCESSORS_INTERFACE: | ||||
|             if not hasattr(frame_processor_module, method_name): | ||||
|                 sys.exit() | ||||
|  |  | |||
|  | @ -0,0 +1,84 @@ | |||
| import sys | ||||
| import importlib | ||||
| from concurrent.futures import ThreadPoolExecutor | ||||
| from types import ModuleType | ||||
| from typing import Any, List, Callable | ||||
| from tqdm import tqdm | ||||
| 
 | ||||
| import modules | ||||
| import modules.globals                    | ||||
| 
 | ||||
| FRAME_PROCESSORS_MODULES: List[ModuleType] = [] | ||||
| FRAME_PROCESSORS_INTERFACE = [ | ||||
|     'pre_check', | ||||
|     'pre_start', | ||||
|     'process_frame', | ||||
|     'process_image', | ||||
|     'process_video' | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| def load_frame_processor_module(frame_processor: str) -> Any: | ||||
|     try: | ||||
|         frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}') | ||||
|         for method_name in FRAME_PROCESSORS_INTERFACE: | ||||
|             if not hasattr(frame_processor_module, method_name): | ||||
|                 sys.exit() | ||||
|     except ImportError: | ||||
|         print(f"Frame processor {frame_processor} not found") | ||||
|         sys.exit() | ||||
|     return frame_processor_module | ||||
| 
 | ||||
| 
 | ||||
| def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]: | ||||
|     global FRAME_PROCESSORS_MODULES | ||||
| 
 | ||||
|     if not FRAME_PROCESSORS_MODULES: | ||||
|         for frame_processor in frame_processors: | ||||
|             frame_processor_module = load_frame_processor_module(frame_processor) | ||||
|             FRAME_PROCESSORS_MODULES.append(frame_processor_module) | ||||
|     set_frame_processors_modules_from_ui(frame_processors) | ||||
|     return FRAME_PROCESSORS_MODULES | ||||
| 
 | ||||
| def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None: | ||||
|     global FRAME_PROCESSORS_MODULES | ||||
|     current_processor_names = [proc.__name__.split('.')[-1] for proc in FRAME_PROCESSORS_MODULES] | ||||
| 
 | ||||
|     for frame_processor, state in modules.globals.fp_ui.items(): | ||||
|         if state == True and frame_processor not in current_processor_names: | ||||
|             try: | ||||
|                 frame_processor_module = load_frame_processor_module(frame_processor) | ||||
|                 FRAME_PROCESSORS_MODULES.append(frame_processor_module) | ||||
|                 if frame_processor not in modules.globals.frame_processors: | ||||
|                      modules.globals.frame_processors.append(frame_processor) | ||||
|             except SystemExit: | ||||
|                  print(f"Warning: Failed to load frame processor {frame_processor} requested by UI state.") | ||||
|             except Exception as e: | ||||
|                  print(f"Warning: Error loading frame processor {frame_processor} requested by UI state: {e}") | ||||
| 
 | ||||
|         elif state == False and frame_processor in current_processor_names: | ||||
|             try: | ||||
|                 module_to_remove = next((mod for mod in FRAME_PROCESSORS_MODULES if mod.__name__.endswith(f'.{frame_processor}')), None) | ||||
|                 if module_to_remove: | ||||
|                     FRAME_PROCESSORS_MODULES.remove(module_to_remove) | ||||
|                 if frame_processor in modules.globals.frame_processors: | ||||
|                     modules.globals.frame_processors.remove(frame_processor) | ||||
|             except Exception as e: | ||||
|                  print(f"Warning: Error removing frame processor {frame_processor}: {e}") | ||||
| 
 | ||||
| def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None: | ||||
|     with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor: | ||||
|         futures = [] | ||||
|         for path in temp_frame_paths: | ||||
|             future = executor.submit(process_frames, source_path, [path], progress) | ||||
|             futures.append(future) | ||||
|         for future in futures: | ||||
|             future.result() | ||||
| 
 | ||||
| 
 | ||||
| def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None: | ||||
|     progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' | ||||
|     total = len(frame_paths) | ||||
|     with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: | ||||
|         progress.set_postfix({'execution_providers': modules.globals.execution_providers, 'execution_threads': modules.globals.execution_threads, 'max_memory': modules.globals.max_memory}) | ||||
|         multi_process_frame(source_path, frame_paths, process_frames, progress) | ||||
		Loading…
	
		Reference in New Issue