diff --git a/modules/core.py b/modules/core.py index db64f37..d51b1f1 100644 --- a/modules/core.py +++ b/modules/core.py @@ -19,7 +19,8 @@ import modules.globals import modules.metadata import modules.ui as ui from modules.processors.frame.core import get_frame_processors_modules -from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path +from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, \ + get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path if 'ROCMExecutionProvider' in modules.globals.execution_providers: del torch @@ -31,20 +32,104 @@ warnings.filterwarnings('ignore', category=UserWarning, module='torchvision') def parse_args() -> None: signal.signal(signal.SIGINT, lambda signal_number, frame: destroy()) program = argparse.ArgumentParser() - program.add_argument('-s', '--source', help='select an source image', dest='source_path') - program.add_argument('-t', '--target', help='select an target image or video', dest='target_path') - program.add_argument('-o', '--output', help='select output file or directory', dest='output_path') - program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+') - program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False) - program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) - program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False) - program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False) - program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9']) - program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]') - program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory()) - program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+') - program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads()) - program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}') + program.add_argument( + '-s', + '--source', + help='select an source image', + dest='source_path' + ) + program.add_argument( + '-t', + '--target', + help='select an target image or video', + dest='target_path' + ) + program.add_argument( + '-o', + '--output', + help='select output file or directory', + dest='output_path' + ) + program.add_argument( + '--frame-processor', + help='pipeline of frame processors', + dest='frame_processor', + default=['face_swapper'], + choices=['face_swapper', 'face_enhancer'], + nargs='+' + ) + program.add_argument( + '--keep-fps', + help='keep original fps', + dest='keep_fps', + action='store_true', + default=False + ) + program.add_argument( + '--keep-audio', + help='keep original audio', + dest='keep_audio', + action='store_true', + default=True + ) + program.add_argument( + '--keep-frames', + help='keep temporary frames', + dest='keep_frames', + action='store_true', + default=False + ) + program.add_argument( + '--many-faces', + help='process every face', + dest='many_faces', + action='store_true', + default=False + ) + program.add_argument( + '--video-encoder', + help='adjust output video encoder', + dest='video_encoder', + default='libx264', + choices=['libx264', 'libx265', 'libvpx-vp9'] + ) + program.add_argument( + '--video-quality', + help='adjust output video quality', + dest='video_quality', + type=int, + default=18, + choices=range(52), + metavar='[0-51]' + ) + program.add_argument( + '--max-memory', + help='maximum amount of RAM in GB', + dest='max_memory', + type=int, + default=suggest_max_memory() + ) + program.add_argument( + '--execution-provider', + help='execution provider', + dest='execution_provider', + default=['cpu'], + choices=suggest_execution_providers(), + nargs='+' + ) + program.add_argument( + '--execution-threads', + help='number of execution threads', + dest='execution_threads', + type=int, + default=suggest_execution_threads() + ) + program.add_argument( + '-v', + '--version', + action='version', + version=f'{modules.metadata.name} {modules.metadata.version}' + ) # register deprecated args program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated') @@ -69,7 +154,7 @@ def parse_args() -> None: modules.globals.execution_providers = decode_execution_providers(args.execution_provider) modules.globals.execution_threads = args.execution_threads - #for ENHANCER tumbler: + # for ENHANCER tumbler: if 'face_enhancer' in args.frame_processor: modules.globals.fp_ui['face_enhancer'] = True else: @@ -81,7 +166,10 @@ def parse_args() -> None: if args.source_path_deprecated: print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m') modules.globals.source_path = args.source_path_deprecated - modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path) + modules.globals.output_path = normalize_output_path( + args.source_path_deprecated, + modules.globals.target_path, args.output_path + ) if args.cpu_cores_deprecated: print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m') modules.globals.execution_threads = args.cpu_cores_deprecated @@ -104,8 +192,14 @@ def encode_execution_providers(execution_providers: List[str]) -> List[str]: def decode_execution_providers(execution_providers: List[str]) -> List[str]: - return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers())) - if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)] + return [ + provider for provider, encoded_execution_provider in zip( + onnxruntime.get_available_providers(), + encode_execution_providers(onnxruntime.get_available_providers()), + strict=False + ) + if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers) + ] def suggest_max_memory() -> int: @@ -165,13 +259,14 @@ def update_status(message: str, scope: str = 'DLC.CORE') -> None: if not modules.globals.headless: ui.update_status(message) + def start() -> None: for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): if not frame_processor.pre_start(): return # process image to image if has_image_extension(modules.globals.target_path): - if modules.globals.nsfw == False: + if not modules.globals.nsfw: from modules.predicter import predict_image if predict_image(modules.globals.target_path): destroy() @@ -186,7 +281,7 @@ def start() -> None: update_status('Processing to image failed!') return # process image to videos - if modules.globals.nsfw == False: + if not modules.globals.nsfw: from modules.predicter import predict_video if predict_video(modules.globals.target_path): destroy() diff --git a/modules/globals.py b/modules/globals.py index c392a80..4e4127f 100644 --- a/modules/globals.py +++ b/modules/globals.py @@ -5,8 +5,8 @@ ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) WORKFLOW_DIR = os.path.join(ROOT_DIR, 'workflow') file_types = [ - ('Image', ('*.png','*.jpg','*.jpeg','*.gif','*.bmp')), - ('Video', ('*.mp4','*.mkv')) + ('Image', ('*.png', '*.jpg', '*.jpeg', '*.gif', '*.bmp')), + ('Video', ('*.mp4', '*.mkv')) ] source_path = None @@ -27,4 +27,4 @@ log_level = 'error' fp_ui: Dict[str, bool] = {} nsfw = None camera_input_combobox = None -webcam_preview_running = False \ No newline at end of file +webcam_preview_running = False diff --git a/modules/processors/frame/core.py b/modules/processors/frame/core.py index 7d76704..09b563a 100644 --- a/modules/processors/frame/core.py +++ b/modules/processors/frame/core.py @@ -6,7 +6,7 @@ from typing import Any, List, Callable from tqdm import tqdm import modules -import modules.globals +import modules.globals FRAME_PROCESSORS_MODULES: List[ModuleType] = [] FRAME_PROCESSORS_INTERFACE = [ @@ -40,22 +40,29 @@ def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType set_frame_processors_modules_from_ui(frame_processors) return FRAME_PROCESSORS_MODULES + def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None: global FRAME_PROCESSORS_MODULES for frame_processor, state in modules.globals.fp_ui.items(): - if state == True and frame_processor not in frame_processors: + if state and frame_processor not in frame_processors: frame_processor_module = load_frame_processor_module(frame_processor) FRAME_PROCESSORS_MODULES.append(frame_processor_module) modules.globals.frame_processors.append(frame_processor) - if state == False: + if not state: try: frame_processor_module = load_frame_processor_module(frame_processor) FRAME_PROCESSORS_MODULES.remove(frame_processor_module) modules.globals.frame_processors.remove(frame_processor) - except: + except: # noqa: B001 pass -def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None: + +def multi_process_frame( + source_path: str, + temp_frame_paths: List[str], + process_frames: Callable[[str, List[str], Any], None], + progress: Any = None +) -> None: with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor: futures = [] for path in temp_frame_paths: @@ -69,5 +76,9 @@ def process_video(source_path: str, frame_paths: list[str], process_frames: Call progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' total = len(frame_paths) with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: - progress.set_postfix({'execution_providers': modules.globals.execution_providers, 'execution_threads': modules.globals.execution_threads, 'max_memory': modules.globals.max_memory}) + progress.set_postfix({ + 'execution_providers': modules.globals.execution_providers, + 'execution_threads': modules.globals.execution_threads, + 'max_memory': modules.globals.max_memory + }) multi_process_frame(source_path, frame_paths, process_frames, progress) diff --git a/modules/processors/frame/face_enhancer.py b/modules/processors/frame/face_enhancer.py index 608071a..3c6d088 100644 --- a/modules/processors/frame/face_enhancer.py +++ b/modules/processors/frame/face_enhancer.py @@ -18,7 +18,7 @@ NAME = 'DLC.FACE-ENHANCER' def pre_check() -> bool: - download_directory_path = resolve_relative_path('..\models') + download_directory_path = resolve_relative_path('..\\models') conditional_download(download_directory_path, ['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth']) return True @@ -36,11 +36,11 @@ def get_face_enhancer() -> Any: with THREAD_LOCK: if FACE_ENHANCER is None: if os.name == 'nt': - model_path = resolve_relative_path('..\models\GFPGANv1.4.pth') + model_path = resolve_relative_path('..\\models\\GFPGANv1.4.pth') # todo: set models path https://github.com/TencentARC/GFPGAN/issues/399 else: model_path = resolve_relative_path('../models/GFPGANv1.4.pth') - FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined] + FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined] return FACE_ENHANCER diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py index 4b4a222..bdea129 100644 --- a/modules/processors/frame/face_swapper.py +++ b/modules/processors/frame/face_swapper.py @@ -17,7 +17,10 @@ NAME = 'DLC.FACE-SWAPPER' def pre_check() -> bool: download_directory_path = resolve_relative_path('../models') - conditional_download(download_directory_path, ['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx']) + conditional_download( + download_directory_path, + ['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx'] + ) return True diff --git a/modules/ui.py b/modules/ui.py index 2759a9e..da6997e 100644 --- a/modules/ui.py +++ b/modules/ui.py @@ -68,29 +68,65 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C select_target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1) keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps) - keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps)) + keep_fps_checkbox = ctk.CTkSwitch( + root, + text='Keep fps', + variable=keep_fps_value, + cursor='hand2', + command=lambda: setattr(modules.globals, 'keep_fps', not modules.globals.keep_fps) + ) keep_fps_checkbox.place(relx=0.1, rely=0.6) keep_frames_value = ctk.BooleanVar(value=modules.globals.keep_frames) - keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get())) + keep_frames_switch = ctk.CTkSwitch( + root, + text='Keep frames', + variable=keep_frames_value, + cursor='hand2', + command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()) + ) keep_frames_switch.place(relx=0.1, rely=0.65) # for FRAME PROCESSOR ENHANCER tumbler: enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer']) - enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get())) + enhancer_switch = ctk.CTkSwitch( + root, + text='Face Enhancer', + variable=enhancer_value, + cursor='hand2', + command=lambda: update_tumbler('face_enhancer', enhancer_value.get()) + ) enhancer_switch.place(relx=0.1, rely=0.7) keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio) - keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_audio', keep_audio_value.get())) + keep_audio_switch = ctk.CTkSwitch( + root, + text='Keep audio', + variable=keep_audio_value, + cursor='hand2', + command=lambda: setattr(modules.globals, 'keep_audio', keep_audio_value.get()) + ) keep_audio_switch.place(relx=0.6, rely=0.6) many_faces_value = ctk.BooleanVar(value=modules.globals.many_faces) - many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get())) + many_faces_switch = ctk.CTkSwitch( + root, + text='Many faces', + variable=many_faces_value, + cursor='hand2', + command=lambda: setattr(modules.globals, 'many_faces', many_faces_value.get()) + ) many_faces_switch.place(relx=0.6, rely=0.65) -# nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw) -# nsfw_switch = ctk.CTkSwitch(root, text='NSFW', variable=nsfw_value, cursor='hand2', command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get())) -# nsfw_switch.place(relx=0.6, rely=0.7) + # nsfw_value = ctk.BooleanVar(value=modules.globals.nsfw) + # nsfw_switch = ctk.CTkSwitch( + # root, + # text='NSFW', + # variable=nsfw_value, + # cursor='hand2', + # command=lambda: setattr(modules.globals, 'nsfw', nsfw_value.get()) + # ) + # nsfw_switch.place(relx=0.6, rely=0.7) start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start)) start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05) @@ -146,7 +182,11 @@ def select_source_path() -> None: global RECENT_DIRECTORY_SOURCE, img_ft, vid_ft PREVIEW.withdraw() - source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft]) + source_path = ctk.filedialog.askopenfilename( + title='select an source image', + initialdir=RECENT_DIRECTORY_SOURCE, + filetypes=[img_ft] + ) if is_image(source_path): modules.globals.source_path = source_path RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path) @@ -161,7 +201,11 @@ def select_target_path() -> None: global RECENT_DIRECTORY_TARGET, img_ft, vid_ft PREVIEW.withdraw() - target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft]) + target_path = ctk.filedialog.askopenfilename( + title='select an target image or video', + initialdir=RECENT_DIRECTORY_TARGET, + filetypes=[img_ft, vid_ft] + ) if is_image(target_path): modules.globals.target_path = target_path RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path) @@ -181,9 +225,21 @@ def select_output_path(start: Callable[[], None]) -> None: global RECENT_DIRECTORY_OUTPUT, img_ft, vid_ft if is_image(modules.globals.target_path): - output_path = ctk.filedialog.asksaveasfilename(title='save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT) + output_path = ctk.filedialog.asksaveasfilename( + title='save image output file', + filetypes=[img_ft], + defaultextension='.png', + initialfile='output.png', + initialdir=RECENT_DIRECTORY_OUTPUT + ) elif is_video(modules.globals.target_path): - output_path = ctk.filedialog.asksaveasfilename(title='save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT) + output_path = ctk.filedialog.asksaveasfilename( + title='save video output file', + filetypes=[vid_ft], + defaultextension='.mp4', + initialfile='output.mp4', + initialdir=RECENT_DIRECTORY_OUTPUT + ) else: output_path = None if output_path: @@ -235,7 +291,7 @@ def init_preview() -> None: def update_preview(frame_number: int = 0) -> None: if modules.globals.source_path and modules.globals.target_path: temp_frame = get_video_frame(modules.globals.target_path, frame_number) - if modules.globals.nsfw == False: + if not modules.globals.nsfw: from modules.predicter import predict_frame if predict_frame(temp_frame): quit() @@ -249,6 +305,7 @@ def update_preview(frame_number: int = 0) -> None: image = ctk.CTkImage(image, size=image.size) preview_label.configure(image=image) + def webcam_preview(): if modules.globals.source_path is None: # No image selected @@ -256,7 +313,7 @@ def webcam_preview(): global preview_label, PREVIEW - cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary) + cap = cv2.VideoCapture(0) # Use index for the webcam (adjust the index accordingly if necessary) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960) # Set the width of the resolution cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540) # Set the height of the resolution cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam @@ -280,7 +337,7 @@ def webcam_preview(): if source_image is None and modules.globals.source_path: source_image = get_one_face(cv2.imread(modules.globals.source_path)) - temp_frame = frame.copy() #Create a copy of the frame + temp_frame = frame.copy() # Create a copy of the frame for frame_processor in frame_processors: temp_frame = frame_processor.process_frame(source_image, temp_frame) diff --git a/modules/utilities.py b/modules/utilities.py index 782395f..bfbf67c 100644 --- a/modules/utilities.py +++ b/modules/utilities.py @@ -32,7 +32,18 @@ def run_ffmpeg(args: List[str]) -> bool: def detect_fps(target_path: str) -> float: - command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path] + command = [ + 'ffprobe', + '-v', + 'error', + '-select_streams', + 'v:0', + '-show_entries', + 'stream=r_frame_rate', + '-of', + 'default=noprint_wrappers=1:nokey=1', + target_path + ] output = subprocess.check_output(command).decode().strip().split('/') try: numerator, denominator = map(int, output) @@ -50,12 +61,40 @@ def extract_frames(target_path: str) -> None: def create_video(target_path: str, fps: float = 30.0) -> None: temp_output_path = get_temp_output_path(target_path) temp_directory_path = get_temp_directory_path(target_path) - run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', modules.globals.video_encoder, '-crf', str(modules.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path]) + run_ffmpeg([ + '-r', + str(fps), + '-i', + os.path.join(temp_directory_path, '%04d.png'), + '-c:v', + modules.globals.video_encoder, + '-crf', + str(modules.globals.video_quality), + '-pix_fmt', + 'yuv420p', + '-vf', + 'colorspace=bt709:iall=bt601-6-625:fast=1', + '-y', + temp_output_path + ]) def restore_audio(target_path: str, output_path: str) -> None: temp_output_path = get_temp_output_path(target_path) - done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path]) + done = run_ffmpeg([ + '-i', + temp_output_path, + '-i', + target_path, + '-c:v', + 'copy', + '-map', + '0:v:0', + '-map', + '1:a:0', + '-y', + output_path + ]) if not done: move_temp(target_path, output_path) @@ -131,10 +170,16 @@ def conditional_download(download_directory_path: str, urls: List[str]) -> None: for url in urls: download_file_path = os.path.join(download_directory_path, os.path.basename(url)) if not os.path.exists(download_file_path): - request = urllib.request.urlopen(url) # type: ignore[attr-defined] + request = urllib.request.urlopen(url) # type: ignore[attr-defined] total = int(request.headers.get('Content-Length', 0)) with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress: - urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined] + urllib.request.urlretrieve( + url, + download_file_path, + reporthook=lambda count, + block_size, + total_size: progress.update(block_size) + ) # type: ignore[attr-defined] def resolve_relative_path(path: str) -> str: