--single-face-in-many-faces

more digits for frame filename
This commit is contained in:
Jester69 2023-06-23 23:25:03 +03:00
parent f72216a0fb
commit 3b2da8f08d
6 changed files with 83 additions and 26 deletions

View File

@ -37,6 +37,8 @@ def parse_args() -> None:
program.add_argument('-s', '--source', help='select an source image', dest='source_path') program.add_argument('-s', '--source', help='select an source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path') program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path') program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
program.add_argument('--single-face-in-many-faces', help='use with --target-face', dest='single_face_in_many_faces', action='store_true', default=False)
program.add_argument('--target-face', help='select an target face image (better with the face from target image/video)', dest='target_face_path')
program.add_argument('--frame-processor', help='frame processors (choices: face_swapper, face_enhancer, ...)', dest='frame_processor', default=['face_swapper'], nargs='+') program.add_argument('--frame-processor', help='frame processors (choices: face_swapper, face_enhancer, ...)', dest='frame_processor', default=['face_swapper'], nargs='+')
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False) program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
@ -54,6 +56,8 @@ def parse_args() -> None:
roop.globals.source_path = args.source_path roop.globals.source_path = args.source_path
roop.globals.target_path = args.target_path roop.globals.target_path = args.target_path
roop.globals.output_path = normalize_output_path(roop.globals.source_path, roop.globals.target_path, args.output_path) roop.globals.output_path = normalize_output_path(roop.globals.source_path, roop.globals.target_path, args.output_path)
roop.globals.target_face_path = args.target_face_path
roop.globals.single_face_in_many_faces = args.single_face_in_many_faces
roop.globals.frame_processors = args.frame_processor roop.globals.frame_processors = args.frame_processor
roop.globals.headless = args.source_path or args.target_path or args.output_path roop.globals.headless = args.source_path or args.target_path or args.output_path
roop.globals.keep_fps = args.keep_fps roop.globals.keep_fps = args.keep_fps
@ -165,7 +169,7 @@ def start() -> None:
temp_frame_paths = get_temp_frame_paths(roop.globals.target_path) temp_frame_paths = get_temp_frame_paths(roop.globals.target_path)
for frame_processor in get_frame_processors_modules(roop.globals.frame_processors): for frame_processor in get_frame_processors_modules(roop.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME) update_status('Progressing...', frame_processor.NAME)
frame_processor.process_video(roop.globals.source_path, temp_frame_paths) frame_processor.process_video(roop.globals.source_path, roop.globals.target_face_path, temp_frame_paths)
frame_processor.post_process() frame_processor.post_process()
release_resources() release_resources()
# handles fps # handles fps

View File

@ -1,6 +1,9 @@
from typing import List from typing import List
source_path = None source_path = None
single_face_in_many_faces = None
threshold_value = 0.2
target_face_path = None
target_path = None target_path = None
output_path = None output_path = None
frame_processors: List[str] = [] frame_processors: List[str] = []

View File

@ -42,13 +42,13 @@ def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType
return FRAME_PROCESSORS_MODULES return FRAME_PROCESSORS_MODULES
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], update: Callable[[], None]) -> None: def multi_process_frame(source_path: str, target_face_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], update: Callable[[], None]) -> None:
with ThreadPoolExecutor(max_workers=roop.globals.execution_threads) as executor: with ThreadPoolExecutor(max_workers=roop.globals.execution_threads) as executor:
futures = [] futures = []
queue = create_queue(temp_frame_paths) queue = create_queue(temp_frame_paths)
queue_per_future = len(temp_frame_paths) // roop.globals.execution_threads queue_per_future = len(temp_frame_paths) // roop.globals.execution_threads
while not queue.empty(): while not queue.empty():
future = executor.submit(process_frames, source_path, pick_queue(queue, queue_per_future), update) future = executor.submit(process_frames, source_path, target_face_path, pick_queue(queue, queue_per_future), update)
futures.append(future) futures.append(future)
for future in as_completed(futures): for future in as_completed(futures):
future.result() future.result()
@ -69,11 +69,11 @@ def pick_queue(queue: Queue[str], queue_per_future: int) -> List[str]:
return queues return queues
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None: def process_video(source_path: str, target_face_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths) total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
multi_process_frame(source_path, frame_paths, process_frames, lambda: update_progress(progress)) multi_process_frame(source_path, target_face_path, frame_paths, process_frames, lambda: update_progress(progress))
def update_progress(progress: Any = None) -> None: def update_progress(progress: Any = None) -> None:

View File

