import json |
import gradio as gr |
import urllib.request |
import urllib.parse |
import urllib.error |
import os |
import io |
import re |
import time |
import errno |
import requests |
import hashlib |
from pathlib import Path |
from urllib.parse import urlparse |
from sympy import preview |
from modules.shared import cmd_opts, opts |
from scripts.civitai_global import print |
import scripts.civitai_global as gl |
import scripts.civitai_api as _api |
import scripts.civitai_file_manage as _file |
import scripts.civitai_download as _download |
try: |
from send2trash import send2trash |
except ImportError: |
print("Python module 'send2trash' has not been imported correctly, please try to restart or install it manually.") |
try: |
from bs4 import BeautifulSoup |
except ImportError: |
print("Python module 'BeautifulSoup' has not been imported correctly, please try to restart or install it manually.") |
# Initialise the shared state exposed by scripts.civitai_global.
gl.init()

# HTML shown to the user when an API request fails or yields no items.
offlineHTML = '<div style="font-size: 24px; text-align: center; margin: 50px !important;">The Civit-API has timed out, please try again.<br>The servers might be too busy or the selected model could not be found.</div>'
# style_html.css lives in the parent of the directory containing this file.
css_path = Path(__file__).resolve().parents[1] / "style_html.css"

# Module-level flags shared between the *_start callbacks and file_scan().
no_update = False
from_ver = False
from_tag = False
from_installed = False
# file_scan() reads this flag, so it has to exist before any *_start callback
# has had a chance to assign it.
from_preview = False

# The queue is considered enabled unless one of the two command line flags
# says otherwise.
try:
    queue = not cmd_opts.no_gradio_queue
except AttributeError:
    # Fall back to the alternative flag name; a missing attribute here must
    # not escape the handler, so default to "queue enabled".
    queue = not getattr(cmd_opts, "disable_queue", False)
except Exception:
    queue = True
def _trash_or_remove(path, subject, basis):
    """Send *path* to the trash, deleting it permanently if trashing fails.

    Logs "<subject> moved to trash based on <basis>: <path>" or
    "<subject> deleted based on <basis>: <path>" accordingly.
    """
    try:
        send2trash(path)
        print(f"{subject} moved to trash based on {basis}: {path}")
    except Exception:
        # Also covers send2trash being undefined after a failed import.
        os.remove(path)
        print(f"{subject} deleted based on {basis}: {path}")


def delete_model(delete_finish=None, model_filename=None, model_string=None, list_versions=None, sha256=None, selected_list=None, model_ver=None, model_json=None):
    """Delete an installed model and its associated files.

    The model folder is resolved through ``_api.contenttype_folder`` from the
    type/description found in ``model_json`` or, failing that, in
    ``gl.json_data``. Files are first matched through the ``sha256`` stored in
    sidecar ``.json`` files; if nothing was deleted that way, files whose base
    name equals that of ``model_filename`` (or its ``.aria2`` companion) are
    removed instead.

    Returns:
        A tuple of six Gradio updates (three buttons, two textboxes, one
        dropdown), or ``None`` when the content type could not be determined.
    """
    deleted = False
    model_id = None

    if model_string:
        _, model_id = _api.extract_model_info(model_string)

    if not model_ver:
        model_versions = _api.update_model_versions(model_id)
    else:
        model_versions = model_ver

    (model_name, ver_value, ver_choices) = _file.card_update(model_versions, model_string, list_versions, False)

    # Always bind these so a missing id / empty item list is reported instead
    # of raising a NameError further down.
    selected_content_type = None
    desc = None
    if model_json:
        for item in model_json["items"]:
            selected_content_type = item['type']
            desc = item['description']
    elif model_id is not None:
        for item in gl.json_data['items']:
            if int(item['id']) == int(model_id):
                selected_content_type = item['type']
                desc = item['description']
                break

    if selected_content_type is None:
        print("Model ID not found in json_data. (delete_model)")
        return

    model_folder = os.path.join(_api.contenttype_folder(selected_content_type, desc))

    # First attempt: locate the model through the hash stored in sidecar JSON files.
    if sha256:
        sha256_upper = sha256.upper()
        for root, _, files in os.walk(model_folder, followlinks=True):
            for file in files:
                if not file.endswith('.json'):
                    continue
                file_path = os.path.join(root, file)
                try:
                    with open(file_path, 'r', encoding="utf-8") as json_file:
                        data = json.load(json_file)
                    file_sha256 = (data.get('sha256', '') or '').upper()
                except Exception as e:
                    print(f"Failed to open: {file_path}: {e}")
                    continue

                if file_sha256 != sha256_upper:
                    continue

                # Remove every file listed under "unpackList" in the sidecar.
                for unpacked_file in data.get('unpackList', []):
                    unpacked_file_path = os.path.join(root, unpacked_file)
                    if os.path.isfile(unpacked_file_path):
                        _trash_or_remove(unpacked_file_path, "File", "unpackList")

                base_name = os.path.splitext(file)[0]
                if os.path.isfile(file_path):
                    _trash_or_remove(file_path, "Model", "SHA-256")
                delete_associated_files(root, base_name)
                deleted = True

    # Fallback: match on the file name when the hash lookup deleted nothing.
    if not deleted and model_filename:
        filename_to_delete = os.path.splitext(model_filename)[0]
        aria2_file = model_filename + ".aria2"
        for root, _, files in os.walk(model_folder, followlinks=True):
            for file in files:
                current_file_name = os.path.splitext(file)[0]
                if filename_to_delete == current_file_name or aria2_file == file:
                    path_file = os.path.join(root, file)
                    if os.path.isfile(path_file):
                        _trash_or_remove(path_file, "Model", "filename")
                        delete_associated_files(root, current_file_name)

    number = _download.random_number(delete_finish)

    # The first button is only shown when no selection list is active.
    btnDwn = not selected_list or selected_list == "[]"

    return (
        gr.Button.update(interactive=btnDwn, visible=btnDwn),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=False, visible=False),
        gr.Textbox.update(value=number),
        gr.Textbox.update(value=model_name),
        gr.Dropdown.update(value=ver_value, choices=ver_choices)
    )
def delete_associated_files(directory, base_name):
    """Remove every entry in *directory* that belongs to *base_name*.

    An entry belongs to the model when its name without the last extension is
    ``base_name``, ``base_name.preview`` or ``base_name.api_info``. Each match
    is sent to the trash; if that fails it is deleted with ``os.remove``.
    """
    related_stems = {base_name, f"{base_name}.preview", f"{base_name}.api_info"}
    for entry in os.listdir(directory):
        stem, _ext = os.path.splitext(entry)
        if stem not in related_stems:
            continue
        path_to_delete = os.path.join(directory, entry)
        try:
            send2trash(path_to_delete)
            print(f"Associated file moved to trash: {path_to_delete}")
        except Exception:
            # Exception (not a bare except) so Ctrl+C / SystemExit still
            # propagate; a missing send2trash import lands here as NameError.
            os.remove(path_to_delete)
            print(f"Associated file deleted: {path_to_delete}")
