import os.path from concurrent.futures import ProcessPoolExecutor import multiprocessing import time import re re_special = re.compile(r'([\\()])') def get_deepbooru_tags(pil_image): """ This method is for running only one image at a time for simple use. Used to the img2img interrogate. """ from modules import shared # prevents circular reference try: create_deepbooru_process(shared.opts.interrogate_deepbooru_score_threshold, create_deepbooru_opts()) return get_tags_from_process(pil_image) finally: release_process() OPT_INCLUDE_RANKS = "include_ranks" def create_deepbooru_opts(): from modules import shared return { "use_spaces": shared.opts.deepbooru_use_spaces, "use_escape": shared.opts.deepbooru_escape, "alpha_sort": shared.opts.deepbooru_sort_alpha, OPT_INCLUDE_RANKS: shared.opts.interrogate_return_ranks, } def deepbooru_process(queue, deepbooru_process_return, threshold, deepbooru_opts): model, tags = get_deepbooru_tags_model() while True: # while process is running, keep monitoring queue for new image pil_image = queue.get() if pil_image == "QUIT": break else: deepbooru_process_return["value"] = get_deepbooru_tags_from_model(model, tags, pil_image, threshold, deepbooru_opts) def create_deepbooru_process(threshold, deepbooru_opts): """ Creates deepbooru process. A queue is created to send images into the process. This enables multiple images to be processed in a row without reloading the model or creating a new process. To return the data, a shared dictionary is created to hold the tags created. To wait for tags to be returned, a value of -1 is assigned to the dictionary and the method adding the image to the queue should wait for this value to be updated with the tags. """ from modules import shared # prevents circular reference shared.deepbooru_process_manager = multiprocessing.Manager() shared.deepbooru_process_queue = shared.deepbooru_process_manager.Queue() shared.deepbooru_process_return = shared.deepbooru_process_manager.dict() shared.deepbooru_process_return["value"] = -1 shared.deepbooru_process = multiprocessing.Process(target=deepbooru_process, args=(shared.deepbooru_process_queue, shared.deepbooru_process_return, threshold, deepbooru_opts)) shared.deepbooru_process.start() def get_tags_from_process(image): from modules import shared shared.deepbooru_process_return["value"] = -1 shared.deepbooru_process_queue.put(image) while shared.deepbooru_process_return["value"] == -1: time.sleep(0.2) caption = shared.deepbooru_process_return["value"] shared.deepbooru_process_return["value"] = -1 return caption def release_process(): """ Stops the deepbooru process to return used memory """ from modules import shared # prevents circular reference shared.deepbooru_process_queue.put("QUIT") shared.deepbooru_process.join() shared.deepbooru_process_queue = None shared.deepbooru_process = None shared.deepbooru_process_return = None shared.deepbooru_process_manager = None def get_deepbooru_tags_model(): import deepdanbooru as dd import tensorflow as tf import numpy as np this_folder = os.path.dirname(__file__) model_path = os.path.abspath(os.path.join(this_folder, '..', 'models', 'deepbooru')) if not os.path.exists(os.path.join(model_path, 'project.json')): # there is no point importing these every time import zipfile from basicsr.utils.download_util import load_file_from_url load_file_from_url( r"https://github.com/KichangKim/DeepDanbooru/releases/download/v3-20211112-sgd-e28/deepdanbooru-v3-20211112-sgd-e28.zip", model_path) with zipfile.ZipFile(os.path.join(model_path, "deepdanbooru-v3-20211112-sgd-e28.zip"), "r") as zip_ref: zip_ref.extractall(model_path) os.remove(os.path.join(model_path, "deepdanbooru-v3-20211112-sgd-e28.zip")) tags = dd.project.load_tags_from_project(model_path) model = dd.project.load_model_from_project( model_path, compile_model=False ) return model, tags def get_deepbooru_tags_from_model(model, tags, pil_image, threshold, deepbooru_opts): import deepdanbooru as dd import tensorflow as tf import numpy as np alpha_sort = deepbooru_opts['alpha_sort'] use_spaces = deepbooru_opts['use_spaces'] use_escape = deepbooru_opts['use_escape'] include_ranks = deepbooru_opts['include_ranks'] width = model.input_shape[2] height = model.input_shape[1] image = np.array(pil_image) image = tf.image.resize( image, size=(height, width), method=tf.image.ResizeMethod.AREA, preserve_aspect_ratio=True, ) image = image.numpy() # EagerTensor to np.array image = dd.image.transform_and_pad_image(image, width, height) image = image / 255.0 image_shape = image.shape image = image.reshape((1, image_shape[0], image_shape[1], image_shape[2])) y = model.predict(image)[0] result_dict = {} for i, tag in enumerate(tags): result_dict[tag] = y[i] unsorted_tags_in_theshold = [] result_tags_print = [] for tag in tags: if result_dict[tag] >= threshold: if tag.startswith("rating:"): continue unsorted_tags_in_theshold.append((result_dict[tag], tag)) result_tags_print.append(f'{result_dict[tag]} {tag}') # sort tags result_tags_out = [] sort_ndx = 0 if alpha_sort: sort_ndx = 1 # sort by reverse by likelihood and normal for alpha, and format tag text as requested unsorted_tags_in_theshold.sort(key=lambda y: y[sort_ndx], reverse=(not alpha_sort)) for weight, tag in unsorted_tags_in_theshold: tag_outformat = tag if use_spaces: tag_outformat = tag_outformat.replace('_', ' ') if use_escape: tag_outformat = re.sub(re_special, r'\\\1', tag_outformat) if include_ranks: tag_outformat = f"({tag_outformat}:{weight:.3f})" result_tags_out.append(tag_outformat) print('\n'.join(sorted(result_tags_print, reverse=True))) return ', '.join(result_tags_out)