@ -2,6 +2,8 @@ from typing import Any, List, Callable
import cv2 import cv2
import insightface import insightface
import threading import threading
import numpy as np
from numpy.linalg import norm
import roop.globals import roop.globals
import roop.processors.frame.core import roop.processors.frame.core
@ -54,24 +56,38 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True) return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True)
def process_frame(source_face: Face, temp_frame: Frame) -> Frame: def is_similar(target_face: Face, found_face: Face) -> bool:
if roop.globals.many_faces: similarity = np.dot(target_face.embedding, found_face.embedding) / (norm(target_face.embedding) * norm(found_face.embedding))
if similarity > roop.globals.threshold_value:
return True
return False
def process_frame(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
if roop.globals.single_face_in_many_faces:
many_faces = get_many_faces(temp_frame) many_faces = get_many_faces(temp_frame)
if many_faces: if many_faces:
for target_face in many_faces: for found_face in many_faces:
temp_frame = swap_face(source_face, target_face, temp_frame) if is_similar(target_face, found_face):
temp_frame = swap_face(source_face, found_face, temp_frame)
elif roop.globals.many_faces:
many_faces = get_many_faces(temp_frame)
if many_faces:
for found_face in many_faces:
temp_frame = swap_face(source_face, found_face, temp_frame)
else: else:
target_face = get_one_face(temp_frame) found_face = get_one_face(temp_frame)
if target_face: if found_face:
temp_frame = swap_face(source_face, target_face, temp_frame) temp_frame = swap_face(source_face, found_face, temp_frame)
return temp_frame return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], update: Callable[[], None]) -> None: def process_frames(source_path: str, target_face_path: str, temp_frame_paths: List[str], update: Callable[[], None]) -> None:
source_face = get_one_face(cv2.imread(source_path)) source_face = get_one_face(cv2.imread(source_path))
target_face = get_one_face(cv2.imread(target_face_path))
for temp_frame_path in temp_frame_paths: for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path) temp_frame = cv2.imread(temp_frame_path)
result = process_frame(source_face, temp_frame) result = process_frame(source_face, target_face, temp_frame)
cv2.imwrite(temp_frame_path, result) cv2.imwrite(temp_frame_path, result)
if update: if update:
update() update()
@ -84,5 +100,5 @@ def process_image(source_path: str, target_path: str, output_path: str) -> None:
cv2.imwrite(output_path, result) cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None: def process_video(source_path: str, target_face_path: str, temp_frame_paths: List[str]) -> None:
roop.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames) roop.processors.frame.core.process_video(source_path, target_face_path, temp_frame_paths, process_frames)

View File