def save_preview(file_path, api_response, overwrite_toggle=False, sha256=None):
    """Download the first preview image of the model stored at *file_path*.

    The image is written next to the model as ``<name>.preview.png``. The model
    version is located in ``api_response`` through its SHA256 hash, taken from
    *sha256* or, when not given, from the sidecar ``<name>.json`` file.

    Args:
        file_path: Path of the installed model file.
        api_response: Dict with an ``items`` list as used throughout this module.
        overwrite_toggle: When falsy, an existing preview image is kept.
        sha256: Optional hash of the model file.

    Returns:
        Always ``None``.
    """
    json_file = os.path.splitext(file_path)[0] + ".json"
    install_path, file_name = os.path.split(file_path)
    name = os.path.splitext(file_name)[0]
    image_path = os.path.join(install_path, f'{name}.preview.png')

    if not overwrite_toggle and os.path.exists(image_path):
        return

    if sha256:
        sha256 = sha256.upper()
    elif os.path.exists(json_file):
        try:
            with open(json_file, 'r', encoding="utf-8") as f:
                data = json.load(f)
            if 'sha256' in data and data['sha256']:
                sha256 = data['sha256'].upper()
        except Exception as e:
            print(f"Failed to open {json_file}: {e}")

    if not sha256:
        # Without a hash, `hashes.get("SHA256") == None` would match the first
        # file entry lacking a hash and save an unrelated preview.
        print(f"No SHA-256 hash available for \"{name}\", skipping preview.")
        return

    for item in api_response["items"]:
        for version in item["modelVersions"]:
            for file_entry in version["files"]:
                if file_entry["hashes"].get("SHA256") != sha256:
                    continue
                for image in version["images"]:
                    if image["type"] != "image":
                        continue
                    # Request the image at its own width.
                    url_with_width = re.sub(r'/width=\d+', f'/width={image["width"]}', image["url"])
                    try:
                        # Same timeout as the other API calls in this module,
                        # so one stalled download cannot hang the scan.
                        response = requests.get(url_with_width, timeout=(10, 30))
                    except requests.exceptions.RequestException as e:
                        print(f"Failed to save preview for \"{name}\": {e}")
                        return
                    if response.status_code == 200:
                        with open(image_path, 'wb') as img_file:
                            img_file.write(response.content)
                        print(f"Preview saved at \"{image_path}\"")
                    else:
                        print(f"Failed to save preview. Status code: {response.status_code}")
                    # Only the first entry of type "image" is used.
                    return
                print(f"No preview images found for \"{name}\"")
                return
def get_image_path(install_path, api_response, sub_folder):
    """Return the directory images should be saved to, creating it if needed.

    Without the ``image_location`` option the model's *install_path* is used.
    With it, the result is either that location itself or, when
    ``sub_image_location`` is enabled, the folder returned by
    ``_api.contenttype_folder`` for the model's type, optionally extended by
    *sub_folder*.
    """
    custom_root = getattr(opts, "image_location", r"")
    per_type_folders = getattr(opts, "sub_image_location", True)
    # Model info comes from the first item of the response, else from gl.json_info.
    json_info = api_response['items'][0] if api_response else gl.json_info

    image_path = install_path
    if custom_root and not per_type_folders:
        image_path = Path(custom_root)
    elif custom_root:
        desc = json_info['description']
        content_type = json_info['type']
        image_path = os.path.join(_api.contenttype_folder(content_type, desc, custom_folder=custom_root))

        # These two values mean "no sub folder selected".
        placeholders = ("None", "Only available if the selected files are of the same model type")
        if sub_folder and sub_folder not in placeholders:
            image_path = os.path.join(image_path, sub_folder.lstrip("/").lstrip("\\"))

    make_dir(image_path)
    return image_path
def save_images(preview_html, model_filename, install_path, sub_folder, api_response=None):
    """Download every sample image referenced in *preview_html*.

    Image URLs are taken from tags marked ``data-sampleimg="true"`` and stored
    as ``<model name>_<index>.png`` in the directory returned by
    ``get_image_path``. URL errors are logged and do not stop the loop.
    """
    target_dir = get_image_path(install_path, api_response, sub_folder)
    sample_urls = re.findall(r'data-sampleimg="true" src=[\'"]?([^\'" >]+)', preview_html)
    stem = os.path.splitext(model_filename)[0]

    # Install a global opener that sends a browser-like User-agent header.
    opener = urllib.request.build_opener()
    opener.addheaders = [('User-agent', 'Mozilla/5.0')]
    urllib.request.install_opener(opener)

    for index, raw_url in enumerate(sample_urls):
        filename = f'{stem}_{index}.png'
        quoted_url = urllib.parse.quote(raw_url, safe=':/=')
        try:
            with urllib.request.urlopen(quoted_url) as remote, \
                    open(os.path.join(target_dir, filename), 'wb') as local:
                local.write(remote.read())
                print(f"Downloaded {filename}")
        except urllib.error.URLError as e:
            print(f'Error: {e.reason}')
def card_update(gr_components, model_name, list_versions, is_install):
    """Recompute the version dropdown state and the status suffix of a model.

    Args:
        gr_components: Mapping with a ``choices`` list of version labels, or a
            falsy value when the versions could not be retrieved.
        model_name: Name that receives a ``.New``/``.Old``/``.None`` suffix.
        list_versions: The currently selected version label.
        is_install: True after an install attempt; the selected version is
            tagged ``[Installed]`` unless the download failed or was cancelled.
            Otherwise the tag is stripped from the selected version.

    Returns:
        ``(model_name_with_suffix, selected_version, version_choices)``; the
        last two are ``None`` when *gr_components* is falsy.
    """
    if not gr_components:
        print("Couldn't retrieve version, defaulting to installed")
        model_name += ".New"
        return model_name, None, None

    tag = " [Installed]"
    version_choices = gr_components['choices']

    if is_install and not gl.download_fail and not gl.cancel_status:
        version_value_clean = list_versions + tag
        version_choices_clean = [
            version_value_clean if version + tag == version_value_clean else version
            for version in version_choices
        ]
    else:
        version_value_clean = list_versions.replace(tag, "")
        version_choices_clean = [
            version_value_clean if version.replace(tag, "") == version_value_clean else version
            for version in version_choices
        ]

    # Suffix depends on which entry of the list carries the installed tag.
    if "[Installed]" in version_choices_clean[0]:
        suffix = ".New"
    elif any("[Installed]" in version for version in version_choices_clean[1:]):
        suffix = ".Old"
    else:
        suffix = ".None"

    return model_name + suffix, version_value_clean, version_choices_clean
def list_files(folders):
    """Return the sorted, de-duplicated model files found under *folders*.

    Each existing folder is walked recursively (following links); files are
    kept when their extension, compared case-insensitively, is one of the
    known model extensions. Falsy or missing folders are skipped.
    """
    extensions = ['.pt', '.ckpt', '.pth', '.safetensors', '.th', '.zip', '.vae']
    found = set()
    for folder in folders:
        if not folder or not os.path.exists(folder):
            continue
        for root, _, files in os.walk(folder, followlinks=True):
            for file in files:
                if os.path.splitext(file)[1].lower() in extensions:
                    found.add(os.path.join(root, file))
    return sorted(found)
def gen_sha256(file_path):
    """Return the SHA-256 hex digest of *file_path*, cached in its sidecar JSON.

    If ``<file stem>.json`` already holds a non-empty ``sha256`` value, that
    value is returned without reading the model file. Otherwise the file is
    hashed in 1 MiB blocks and the digest is stored under ``sha256`` in the
    sidecar (created if missing, other keys preserved).
    """
    json_file = os.path.splitext(file_path)[0] + ".json"

    if os.path.exists(json_file):
        try:
            with open(json_file, 'r', encoding="utf-8") as f:
                data = json.load(f)
            if 'sha256' in data and data['sha256']:
                return data['sha256']
        except Exception as e:
            print(f"Failed to open {json_file}: {e}")

    blocksize = 1 << 20
    h = hashlib.sha256()
    with open(os.path.realpath(file_path), 'rb') as f:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: f.read(blocksize), b""):
            h.update(block)
    hash_value = h.hexdigest()

    if os.path.exists(json_file):
        try:
            with open(json_file, 'r', encoding="utf-8") as f:
                data = json.load(f)

            # Store unconditionally: reaching this point means the sidecar had
            # no usable hash, so a "only if already present" guard would never
            # persist the digest and the file would be re-hashed on every call.
            data['sha256'] = hash_value

            with open(json_file, 'w', encoding="utf-8") as f:
                json.dump(data, f, indent=4)
        except Exception as e:
            print(f"Failed to open {json_file}: {e}")
    else:
        data = {'sha256': hash_value}
        with open(json_file, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=4)

    return hash_value
def model_from_sent(model_name, content_type, tile_count, path_input):
    """Build the HTML info page for a model selected outside the browser tab.

    When *path_input* is ``"Not Found"`` the model file is searched by name in
    the folders of the given content type. The page is then read from a local
    ``.html`` file (if the ``use_local_html`` option allows it) or generated
    from the API, and the extension's stylesheet is inlined with its colours
    replaced by theme variables.

    Returns:
        A one-element tuple holding a ``gr.Textbox`` update with the HTML.
    """
    modelID_failed = False
    output_html = None
    model_file = None
    use_local_html = getattr(opts, "use_local_html", False)
    local_path_in_html = getattr(opts, "local_path_in_html", False)

    div = '<div style="color: white; font-family: var(--font); font-size: 24px; text-align: center; margin: 50px !important;">'
    not_found = div + "Model ID not found.<br>Maybe the model doesn\'t exist on CivitAI?</div>"
    path_not_found = div + "Model ID not found.<br>Could not locate the model path.</div>"
    # Closing tag added so this message is well-formed like the two above.
    offline = div + "CivitAI failed to respond.<br>The servers are likely offline.</div>"

    if local_path_in_html:
        use_local_html = False

    if path_input == "Not Found":
        # Strip a trailing ".NNN" suffix from both identifiers.
        model_name = re.sub(r'\.\d{3}$', '', model_name)
        content_type = re.sub(r'\.\d{3}$', '', content_type)
        content_mapping = {
            "txt2img_textual_inversion_cards_html": ['TextualInversion'],
            "txt2img_hypernetworks_cards_html": ['Hypernetwork'],
            "txt2img_checkpoints_cards_html": ['Checkpoint'],
            "txt2img_lora_cards_html": ['LORA', 'LoCon']
        }
        content_type = content_mapping.get(content_type, content_type)

        extensions = ('.pt', '.ckpt', '.pth', '.safetensors', '.th', '.zip', '.vae')

        # An unmapped content type is still a plain string; wrap it so the
        # loop does not iterate over its characters.
        search_types = [content_type] if isinstance(content_type, str) else content_type
        folder = None
        for content_type_item in search_types:
            folder = _api.contenttype_folder(content_type_item)
            for folder_path, _, files in os.walk(folder, followlinks=True):
                for file in files:
                    if file.startswith(model_name) and file.endswith(extensions):
                        model_file = os.path.join(folder_path, file)

        if not model_file:
            output_html = path_not_found
            print(f'Error: Could not find model path for model: "{model_name}"')
            print(f'Content type: "{content_type}"')
            print(f'Main folder path: "{folder}"')
            use_local_html = False
    else:
        model_file = path_input

    if use_local_html:
        html_file = os.path.splitext(model_file)[0] + ".html"
        if os.path.exists(html_file):
            with open(html_file, 'r', encoding='utf-8') as html:
                output_html = html.read()
            # Drop the saved <head>; a fresh one is prepended below.
            index = output_html.find("</head>")
            if index != -1:
                output_html = output_html[index + len("</head>"):]

    if not output_html:
        modelID = get_models(model_file, True)
        if not modelID or modelID == "Model not found":
            output_html = not_found
            modelID_failed = True
        if modelID == "offline":
            output_html = offline
            modelID_failed = True
        if not modelID_failed:
            json_data = _api.api_to_data(content_type, "Newest", "AllTime", "Model name", None, None, None, tile_count, f"civitai.com/models/{modelID}")
        else:
            json_data = None

        if json_data == "timeout":
            output_html = offline
        if json_data is not None and json_data != "timeout":
            model_versions = _api.update_model_versions(modelID, json_data)
            output_html = _api.update_model_info(None, model_versions.get('value'), True, modelID, json_data)

    if not output_html:
        # The API helpers produced nothing; show a message instead of failing
        # on `None.replace(...)` below.
        output_html = not_found

    css_path = Path(__file__).resolve().parents[1] / "style_html.css"
    with open(css_path, 'r', encoding='utf-8') as css_file:
        css = css_file.read()

    # Applied in insertion order.
    replacements = {
        '#0b0f19': 'var(--body-background-fill)',
        '#F3F4F6': 'var(--body-text-color)',
        'white': 'var(--body-text-color)',
        '#80a6c8': 'var(--secondary-300)',
        '#60A5FA': 'var(--link-text-color-hover)',
        '#1F2937': 'var(--input-background-fill)',
        '#374151': 'var(--input-border-color)',
        '#111827': 'var(--error-background-fill)',
        'top: 50%;': '',
        'padding-top: 0px;': 'padding-top: 475px;',
        '.civitai_txt2img': '.civitai_placeholder'
    }

    for old, new in replacements.items():
        css = css.replace(old, new)

    style_tag = f'<style>{css}</style>'
    head_section = f'<head>{style_tag}</head>'

    output_html = output_html.replace('display:flex;align-items:flex-start;', 'display:flex;align-items:flex-start;flex-wrap:wrap;justify-content:center;')
    output_html = str(head_section + output_html)
    # Rename the zoom identifiers used in the generated page.
    output_html = output_html.replace('zoom-radio', 'zoom-preview-radio')
    output_html = output_html.replace('zoomRadio', 'zoomPreviewRadio')
    output_html = output_html.replace('zoom-overlay', 'zoom-preview-overlay')
    output_html = output_html.replace('resetZoom', 'resetPreviewZoom')

    number = _download.random_number()

    return (
        gr.Textbox.update(value=output_html, placeholder=number),
    )
def is_image_url(url):
    """Return True when the path component of *url* ends in an image extension.

    The comparison is case-sensitive and ignores query string and fragment.
    """
    image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')
    return urlparse(url).path.endswith(image_extensions)
def clean_description(desc):
    """Convert an HTML description to plain text.

    Links are replaced by ``"<link text> <href>"`` unless the href points to an
    image. If BeautifulSoup is unavailable, *desc* is returned unchanged.
    """
    try:
        soup = BeautifulSoup(desc, 'html.parser')
    except (ImportError, NameError):
        # A failed `from bs4 import BeautifulSoup` leaves the name undefined,
        # which surfaces here as NameError rather than ImportError.
        print("Python module 'BeautifulSoup' was not imported correctly, cannot clean description. Please try to restart or install it manually.")
        return desc

    for a in soup.find_all('a', href=True):
        link_text = a.text + ' ' + a['href']
        if not is_image_url(a['href']):
            a.replace_with(link_text)
    return soup.get_text()
def make_dir(path):
    """Create *path* (including parents) if it does not exist.

    On a permission error the creation is retried once with ``mode=0o777``.
    Failures are logged; no exception from ``os.makedirs`` of type OSError is
    propagated.
    """
    try:
        if not os.path.exists(path):
            os.makedirs(path)
        return
    except OSError as first_error:
        if first_error.errno != errno.EACCES:
            print(f"Error creating directory: {first_error}")
            return
    except Exception as unexpected:
        print(f"Error creating directory: {unexpected}")
        return

    # Only reached after a permission error: retry with an explicit mode.
    try:
        os.makedirs(path, mode=0o777)
    except OSError as retry_error:
        if retry_error.errno == errno.EACCES:
            print("Permission denied even with elevated permissions.")
        else:
            print(f"Error creating directory: {retry_error}")
def save_model_info(install_path, file_name, sub_folder, sha256=None, preview_html=None, overwrite_toggle=False, api_response=None):
    """Write the info files that accompany an installed model.

    Always updates ``<name>.json`` through ``find_and_save`` (matching by hash
    first, then by file name). When *preview_html* is given, an ``.html`` page
    is written as well; with the ``save_api_info`` option, ``gl.json_info`` is
    dumped to ``<name>.api_info.json``. HTML and API info go to the image path
    when the ``save_to_custom`` option is set, otherwise to *install_path*.
    """
    stem = os.path.splitext(file_name)[0]
    json_file = os.path.join(install_path, f'{stem}.json')
    make_dir(install_path)

    save_api_info = getattr(opts, "save_api_info", False)
    use_local = getattr(opts, "local_path_in_html", False)
    save_to_custom = getattr(opts, "save_to_custom", False)

    if not api_response:
        api_response = gl.json_data

    image_path = get_image_path(install_path, api_response, sub_folder)
    save_path = image_path if save_to_custom else install_path

    # Hash lookup first; fall back to matching on the file name.
    if find_and_save(api_response, sha256, file_name, json_file, False, overwrite_toggle) != "found":
        find_and_save(api_response, sha256, file_name, json_file, True, overwrite_toggle)

    if preview_html:
        if use_local:
            # Point every sample image at its local copy.
            sample_urls = re.findall(r'data-sampleimg="true" src=[\'"]?([^\'" >]+)', preview_html)
            for index, sample_url in enumerate(sample_urls):
                img_name = f'{stem}_{index}.png'
                preview_html = preview_html.replace(sample_url, f'{os.path.join(image_path, img_name)}')

        # Reuse the whitespace preceding the model block for the new <head>.
        block_match = re.search(r'(\s*)<div class="model-block">', preview_html)
        indentation = block_match.group(1) if block_match else ''
        css_link = f'<link rel="stylesheet" type="text/css" href="{css_path}">'
        utf8_meta_tag = f'{indentation}<meta charset="UTF-8">'
        head_section = f'{indentation}<head>{indentation} {utf8_meta_tag}{indentation} {css_link}{indentation}</head>'
        document = head_section + preview_html
        html_path = os.path.join(save_path, f'{stem}.html')
        with open(html_path, 'wb') as f:
            f.write(document.encode('utf8'))

    if save_api_info:
        api_info_path = os.path.join(save_path, f'{stem}.api_info.json')
        with open(api_info_path, mode="w", encoding="utf-8") as f:
            json.dump(gl.json_info, f, indent=4, ensure_ascii=False)
def find_and_save(api_response, sha256=None, file_name=None, json_file=None, no_hash=None, overwrite_toggle=None):
    """Locate a model file in *api_response* and store its info in *json_file*.

    The file is matched on its name when *no_hash* is truthy, otherwise on its
    SHA256 hash. On a match ``gl.json_info`` is set to the owning item and the
    keys ``activation text``, ``description`` and ``sd version`` are written to
    *json_file*: only where missing or empty, unless *overwrite_toggle* is set.

    Returns:
        ``"found"`` if a matching file was processed, else ``"not found"``.
    """
    for item in api_response.get('items', []):
        for model_version in item.get('modelVersions', []):
            for file in model_version.get('files', []):
                file_name_api = file.get('name', '')
                sha256_api = file.get('hashes', {}).get('SHA256', '')

                matched = (file_name == file_name_api) if no_hash else (sha256 == sha256_api)
                if not matched:
                    continue

                gl.json_info = item
                trained_words = model_version.get('trainedWords', [])
                model_id = model_version.get('modelId', '')

                # Default to no prefix so a missing modelId cannot leave
                # model_url unbound below.
                model_url = ''
                if model_id:
                    model_url = f'Model URL: \"https://civitai.com/models/{model_id}\"\n'

                description = item.get('description', '')
                if description is not None:
                    description = clean_description(description)
                    description = model_url + description

                # `or ''` guards against an explicit null baseModel.
                base_model = model_version.get('baseModel', '') or ''
                if base_model.startswith("SD 1"):
                    base_model = "SD1"
                elif base_model.startswith("SD 2"):
                    base_model = "SD2"
                elif base_model.startswith("SDXL"):
                    base_model = "SDXL"
                else:
                    base_model = "Other"

                if isinstance(trained_words, list):
                    trained_tags = ",".join(trained_words)
                    # Drop <...:...> style tags, then normalise comma spacing.
                    trained_tags = re.sub(r'<[^>]*:[^>]*>', '', trained_tags)
                    trained_tags = re.sub(r', ?', ', ', trained_tags)
                    trained_tags = trained_tags.strip(', ')
                else:
                    trained_tags = trained_words

                content = {}
                if os.path.exists(json_file):
                    with open(json_file, 'r', encoding="utf-8") as f:
                        try:
                            content = json.load(f)
                        except Exception:
                            # Unreadable JSON: start over with an empty mapping.
                            content = {}
                if not isinstance(content, dict):
                    content = {}

                changed = False
                new_values = {
                    "activation text": trained_tags,
                    "description": description,
                    "sd version": base_model,
                }
                if overwrite_toggle:
                    content.update(new_values)
                    changed = True
                else:
                    for key, value in new_values.items():
                        if not content.get(key):
                            content[key] = value
                            changed = True

                with open(json_file, 'w', encoding="utf-8") as f:
                    json.dump(content, f, indent=4)

                if changed:
                    print(f"Model info saved to \"{json_file}\"")
                return "found"

    return "not found"
def get_models(file_path, gen_hash=None):
    """Return the CivitAI model id for *file_path*, using its sidecar JSON as cache.

    ``modelId`` and ``sha256`` are read from ``<file stem>.json``. If the id is
    missing (or is the string "Model not found") and *gen_hash* is truthy, the
    id is requested from the by-hash API endpoint, hashing the file first when
    necessary. The id and upper-cased hash are then written to the sidecar.

    Returns:
        The model id; ``"offline"`` on a 503 response, timeout or connection
        error; ``None`` when no id is available or another error occurred.
    """
    model_id = None
    file_hash = None
    sidecar = os.path.splitext(file_path)[0] + ".json"

    if os.path.exists(sidecar):
        try:
            with open(sidecar, 'r', encoding="utf-8") as f:
                cached = json.load(f)
                if 'modelId' in cached:
                    model_id = cached['modelId']
                if 'sha256' in cached and cached['sha256']:
                    file_hash = cached['sha256']
        except Exception as e:
            print(f"Failed to open {sidecar}: {e}")

    id_missing = not model_id or model_id == "Model not found"
    if id_missing or not file_hash:
        if not gen_hash:
            # Without permission to hash, return whatever id is cached.
            return model_id if model_id else None
        if not file_hash:
            file_hash = gen_sha256(file_path)
        by_hash = f"https://civitai.com/api/v1/model-versions/by-hash/{file_hash}"

    try:
        if id_missing:
            response = requests.get(by_hash, timeout=(10,30))
            status = response.status_code
            if status == 200:
                payload = response.json()
                if 'error' in payload:
                    print(f"\"{file_path}\": {payload['error']}")
                    return None
                model_id = payload.get("modelId", "")
            elif status == 503:
                return "offline"
            elif status == 404:
                # The error text of the response is stored in place of the id.
                model_id = response.json().get("error", "")

        if os.path.exists(sidecar):
            try:
                with open(sidecar, 'r', encoding="utf-8") as f:
                    record = json.load(f)

                record['modelId'] = model_id
                record['sha256'] = file_hash.upper()

                with open(sidecar, 'w', encoding="utf-8") as f:
                    json.dump(record, f, indent=4)
            except Exception as e:
                print(f"Failed to open {sidecar}: {e}")
        else:
            record = {
                'modelId': model_id,
                'sha256': file_hash.upper()
            }
            with open(sidecar, 'w', encoding="utf-8") as f:
                json.dump(record, f, indent=4)

        return model_id
    except requests.exceptions.Timeout:
        print(f"Request timed out for {file_path}. Skipping...")
        return "offline"
    except requests.exceptions.ConnectionError:
        print("Failed to connect to the API. The CivitAI servers might be offline.")
        return "offline"
    except Exception as e:
        print(f"An error occurred for {file_path}: {str(e)}")
        return None
def version_match(file_paths, api_response):
    """Classify API models by whether the installed file is their first listed version.

    Installed files are identified by ``(name without extension, upper-cased
    sha256 from the sidecar JSON or "")``. For each item in *api_response* the
    first version containing a matching file decides the result: index 0 puts
    the model into the updated list, any later index into the outdated list.

    Returns:
        ``(updated_models, outdated_models)``, each a list of
        ``("&ids=<id>", <name>)`` tuples.
    """
    # Pass 1: collect hashes from the sidecar files, keyed by file basename.
    hash_by_basename = {}
    for path in file_paths:
        sidecar = f"{os.path.splitext(path)[0]}.json"
        if not os.path.exists(sidecar):
            continue
        with open(sidecar, 'r', encoding="utf-8") as f:
            try:
                stored = json.load(f).get('sha256')
                if stored:
                    hash_by_basename[os.path.basename(path)] = stored.upper()
            except Exception as e:
                print(f"Failed to open {sidecar}: {e}")

    # Pass 2: build the lookup set of (stem, hash) for all installed files.
    installed_keys = set()
    for path in file_paths:
        base = os.path.basename(path)
        digest = hash_by_basename.get(base, "")
        if digest:
            digest = digest.upper()
        installed_keys.add((os.path.splitext(base)[0], digest))

    def _entry_key(file_entry):
        digest = file_entry.get('hashes', {}).get('SHA256', "")
        if digest:
            digest = digest.upper()
        return (os.path.splitext(file_entry.get('name', ''))[0], digest)

    updated_models = []
    outdated_models = []
    for item in api_response.get('items', []):
        versions = item.get('modelVersions', [])
        if not versions:
            continue

        for idx, model_version in enumerate(versions):
            if any(_entry_key(entry) in installed_keys for entry in model_version.get('files', [])):
                bucket = updated_models if idx == 0 else outdated_models
                bucket.append((f"&ids={item['id']}", item["name"]))
                # Only the first matching version of a model counts.
                break

    return updated_models, outdated_models
def get_content_choices(scan_choices=False):
    """Return the list of selectable content types.

    With the ``use_LORA`` option, LORA and LoCon are offered as the single
    entry "LORA & LoCon"; otherwise as two entries. When *scan_choices* is
    truthy, "All" is inserted at the front.
    """
    lora_entries = ["LORA & LoCon"] if getattr(opts, "use_LORA", False) else ["LORA", "LoCon"]
    content_list = [
        "Checkpoint", "TextualInversion", *lora_entries,
        "Poses", "Controlnet", "Hypernetwork", "AestheticGradient", "VAE",
        "Upscaler", "MotionModule", "Wildcards", "Workflows", "Other",
    ]
    if scan_choices:
        content_list.insert(0, 'All')
    return content_list
def file_scan(folders, ver_finish, tag_finish, installed_finish, preview_finish, overwrite_toggle, tile_count, gen_hash, progress=gr.Progress() if queue else None):
    """Scan installed model files and run the action selected by the from_* flags.

    The module-level flags ``from_ver``, ``from_tag``, ``from_installed`` and
    ``from_preview`` decide what is done with the model ids collected through
    ``get_models``: check for outdated versions, save tags/info, load the
    installed models, or save preview images.

    Args:
        folders: Content type names to scan; "All" selects every type.
        ver_finish, tag_finish, installed_finish, preview_finish: Values passed
            to ``_download.random_number`` for the active mode.
        overwrite_toggle: Forwarded to ``save_model_info`` / ``save_preview``.
        tile_count: Number of model ids per page URL.
        gen_hash: Forwarded to ``get_models``.
        progress: Gradio progress callback, or None when the queue is disabled.

    Returns:
        A tuple ``(gr.HTML update, gr.Textbox update)``.
    """
    global from_ver, from_installed, no_update
    update_log = getattr(opts, "update_log", True)
    gl.scan_files = True
    no_update = False
    if from_ver:
        number = _download.random_number(ver_finish)
    elif from_tag:
        number = _download.random_number(tag_finish)
    elif from_installed:
        number = _download.random_number(installed_finish)
    elif from_preview:
        number = _download.random_number(preview_finish)
    else:
        # No mode flag set: still bind `number` so every return path works.
        number = _download.random_number()

    empty_html = '<div style="min-height: 0px;"></div>'

    def _result(html):
        # Every exit of this function returns this pair of updates.
        return (gr.HTML.update(value=html), gr.Textbox.update(value=number))

    def _abort(fraction, desc):
        # Shared early exit: report, reset the scan state, return an empty page.
        global from_ver, from_installed, no_update
        if progress is not None:
            progress(fraction, desc=desc)
        no_update = True
        gl.scan_files = False
        from_ver, from_installed = False, False
        time.sleep(2)
        return _result(empty_html)

    if not folders:
        return _abort(0, "No folder selected.")

    folders_to_check = []
    if 'All' in folders:
        folders = _file.get_content_choices()

    for item in folders:
        if item == "LORA & LoCon":
            folder = _api.contenttype_folder("LORA")
            if folder:
                folders_to_check.append(folder)
            folder = _api.contenttype_folder("LoCon", fromCheck=True)
            if folder:
                folders_to_check.append(folder)
        elif item == "Upscaler":
            # One folder lookup per upscaler kind.
            for upscaler in ("SwinIR", "RealESRGAN", "GFPGAN", "BSRGAN", "ESRGAN"):
                folder = _api.contenttype_folder(item, upscaler)
                if folder:
                    folders_to_check.append(folder)
        else:
            folder = _api.contenttype_folder(item)
            if folder:
                folders_to_check.append(folder)

    files_done = 0
    files = list_files(folders_to_check)
    total_files = len(files)

    if total_files == 0:
        return _abort(1, "No files in selected folder.")

    updated_models = []
    outdated_models = []
    all_model_ids = []
    file_paths = []
    all_ids = []

    for file_path in files:
        if gl.cancel_status:
            return _abort(files_done / total_files, "Processing files cancelled.")
        file_name = os.path.basename(file_path)
        if progress is not None:
            progress(files_done / total_files, desc=f"Processing file: {file_name}")
        model_id = get_models(file_path, gen_hash)
        if model_id == "offline":
            print("The CivitAI servers did not respond, unable to retrieve Model ID")
        elif model_id == "Model not found" and update_log:
            print(f"model: \"{file_name}\" not found on CivitAI servers.")
        elif model_id is not None:
            all_model_ids.append(f"&ids={model_id}")
            all_ids.append(model_id)
            file_paths.append(file_path)
        elif not model_id and update_log:
            print(f"model ID not found for: \"{file_name}\"")
        files_done += 1

    all_items = []
    all_model_ids = list(set(all_model_ids))

    if not all_model_ids:
        print("Could not retrieve any Model IDs, please make sure to turn on the \"One-Time Hash Generation for externally downloaded models.\" option if you haven't already.")
        # _abort guards the progress call, which may be None without a queue.
        return _abort(1, "No model IDs could be retrieved.")

    def chunks(lst, n):
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    if not from_installed:
        model_chunks = list(chunks(all_model_ids, 500))

        base_url = "https://civitai.com/api/v1/models?limit=100"
        url_list = [f"{base_url}{''.join(chunk)}" for chunk in model_chunks]

        # Expected number of requests at 100 models per page (rounded up).
        url_count = len(all_model_ids) // 100
        if len(all_model_ids) % 100 != 0:
            url_count += 1
        url_done = 0
        api_response = {}
        for url in url_list:
            # Follow the `nextPage` links of each request until exhausted.
            while url:
                try:
                    if progress is not None:
                        progress(url_done / url_count, desc=f"Sending API request... {url_done}/{url_count}")
                    response = requests.get(url, timeout=(10, 30))
                    if response.status_code == 200:
                        api_response_json = response.json()
                        all_items.extend(api_response_json['items'])
                        metadata = api_response_json.get('metadata', {})
                        url = metadata.get('nextPage', None)
                    elif response.status_code == 503:
                        # Clear the scan flag like the other failure exits do.
                        gl.scan_files = False
                        return _result(offlineHTML)
                    else:
                        print(f"Error: Received status code {response.status_code} with URL: {url}")
                        url = None
                    url_done += 1
                except requests.exceptions.Timeout:
                    print(f"Request timed out for {url}. Skipping...")
                    url = None
                except requests.exceptions.ConnectionError:
                    print("Failed to connect to the API. The servers might be offline.")
                    url = None
                except Exception as e:
                    print(f"An unexpected error occurred: {e}")
                    url = None

        api_response['items'] = all_items
        if api_response['items'] == []:
            gl.scan_files = False
            return _result(offlineHTML)

    if progress is not None:
        progress(1, desc="Processing final results...")

    if from_ver:
        updated_models, outdated_models = version_match(file_paths, api_response)

        updated_ids = {model[0] for model in updated_models}
        # A model counts as outdated only if no installed file is up to date.
        outdated_set = {model for model in set(outdated_models) if model[0] not in updated_ids}

        all_model_ids = [model[0] for model in outdated_set]
        all_model_names = [model[1] for model in outdated_set]

        if update_log:
            for model_name in all_model_names:
                print(f'"{model_name}" is currently outdated.')

        if len(all_model_ids) == 0:
            no_update = True
            gl.scan_files = False
            from_ver = False
            return _result('<div style="font-size: 24px; text-align: center; margin: 50px !important;">No updates found for selected models.</div>')

    # Build the numbered page URLs stored in gl.url_list_with_numbers.
    model_chunks = list(chunks(all_model_ids, tile_count))

    base_url = "https://civitai.com/api/v1/models?limit=100"
    gl.url_list_with_numbers = {i+1: f"{base_url}{''.join(chunk)}" for i, chunk in enumerate(model_chunks)}

    url_error = False
    api_url = gl.url_list_with_numbers.get(1)

    try:
        # The request itself sits inside the try so timeouts and connection
        # errors reach the handlers below instead of escaping.
        response = requests.get(api_url, timeout=(10, 30))
        if response.status_code == 200:
            response.encoding = "utf-8"
            gl.ver_json = json.loads(response.text)

            highest_number = max(gl.url_list_with_numbers.keys())
            gl.ver_json["metadata"]["totalPages"] = highest_number

            if highest_number > 1:
                gl.ver_json["metadata"]["nextPage"] = gl.url_list_with_numbers.get(2)
        else:
            print(f"Error: Received status code {response.status_code} for URL: {api_url}")
            url_error = True
    except requests.exceptions.Timeout:
        print(f"Request timed out for {api_url}. Skipping...")
        url_error = True
    except requests.exceptions.ConnectionError:
        print("Failed to connect to the API. The servers might be offline.")
        url_error = True
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        url_error = True

    if url_error:
        gl.scan_files = False
        return _result(offlineHTML)
    elif from_ver:
        gl.scan_files = False
        return _result('<div style="font-size: 24px; text-align: center; margin: 50px !important;">Outdated models have been found.<br>Please press the button above to load the models into the browser tab</div>')
    elif from_installed:
        gl.scan_files = False
        return _result('<div style="font-size: 24px; text-align: center; margin: 50px !important;">Installed models have been loaded.<br>Please press the button above to load the models into the browser tab</div>')
    elif from_tag:
        for file_path, id_value in zip(file_paths, all_ids):
            install_path, file_name = os.path.split(file_path)
            model_versions = _api.update_model_versions(id_value, api_response)
            preview_html = _api.update_model_info(None, model_versions.get('value'), True, id_value, api_response)
            sub_folder = os.path.normpath(os.path.relpath(install_path, gl.main_folder))
            save_model_info(install_path, file_name, sub_folder, preview_html=preview_html, api_response=api_response, overwrite_toggle=overwrite_toggle)
        if progress is not None:
            progress(1, desc="All tags succesfully saved!")
        gl.scan_files = False
        time.sleep(2)
        return _result(empty_html)
    elif from_preview:
        completed_preview = 0
        preview_count = len(file_paths)
        for file in file_paths:
            _, file_name = os.path.split(file)
            name = os.path.splitext(file_name)[0]
            if progress is not None:
                progress(completed_preview / preview_count, desc=f"Saving preview images... {completed_preview}/{preview_count} | {name}")
            save_preview(file, api_response, overwrite_toggle)
            completed_preview += 1
        gl.scan_files = False
        return _result(empty_html)
def save_tag_start(tag_start):
    """Mark a tag-saving run as the active scan mode and return the UI updates.

    Sets the module-level ``from_tag`` flag and clears ``from_ver``,
    ``from_installed`` and ``from_preview``.

    Args:
        tag_start: Value forwarded to ``_download.random_number``.

    Returns:
        A 7-tuple of Gradio updates, in order: a textbox holding the number
        returned by ``_download.random_number``, one hidden and disabled
        button, one visible and enabled button, three visible but disabled
        buttons, and an HTML element with a 100px minimum height.
    """
    global from_tag, from_ver, from_installed, from_preview
    from_tag = True
    from_ver = from_installed = from_preview = False

    # NOTE(review): presumably the new number changes the textbox so that a
    # listener kicks off the scan - confirm against the UI wiring.
    trigger_value = _download.random_number(tag_start)
    return (
        gr.Textbox.update(value=trigger_value),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=True, visible=True),
        *(gr.Button.update(interactive=False, visible=True) for _ in range(3)),
        gr.HTML.update(value='<div style="min-height: 100px;"></div>'),
    )
def save_preview_start(preview_start):
    """Mark a preview-saving run as the active scan mode and return the UI updates.

    Sets the module-level ``from_preview`` flag and clears ``from_tag``,
    ``from_ver`` and ``from_installed``.

    Args:
        preview_start: Value forwarded to ``_download.random_number``.

    Returns:
        A 7-tuple of Gradio updates, in order: a textbox holding the number
        returned by ``_download.random_number``, one hidden and disabled
        button, one visible and enabled button, three visible but disabled
        buttons, and an HTML element with a 100px minimum height.
    """
    global from_tag, from_ver, from_installed, from_preview
    from_preview = True
    from_tag = from_ver = from_installed = False

    # NOTE(review): presumably the new number changes the textbox so that a
    # listener kicks off the scan - confirm against the UI wiring.
    trigger_value = _download.random_number(preview_start)
    return (
        gr.Textbox.update(value=trigger_value),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=True, visible=True),
        *(gr.Button.update(interactive=False, visible=True) for _ in range(3)),
        gr.HTML.update(value='<div style="min-height: 100px;"></div>'),
    )
def installed_models_start(installed_start):
    """Mark an installed-models run as the active scan mode and return the UI updates.

    Sets the module-level ``from_installed`` flag and clears ``from_ver``,
    ``from_tag`` and ``from_preview``.

    Args:
        installed_start: Value forwarded to ``_download.random_number``.

    Returns:
        A 7-tuple of Gradio updates, in order: a textbox holding the number
        returned by ``_download.random_number``, one hidden and disabled
        button, one visible and enabled button, three visible but disabled
        buttons, and an HTML element with a 100px minimum height.
    """
    global from_installed, from_ver, from_tag, from_preview
    from_installed = True
    from_ver = from_tag = from_preview = False

    # NOTE(review): presumably the new number changes the textbox so that a
    # listener kicks off the scan - confirm against the UI wiring.
    trigger_value = _download.random_number(installed_start)
    return (
        gr.Textbox.update(value=trigger_value),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=True, visible=True),
        *(gr.Button.update(interactive=False, visible=True) for _ in range(3)),
        gr.HTML.update(value='<div style="min-height: 100px;"></div>'),
    )
def ver_search_start(ver_start):
    """Mark a version-search run as the active scan mode and return the UI updates.

    Sets the module-level ``from_ver`` flag and clears ``from_tag``,
    ``from_installed`` and ``from_preview``.

    Args:
        ver_start: Value forwarded to ``_download.random_number``.

    Returns:
        A 7-tuple of Gradio updates, in order: a textbox holding the number
        returned by ``_download.random_number``, one hidden and disabled
        button, one visible and enabled button, three visible but disabled
        buttons, and an HTML element with a 100px minimum height.
    """
    global from_ver, from_tag, from_installed, from_preview
    from_ver = True
    from_tag = from_installed = from_preview = False

    # NOTE(review): presumably the new number changes the textbox so that a
    # listener kicks off the scan - confirm against the UI wiring.
    trigger_value = _download.random_number(ver_start)
    return (
        gr.Textbox.update(value=trigger_value),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=True, visible=True),
        *(gr.Button.update(interactive=False, visible=True) for _ in range(3)),
        gr.HTML.update(value='<div style="min-height: 100px;"></div>'),
    )
def save_tag_finish():
    """Clear the module-level ``from_tag`` flag and return the button updates.

    Returns:
        A 5-tuple of Gradio button updates: four visible, interactive
        buttons followed by one hidden, non-interactive button.
    """
    global from_tag
    from_tag = False

    # A fresh update is created per button rather than sharing one object.
    enabled = (gr.Button.update(interactive=True, visible=True) for _ in range(4))
    return (*enabled, gr.Button.update(interactive=False, visible=False))
def save_preview_finish():
    """Clear the module-level ``from_preview`` flag and return the button updates.

    Returns:
        A 5-tuple of Gradio button updates: four visible, interactive
        buttons followed by one hidden, non-interactive button.
    """
    global from_preview
    from_preview = False

    # A fresh update is created per button rather than sharing one object.
    enabled = (gr.Button.update(interactive=True, visible=True) for _ in range(4))
    return (*enabled, gr.Button.update(interactive=False, visible=False))
def scan_finish():
    """Build the button updates shown once a scan has ended.

    The result depends on the module-level ``no_update`` flag.

    Returns:
        A 6-tuple of Gradio button updates: four buttons whose visibility
        and interactivity equal ``no_update``, one button that is always
        hidden and disabled, and one button whose visibility and
        interactivity are the inverse of ``no_update``.
    """
    inverse = not no_update
    return (
        *(gr.Button.update(interactive=no_update, visible=no_update) for _ in range(4)),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=inverse, visible=inverse),
    )
def load_to_browser(content_type, sort_type, period_type, use_search_term, search_term, tile_count, base_filter, nsfw):
    """Load the model list of the last version/installed scan and reset the UI.

    Reads the module-level ``from_ver`` / ``from_installed`` flags to decide
    which list to request from ``_api.update_model_list``, records the current
    filter inputs in ``gl.previous_inputs``, sets ``gl.file_scan`` and finally
    clears both flags.

    Args:
        content_type: Selected content types. When the ``use_LORA`` option is
            enabled, a ``'LORA & LoCon'`` entry is replaced in place by
            separate ``'LORA'`` and ``'LoCon'`` entries.
        sort_type, period_type, use_search_term, search_term, base_filter,
        nsfw: Stored unchanged in ``gl.previous_inputs``.
        tile_count: Forwarded to ``_api.update_model_list`` and stored in
            ``gl.previous_inputs``.

    Returns:
        A tuple consisting of everything returned by
        ``_api.update_model_list`` followed by four visible, interactive
        button updates, two hidden button updates and an HTML update with a
        0px minimum height.

    Raises:
        RuntimeError: If neither ``from_ver`` nor ``from_installed`` is set.
            Previously this case surfaced as an ``UnboundLocalError`` on
            ``model_list_return`` when building the return value.
    """
    global from_ver, from_installed

    # Fail early with a readable message instead of an unbound local later on.
    if not (from_ver or from_installed):
        raise RuntimeError(
            "load_to_browser was called without a pending outdated-model or installed-model scan result."
        )

    # Kept as two independent checks (not elif) so that, should both flags
    # ever be set, the installed-models list still wins as before.
    if from_ver:
        model_list_return = _api.update_model_list(from_ver=True, tile_count=tile_count)
    if from_installed:
        model_list_return = _api.update_model_list(from_installed=True, tile_count=tile_count)

    use_LORA = getattr(opts, "use_LORA", False)
    if content_type and use_LORA and 'LORA & LoCon' in content_type:
        # Split the combined entry into its two individual content types.
        content_type.remove('LORA & LoCon')
        if 'LORA' not in content_type:
            content_type.append('LORA')
        if 'LoCon' not in content_type:
            content_type.append('LoCon')

    gl.previous_inputs = (content_type, sort_type, period_type, use_search_term, search_term, tile_count, base_filter, nsfw)
    gl.file_scan = True
    from_ver, from_installed = False, False

    return (
        *model_list_return,
        gr.Button.update(interactive=True, visible=True),
        gr.Button.update(interactive=True, visible=True),
        gr.Button.update(interactive=True, visible=True),
        gr.Button.update(interactive=True, visible=True),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=False, visible=False),
        gr.HTML.update(value='<div style="min-height: 0px;"></div>'),
    )
def cancel_scan():
    """Signal cancellation and block until ``gl.scan_files`` is cleared.

    Sets ``gl.cancel_status`` to True, polls ``gl.scan_files`` every half
    second until it is falsy, then resets ``gl.cancel_status`` to False.
    """
    gl.cancel_status = True
    # Poll instead of returning immediately so the flag stays raised until
    # gl.scan_files has been cleared elsewhere.
    while gl.scan_files:
        time.sleep(0.5)
    gl.cancel_status = False