@ -28,6 +28,7 @@ RECENT_DIRECTORY_OUTPUT = None
preview_label = None preview_label = None
preview_slider = None preview_slider = None
source_label = None source_label = None
target_face_label = None
target_label = None target_label = None
status_label = None status_label = None
@ -42,7 +43,7 @@ def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk: def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
global source_label, target_label, status_label global source_label, target_face_label, target_label, status_label
ctk.deactivate_automatic_dpi_awareness() ctk.deactivate_automatic_dpi_awareness()
ctk.set_appearance_mode('system') ctk.set_appearance_mode('system')
@ -55,16 +56,22 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
root.protocol('WM_DELETE_WINDOW', lambda: destroy()) root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = ctk.CTkLabel(root, text=None) source_label = ctk.CTkLabel(root, text=None)
source_label.place(relx=0.1, rely=0.1, relwidth=0.3, relheight=0.25) source_label.place(relx=0.1, rely=0.1, relwidth=0.2, relheight=0.25)
target_face_label = ctk.CTkLabel(root, text=None)
target_face_label.place(relx=0.4, rely=0.1, relwidth=0.2, relheight=0.25)
target_label = ctk.CTkLabel(root, text=None) target_label = ctk.CTkLabel(root, text=None)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25) target_label.place(relx=0.7, rely=0.1, relwidth=0.2, relheight=0.25)
source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=lambda: select_source_path()) source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=lambda: select_source_path())
source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1) source_button.place(relx=0.1, rely=0.4, relwidth=0.2, relheight=0.1)
target_face_button = ctk.CTkButton(root, text='Select a target face', cursor='hand2', command=lambda: select_target_face_path())
target_face_button.place(relx=0.4, rely=0.4, relwidth=0.2, relheight=0.1)
target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=lambda: select_target_path()) target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=lambda: select_target_path())
target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1) target_button.place(relx=0.7, rely=0.4, relwidth=0.2, relheight=0.1)
keep_fps_value = ctk.BooleanVar(value=roop.globals.keep_fps) keep_fps_value = ctk.BooleanVar(value=roop.globals.keep_fps)
keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(roop.globals, 'keep_fps', not roop.globals.keep_fps)) keep_fps_checkbox = ctk.CTkSwitch(root, text='Keep fps', variable=keep_fps_value, cursor='hand2', command=lambda: setattr(roop.globals, 'keep_fps', not roop.globals.keep_fps))
@ -76,11 +83,19 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
keep_audio_value = ctk.BooleanVar(value=roop.globals.keep_audio) keep_audio_value = ctk.BooleanVar(value=roop.globals.keep_audio)
keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, cursor='hand2', command=lambda: setattr(roop.globals, 'keep_audio', keep_audio_value.get())) keep_audio_switch = ctk.CTkSwitch(root, text='Keep audio', variable=keep_audio_value, cursor='hand2', command=lambda: setattr(roop.globals, 'keep_audio', keep_audio_value.get()))
keep_audio_switch.place(relx=0.6, rely=0.6) keep_audio_switch.place(relx=0.4, rely=0.6)
many_faces_value = ctk.BooleanVar(value=roop.globals.many_faces) many_faces_value = ctk.BooleanVar(value=roop.globals.many_faces)
many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(roop.globals, 'many_faces', many_faces_value.get())) many_faces_switch = ctk.CTkSwitch(root, text='Many faces', variable=many_faces_value, cursor='hand2', command=lambda: setattr(roop.globals, 'many_faces', many_faces_value.get()))
many_faces_switch.place(relx=0.6, rely=0.65) many_faces_switch.place(relx=0.4, rely=0.65)
single_face_in_many_faces_value = ctk.BooleanVar(value=roop.globals.single_face_in_many_faces)
single_face_in_many_faces_switch = ctk.CTkSwitch(root, text='Single face in many faces', variable=single_face_in_many_faces_value, cursor='hand2', command=lambda: setattr(roop.globals, 'single_face_in_many_faces', single_face_in_many_faces_value.get()))
single_face_in_many_faces_switch.place(relx=0.6, rely=0.6)
threshold_slider = ctk.CTkSlider(root, from_=0, to=1, command=lambda threshold_value: update_threshold(threshold_value))
threshold_slider.place(relx=0.6, rely=0.65, relwidth=0.3, relheight=0.025)
threshold_slider.set(roop.globals.threshold_value)
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start)) start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.75, relwidth=0.2, relheight=0.05) start_button.place(relx=0.15, rely=0.75, relwidth=0.2, relheight=0.05)
@ -160,6 +175,21 @@ def select_target_path() -> None:
target_label.configure(image=None) target_label.configure(image=None)
def select_target_face_path() -> None:
global RECENT_DIRECTORY_SOURCE
PREVIEW.withdraw()
target_face_path = ctk.filedialog.askopenfilename(title='select an target face image', initialdir=RECENT_DIRECTORY_SOURCE)
if is_image(target_face_path):
roop.globals.target_face_path = target_face_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(roop.globals.target_face_path)
image = render_image_preview(roop.globals.target_face_path, (200, 200))
target_face_label.configure(image=image)
else:
roop.globals.target_face_path = None
target_face_label.configure(image=None)
def select_output_path(start: Callable[[], None]) -> None: def select_output_path(start: Callable[[], None]) -> None:
global RECENT_DIRECTORY_OUTPUT global RECENT_DIRECTORY_OUTPUT
@ -196,6 +226,9 @@ def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: i
cv2.destroyAllWindows() cv2.destroyAllWindows()
def update_threshold(threshold: int = 0) -> None:
roop.globals.threshold_value = threshold
def toggle_preview() -> None: def toggle_preview() -> None:
if PREVIEW.state() == 'normal': if PREVIEW.state() == 'normal':
PREVIEW.withdraw() PREVIEW.withdraw()
@ -223,7 +256,8 @@ def update_preview(frame_number: int = 0) -> None:
for frame_processor in get_frame_processors_modules(roop.globals.frame_processors): for frame_processor in get_frame_processors_modules(roop.globals.frame_processors):
temp_frame = frame_processor.process_frame( temp_frame = frame_processor.process_frame(
get_one_face(cv2.imread(roop.globals.source_path)), get_one_face(cv2.imread(roop.globals.source_path)),
temp_frame get_one_face(cv2.imread(roop.globals.target_face_path)),
temp_frame,
) )
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)) image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS) image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)

View File

@ -44,13 +44,13 @@ def detect_fps(target_path: str) -> float:
def extract_frames(target_path: str) -> None: def extract_frames(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path) temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')]) run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%08d.png')])
def create_video(target_path: str, fps: float = 30.0) -> None: def create_video(target_path: str, fps: float = 30.0) -> None:
temp_output_path = get_temp_output_path(target_path) temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path) temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', roop.globals.video_encoder, '-crf', str(roop.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path]) run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%08d.png'), '-c:v', roop.globals.video_encoder, '-crf', str(roop.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path])
def restore_audio(target_path: str, output_path: str) -> None: def restore_audio(target_path: str, output_path: str) -> None: