|
import json |
|
import gradio as gr |
|
import urllib.request |
|
import urllib.parse |
|
import urllib.error |
|
import os |
|
import io |
|
import re |
|
import time |
|
import errno |
|
import requests |
|
import hashlib |
|
from pathlib import Path |
|
from urllib.parse import urlparse |
|
|
|
from sympy import preview |
|
from modules.shared import cmd_opts, opts |
|
from scripts.civitai_global import print |
|
import scripts.civitai_global as gl |
|
import scripts.civitai_api as _api |
|
import scripts.civitai_file_manage as _file |
|
import scripts.civitai_download as _download |
|
|
|
# Optional dependency: moves deleted files to the OS trash; deletion helpers
# fall back to os.remove when it is unavailable.
try:
    from send2trash import send2trash
except ImportError:
    print("Python module 'send2trash' has not been imported correctly, please try to restart or install it manually.")
# Optional dependency: used to strip HTML from model descriptions.
try:
    from bs4 import BeautifulSoup
except ImportError:
    print("Python module 'BeautifulSoup' has not been imported correctly, please try to restart or install it manually.")

# Initialize the extension's shared global state (gl.json_data, flags, etc.).
gl.init()

# Fallback HTML shown when the CivitAI API cannot be reached.
offlineHTML = '<div style="font-size: 24px; text-align: center; margin: 50px !important;">The Civit-API has timed out, please try again.<br>The servers might be too busy or the selected model could not be found.</div>'

# Stylesheet shipped one directory above this script.
css_path = Path(__file__).resolve().parents[1] / "style_html.css"

# Module-level flags shared with the scan workflow (file_scan and friends).
no_update = False
from_ver = False
from_tag = False
from_installed = False

# Detect whether the Gradio queue is enabled; the cmd_opts attribute name
# differs across WebUI versions. Bug fix: the original used a flat
# `except AttributeError` / bare `except:` chain, so an AttributeError raised
# inside the first handler (when `disable_queue` is also missing) escaped
# uncaught. Nest the fallback so every combination lands on a value.
try:
    queue = not cmd_opts.no_gradio_queue
except AttributeError:
    try:
        queue = not cmd_opts.disable_queue
    except AttributeError:
        queue = True
|
|
|
def delete_model(delete_finish=None, model_filename=None, model_string=None, list_versions=None, sha256=None, selected_list=None, model_ver=None, model_json=None):
    """Delete a model (and its sidecar files) from disk and refresh the UI.

    Deletion strategy: first try to locate the model via its SHA-256 recorded
    in a sidecar ``.json``; if that finds nothing, fall back to matching by
    filename. Files are sent to the OS trash when ``send2trash`` is available,
    otherwise removed permanently. Returns a tuple of Gradio component updates.
    """
    deleted = False
    model_id = None

    if model_string:
        _, model_id = _api.extract_model_info(model_string)

    if not model_ver:
        model_versions = _api.update_model_versions(model_id)
    else: model_versions = model_ver

    # Refresh the card state (name suffix + version dropdown) for the UI.
    (model_name, ver_value, ver_choices) = _file.card_update(model_versions, model_string, list_versions, False)
    if not model_json:
        if model_id != None:
            # Resolve content type/description from the cached API payload.
            selected_content_type = None
            for item in gl.json_data['items']:
                if int(item['id']) == int(model_id):
                    selected_content_type = item['type']
                    desc = item['description']
                    break

            if selected_content_type is None:
                print("Model ID not found in json_data. (delete_model)")
                return
    else:
        # An explicit API payload was passed in; the last item wins.
        for item in model_json["items"]:
            selected_content_type = item['type']
            desc = item['description']

    # NOTE(review): if model_json is falsy AND model_id is None, neither
    # selected_content_type nor desc is assigned before this line — confirm
    # callers always provide one of them.
    model_folder = os.path.join(_api.contenttype_folder(selected_content_type, desc))

    if sha256:
        # Pass 1: match the model via the sha256 stored in sidecar .json files.
        sha256_upper = sha256.upper()
        for root, _, files in os.walk(model_folder, followlinks=True):
            for file in files:
                if file.endswith('.json'):
                    file_path = os.path.join(root, file)
                    try:
                        with open(file_path, 'r', encoding="utf-8") as json_file:
                            data = json.load(json_file)
                            file_sha256 = data.get('sha256', '')
                            if file_sha256:
                                file_sha256 = file_sha256.upper()
                    except Exception as e:
                        print(f"Failed to open: {file_path}: {e}")
                        # Sentinel that will not match a real SHA-256 below.
                        file_sha256 = "0"

                    if file_sha256 == sha256_upper:
                        # Remove any archive members recorded at download time.
                        unpack_list = data.get('unpackList', [])
                        for unpacked_file in unpack_list:
                            unpacked_file_path = os.path.join(root, unpacked_file)
                            if os.path.isfile(unpacked_file_path):
                                try:
                                    send2trash(unpacked_file_path)
                                    print(f"File moved to trash based on unpackList: {unpacked_file_path}")
                                except:
                                    # send2trash missing or failed; delete permanently.
                                    os.remove(unpacked_file_path)
                                    print(f"File deleted based on unpackList: {unpacked_file_path}")

                        base_name, _ = os.path.splitext(file)
                        if os.path.isfile(file_path):
                            try:
                                send2trash(file_path)
                                print(f"Model moved to trash based on SHA-256: {file_path}")
                            except:
                                os.remove(file_path)
                                print(f"Model deleted based on SHA-256: {file_path}")
                        delete_associated_files(root, base_name)
                        deleted = True

    filename_to_delete = os.path.splitext(model_filename)[0]
    aria2_file = model_filename + ".aria2"
    if not deleted:
        # Pass 2: fall back to matching by filename (also catches leftover
        # partial .aria2 download files).
        for root, dirs, files in os.walk(model_folder, followlinks=True):
            for file in files:
                current_file_name = os.path.splitext(file)[0]
                if filename_to_delete == current_file_name or aria2_file == file:
                    path_file = os.path.join(root, file)
                    if os.path.isfile(path_file):
                        try:
                            send2trash(path_file)
                            print(f"Model moved to trash based on filename: {path_file}")
                        except:
                            os.remove(path_file)
                            print(f"Model deleted based on filename: {path_file}")
                        delete_associated_files(root, current_file_name)

    # Trigger value used by the UI to signal the delete flow finished.
    number = _download.random_number(delete_finish)

    # Re-enable the download button only when nothing is queued.
    btnDwn = not selected_list or selected_list == "[]"

    return (
        gr.Button.update(interactive=btnDwn, visible=btnDwn),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=False, visible=False),
        gr.Textbox.update(value=number),
        gr.Textbox.update(value=model_name),
        gr.Dropdown.update(value=ver_value, choices=ver_choices)
    )
|
|
|
def delete_associated_files(directory, base_name):
    """Remove every sidecar file for *base_name* in *directory*.

    Matches ``base_name.*`` plus the conventional ``base_name.preview.*`` and
    ``base_name.api_info.*`` companions. Files go to the OS trash when
    send2trash is available; otherwise they are removed permanently.
    """
    targets = (base_name, f"{base_name}.preview", f"{base_name}.api_info")
    for file in os.listdir(directory):
        current_base_name, _ = os.path.splitext(file)
        if current_base_name in targets:
            path_to_delete = os.path.join(directory, file)
            try:
                send2trash(path_to_delete)
                print(f"Associated file moved to trash: {path_to_delete}")
            except Exception:
                # Narrowed from a bare `except:` (which also swallowed
                # SystemExit/KeyboardInterrupt). Exception still covers the
                # NameError raised when the send2trash import failed, plus any
                # send2trash failure — fall back to permanent deletion.
                os.remove(path_to_delete)
                print(f"Associated file deleted: {path_to_delete}")
|
|
|
def save_preview(file_path, api_response, overwrite_toggle=False, sha256=None):
    """Download and save a ``<model>.preview.png`` next to *file_path*.

    The model's file entry is located in *api_response* by SHA-256 (taken from
    the argument or from the sidecar ``.json``), then the first sample image of
    type "image" from that version is fetched at its native width.
    """
    json_file = os.path.splitext(file_path)[0] + ".json"
    install_path, file_name = os.path.split(file_path)
    name = os.path.splitext(file_name)[0]
    filename = f'{name}.preview.png'
    image_path = os.path.join(install_path, filename)

    # Never clobber an existing preview unless explicitly asked to.
    if not overwrite_toggle:
        if os.path.exists(image_path):
            return

    if not sha256:
        # Fall back to the hash cached in the sidecar JSON.
        if os.path.exists(json_file):
            try:
                with open(json_file, 'r', encoding="utf-8") as f:
                    data = json.load(f)
                    if 'sha256' in data and data['sha256']:
                        sha256 = data['sha256'].upper()
            except Exception as e:
                print(f"Failed to open {json_file}: {e}")
    else:
        sha256 = sha256.upper()

    # Walk items -> versions -> files looking for the matching hash.
    for item in api_response["items"]:
        for version in item["modelVersions"]:
            for file_entry in version["files"]:
                if file_entry["hashes"].get("SHA256") == sha256:
                    for image in version["images"]:
                        if image["type"] == "image":
                            # Request the image at its full recorded width.
                            url_with_width = re.sub(r'/width=\d+', f'/width={image["width"]}', image["url"])

                            response = requests.get(url_with_width)
                            if response.status_code == 200:
                                with open(image_path, 'wb') as img_file:
                                    img_file.write(response.content)
                                print(f"Preview saved at \"{image_path}\"")
                            else:
                                print(f"Failed to save preview. Status code: {response.status_code}")

                            # Stop after the first image-type sample.
                            return
    print(f"No preview images found for \"{name}\"")
    return
|
|
|
def get_image_path(install_path, api_response, sub_folder):
    """Resolve (and create) the directory where sample images are stored.

    Defaults to *install_path*. When the user configured a custom
    ``image_location``, images either mirror the per-content-type folder layout
    (``sub_image_location``) — optionally extended with *sub_folder* — or go
    straight into the custom location.
    """
    image_location = getattr(opts, "image_location", r"")
    sub_image_location = getattr(opts, "sub_image_location", True)
    image_path = install_path
    if api_response:
        json_info = api_response['items'][0]
    else:
        # Fall back to the globally cached model info.
        json_info = gl.json_info
    if image_location:
        if sub_image_location:
            desc = json_info['description']
            content_type = json_info['type']
            image_path = os.path.join(_api.contenttype_folder(content_type, desc, custom_folder=image_location))

            # The placeholder text is what the UI shows when no sub-folder
            # choice is applicable — treat it like "None".
            if sub_folder and sub_folder != "None" and sub_folder != "Only available if the selected files are of the same model type":
                image_path = os.path.join(image_path, sub_folder.lstrip("/").lstrip("\\"))
        else:
            image_path = Path(image_location)
    make_dir(image_path)
    return image_path
|
|
|
def save_images(preview_html, model_filename, install_path, sub_folder, api_response=None):
    """Download every sample image referenced in *preview_html*.

    Images are the ``data-sampleimg="true"`` tags in the rendered HTML; each is
    saved as ``<model>_<i>.png`` inside the directory resolved by
    :func:`get_image_path`. Network failures are logged, not raised.
    """
    image_path = get_image_path(install_path, api_response, sub_folder)
    img_urls = re.findall(r'data-sampleimg="true" src=[\'"]?([^\'" >]+)', preview_html)

    name = os.path.splitext(model_filename)[0]

    # CivitAI rejects the default urllib user agent; present a browser UA.
    opener = urllib.request.build_opener()
    opener.addheaders = [('User-agent', 'Mozilla/5.0')]
    urllib.request.install_opener(opener)

    for i, img_url in enumerate(img_urls):
        filename = f'{name}_{i}.png'
        # Escape spaces etc. while keeping the URL structure intact.
        img_url = urllib.parse.quote(img_url, safe=':/=')
        try:
            with urllib.request.urlopen(img_url) as url:
                with open(os.path.join(image_path, filename), 'wb') as f:
                    f.write(url.read())
                # Bug fix: the message was an f-string with no placeholder and
                # printed a literal placeholder token; report the saved file.
                print(f"Downloaded \"{filename}\"")

        except urllib.error.URLError as e:
            print(f'Error: {e.reason}')
|
|
|
def card_update(gr_components, model_name, list_versions, is_install):
    """Recompute a model card's state suffix and version dropdown.

    Appends ``.New`` / ``.Old`` / ``.None`` to *model_name* depending on
    whether the newest (first) version, only an older version, or no version
    is installed. Returns ``(model_name, selected_version, version_choices)``.
    """
    if not gr_components:
        print("Couldn't retrieve version, defaulting to installed")
        return model_name + ".New", None, None

    choices = gr_components['choices']

    if is_install and not gl.download_fail and not gl.cancel_status:
        # A download just succeeded: mark the chosen version as installed.
        selected = list_versions + " [Installed]"
        cleaned = [selected if option + " [Installed]" == selected else option for option in choices]
    else:
        # Deletion (or failed install): strip the installed marker.
        selected = list_versions.replace(" [Installed]", "")
        cleaned = [selected if option.replace(" [Installed]", "") == selected else option for option in choices]

    newest_installed = "[Installed]" in cleaned[0]
    older_installed = any("[Installed]" in option for option in cleaned[1:])

    if newest_installed:
        suffix = ".New"
    elif older_installed:
        suffix = ".Old"
    else:
        suffix = ".None"

    return model_name + suffix, selected, cleaned
|
|
|
def list_files(folders):
    """Recursively collect model files under each existing folder.

    Only files with a known model extension (case-insensitive) are returned,
    deduplicated and sorted. Folders that are falsy or missing are skipped.
    """
    valid_exts = {'.pt', '.ckpt', '.pth', '.safetensors', '.th', '.zip', '.vae'}
    found = set()

    for folder in folders:
        if not folder or not os.path.exists(folder):
            continue
        for root, _, filenames in os.walk(folder, followlinks=True):
            for filename in filenames:
                if os.path.splitext(filename)[1].lower() in valid_exts:
                    found.add(os.path.join(root, filename))

    return sorted(found)
|
|
|
def gen_sha256(file_path):
    """Return the SHA-256 hex digest of *file_path*, cached in a sidecar JSON.

    If ``<file>.json`` already holds a non-empty ``sha256`` it is returned
    as-is. Otherwise the file is hashed in 1 MiB chunks and the result is
    written back to the sidecar (created if necessary).
    """
    json_file = os.path.splitext(file_path)[0] + ".json"

    # Fast path: reuse a previously computed hash.
    if os.path.exists(json_file):
        try:
            with open(json_file, 'r', encoding="utf-8") as f:
                data = json.load(f)
            if data.get('sha256'):
                return data['sha256']
        except Exception as e:
            print(f"Failed to open {json_file}: {e}")

    # Hash in fixed-size chunks so large model files never load fully into RAM.
    blocksize = 1 << 20
    h = hashlib.sha256()
    with open(os.path.realpath(file_path), 'rb') as f:
        for block in iter(lambda: f.read(blocksize), b''):
            h.update(block)

    hash_value = h.hexdigest()

    # Cache the result. Bug fix: the original only wrote the hash back when the
    # sidecar already contained a non-empty 'sha256' key, so an existing JSON
    # without one was rehashed on every call; always store it now.
    data = {}
    if os.path.exists(json_file):
        try:
            with open(json_file, 'r', encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            print(f"Failed to open {json_file}: {e}")
            data = {}
    data['sha256'] = hash_value
    try:
        with open(json_file, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=4)
    except Exception as e:
        print(f"Failed to write {json_file}: {e}")

    return hash_value
|
|
|
def model_from_sent(model_name, content_type, tile_count, path_input):
    """Build the model-info HTML for a card sent from a txt2img extra-network tab.

    Resolves the model file on disk (or uses *path_input* directly), loads a
    locally saved ``.html`` page when that option is enabled, otherwise queries
    the CivitAI API by model ID. The resulting page is re-styled with the
    extension's CSS (adapted to WebUI theme variables) and returned as a
    single-element tuple of a Gradio Textbox update.
    """
    modelID_failed = False
    output_html = None
    model_file = None
    use_local_html = getattr(opts, "use_local_html", False)
    local_path_in_html = getattr(opts, "local_path_in_html", False)

    div = '<div style="color: white; font-family: var(--font); font-size: 24px; text-align: center; margin: 50px !important;">'
    not_found = div + "Model ID not found.<br>Maybe the model doesn\'t exist on CivitAI?</div>"
    path_not_found = div + "Model ID not found.<br>Could not locate the model path.</div>"
    offline = div + "CivitAI failed to respond.<br>The servers are likely offline."

    # Local paths embedded in the HTML make the saved page stale; prefer fresh data.
    if local_path_in_html:
        use_local_html = False

    if path_input == "Not Found":
        # Strip the ".NNN" numeric suffix Gradio appends to duplicated labels.
        model_name = re.sub(r'\.\d{3}$', '', model_name)
        content_type = re.sub(r'\.\d{3}$', '', content_type)
        # Map the sending tab's element id to CivitAI content types.
        content_mapping = {
            "txt2img_textual_inversion_cards_html": ['TextualInversion'],
            "txt2img_hypernetworks_cards_html": ['Hypernetwork'],
            "txt2img_checkpoints_cards_html": ['Checkpoint'],
            "txt2img_lora_cards_html": ['LORA', 'LoCon']
        }
        content_type = content_mapping.get(content_type, content_type)

        extensions = ['.pt', '.ckpt', '.pth', '.safetensors', '.th', '.zip', '.vae']

        # NOTE(review): if content_type is not in the mapping it stays a string
        # and this loop iterates its characters — confirm unmapped ids cannot
        # reach this path. The walk keeps the LAST matching file.
        for content_type_item in content_type:
            folder = _api.contenttype_folder(content_type_item)
            for folder_path, _, files in os.walk(folder, followlinks=True):
                for file in files:
                    if file.startswith(model_name) and file.endswith(tuple(extensions)):
                        model_file = os.path.join(folder_path, file)
        if not model_file:
            output_html = path_not_found
            print(f'Error: Could not find model path for model: "{model_name}"')
            print(f'Content type: "{content_type}"')
            print(f'Main folder path: "{folder}"')
            use_local_html = False
    else:
        model_file = path_input

    if use_local_html:
        # Reuse the HTML page saved next to the model, dropping its old <head>
        # so the freshly themed one below takes effect.
        html_file = os.path.splitext(model_file)[0] + ".html"
        if os.path.exists(html_file):
            with open(html_file, 'r', encoding='utf-8') as html:
                output_html = html.read()
                index = output_html.find("</head>")
                if index != -1:
                    output_html = output_html[index + len("</head>"):]

    if not output_html:
        # Fall back to the CivitAI API, resolving the model ID via sidecar
        # JSON or hash lookup (gen_hash=True).
        modelID = get_models(model_file, True)
        if not modelID or modelID == "Model not found":
            output_html = not_found
            modelID_failed = True
        if modelID == "offline":
            output_html = offline
            modelID_failed = True
        if not modelID_failed:
            json_data = _api.api_to_data(content_type, "Newest", "AllTime", "Model name", None, None, None, tile_count, f"civitai.com/models/{modelID}")
        else:
            json_data = None

        if json_data == "timeout":
            output_html = offline
        if json_data != None and json_data != "timeout":
            model_versions = _api.update_model_versions(modelID, json_data)
            output_html = _api.update_model_info(None, model_versions.get('value'), True, modelID, json_data)

    # Re-theme the extension stylesheet so the embedded page follows the
    # active WebUI theme instead of hard-coded dark colors.
    css_path = Path(__file__).resolve().parents[1] / "style_html.css"
    with open(css_path, 'r', encoding='utf-8') as css_file:
        css = css_file.read()
    replacements = {
        '#0b0f19': 'var(--body-background-fill)',
        '#F3F4F6': 'var(--body-text-color)',
        'white': 'var(--body-text-color)',
        '#80a6c8': 'var(--secondary-300)',
        '#60A5FA': 'var(--link-text-color-hover)',
        '#1F2937': 'var(--input-background-fill)',
        '#374151': 'var(--input-border-color)',
        '#111827': 'var(--error-background-fill)',
        'top: 50%;': '',
        'padding-top: 0px;': 'padding-top: 475px;',
        '.civitai_txt2img': '.civitai_placeholder'
    }

    for old, new in replacements.items():
        css = css.replace(old, new)

    style_tag = f'<style>{css}</style>'
    head_section = f'<head>{style_tag}</head>'

    # Allow tiles to wrap/center in the preview pane.
    output_html = output_html.replace('display:flex;align-items:flex-start;', 'display:flex;align-items:flex-start;flex-wrap:wrap;justify-content:center;')
    output_html = str(head_section + output_html)
    # Rename zoom ids/handlers so they don't clash with the browser tab's own.
    output_html = output_html.replace('zoom-radio', 'zoom-preview-radio')
    output_html = output_html.replace('zoomRadio', 'zoomPreviewRadio')
    output_html = output_html.replace('zoom-overlay', 'zoom-preview-overlay')
    output_html = output_html.replace('resetZoom', 'resetPreviewZoom')

    # Random trigger value so the frontend notices the update.
    number = _download.random_number()

    return (
        gr.Textbox.update(value=output_html, placeholder=number),
    )
|
|
|
def is_image_url(url):
    """Return True when the URL's path ends in a known image extension.

    The comparison is case-insensitive (generalized: the original missed
    uppercase extensions such as ``.PNG``) and ignores query strings, since
    only the parsed path component is examined.
    """
    image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')
    path = urlparse(url).path.lower()
    return path.endswith(image_extensions)
|
|
|
def clean_description(desc):
    """Strip HTML from a model description.

    Non-image links are replaced with "link text + URL" so the target survives
    the text extraction; image links are left for the caller. If BeautifulSoup
    is unavailable the description is returned unchanged.
    """
    try:
        soup = BeautifulSoup(desc, 'html.parser')
        for a in soup.find_all('a', href=True):
            link_text = a.text + ' ' + a['href']
            if not is_image_url(a['href']):
                a.replace_with(link_text)

        cleaned_text = soup.get_text()
    except (ImportError, NameError):
        # Bug fix: when the top-of-file `from bs4 import BeautifulSoup` fails,
        # the name is simply undefined and this raises NameError — the
        # original only caught ImportError, so the fallback never ran.
        print("Python module 'BeautifulSoup' was not imported correctly, cannot clean description. Please try to restart or install it manually.")
        cleaned_text = desc
    return cleaned_text
|
|
|
def make_dir(path):
    """Create *path* (including parents) if it does not already exist.

    On a permission error one retry is made with explicit 0o777 mode; all
    failures are logged rather than raised, matching the callers' best-effort
    expectations.
    """
    try:
        # exist_ok avoids the original exists()/makedirs() race (TOCTOU): a
        # directory created between the check and the call no longer raises.
        os.makedirs(path, exist_ok=True)
    except OSError as e:
        if e.errno == errno.EACCES:
            try:
                os.makedirs(path, mode=0o777, exist_ok=True)
            except OSError as retry_err:  # don't shadow the outer exception
                if retry_err.errno == errno.EACCES:
                    print("Permission denied even with elevated permissions.")
                else:
                    print(f"Error creating directory: {retry_err}")
        else:
            print(f"Error creating directory: {e}")
    except Exception as e:
        print(f"Error creating directory: {e}")
|
|
|
def save_model_info(install_path, file_name, sub_folder, sha256=None, preview_html=None, overwrite_toggle=False, api_response=None):
    """Persist a model's metadata next to its file.

    Writes the ``<name>.json`` sidecar (via find_and_save), optionally a
    ``<name>.html`` copy of the preview page (with local image paths when
    configured) and a ``<name>.api_info.json`` dump of the raw API item.
    """
    filename = os.path.splitext(file_name)[0]
    # Bug fix: these output names contained a literal "(unknown)" placeholder
    # (f-strings with no placeholder) while `filename` was computed but unused;
    # use the model's base name throughout.
    json_file = os.path.join(install_path, f'{filename}.json')
    make_dir(install_path)

    save_api_info = getattr(opts, "save_api_info", False)
    use_local = getattr(opts, "local_path_in_html", False)
    save_to_custom = getattr(opts, "save_to_custom", False)

    if not api_response:
        api_response = gl.json_data

    image_path = get_image_path(install_path, api_response, sub_folder)

    # HTML/api-info either live next to the model or in the custom image folder.
    save_path = image_path if save_to_custom else install_path

    # Try to match by SHA-256 first, then fall back to matching by file name.
    result = find_and_save(api_response, sha256, file_name, json_file, False, overwrite_toggle)
    if result != "found":
        result = find_and_save(api_response, sha256, file_name, json_file, True, overwrite_toggle)

    if preview_html:
        if use_local:
            # Rewrite remote sample-image URLs to the locally saved copies.
            img_urls = re.findall(r'data-sampleimg="true" src=[\'"]?([^\'" >]+)', preview_html)
            for i, img_url in enumerate(img_urls):
                img_name = f'{filename}_{i}.png'
                preview_html = preview_html.replace(img_url, f'{os.path.join(image_path, img_name)}')

        # Reuse the page's own indentation so the injected <head> blends in.
        match = re.search(r'(\s*)<div class="model-block">', preview_html)
        indentation = match.group(1) if match else ''
        css_link = f'<link rel="stylesheet" type="text/css" href="{css_path}">'
        utf8_meta_tag = f'{indentation}<meta charset="UTF-8">'
        head_section = f'{indentation}<head>{indentation} {utf8_meta_tag}{indentation} {css_link}{indentation}</head>'
        HTML = head_section + preview_html
        path_to_new_file = os.path.join(save_path, f'{filename}.html')
        with open(path_to_new_file, 'wb') as f:
            f.write(HTML.encode('utf8'))

    if save_api_info:
        path_to_new_file = os.path.join(save_path, f'{filename}.api_info.json')
        with open(path_to_new_file, mode="w", encoding="utf-8") as f:
            json.dump(gl.json_info, f, indent=4, ensure_ascii=False)
|
|
|
|
|
def find_and_save(api_response, sha256=None, file_name=None, json_file=None, no_hash=None, overwrite_toggle=None):
    """Locate the model's file entry in *api_response* and update its sidecar.

    Matches by file name when *no_hash* is truthy, otherwise by SHA-256.
    On a match, writes "activation text", "description" and "sd version" into
    *json_file* (only filling empty keys unless *overwrite_toggle*), caches the
    matched item in ``gl.json_info``, and returns "found"; otherwise
    "not found".
    """
    for item in api_response.get('items', []):
        for model_version in item.get('modelVersions', []):
            for file in model_version.get('files', []):
                file_name_api = file.get('name', '')
                sha256_api = file.get('hashes', {}).get('SHA256', '')

                matched = (file_name == file_name_api) if no_hash else (sha256 == sha256_api)
                if not matched:
                    continue

                gl.json_info = item
                trained_words = model_version.get('trainedWords', [])
                model_id = model_version.get('modelId', '')

                # Bug fix: model_url was only assigned when model_id was
                # truthy but used unconditionally below, raising NameError for
                # versions without a modelId.
                model_url = f'Model URL: \"https://civitai.com/models/{model_id}\"\n' if model_id else ''

                description = item.get('description', '')
                if description is not None:
                    description = clean_description(description)
                    description = model_url + description

                # Normalize the base model to the coarse buckets the UI uses.
                base_model = model_version.get('baseModel', '')
                if base_model.startswith("SD 1"):
                    base_model = "SD1"
                elif base_model.startswith("SD 2"):
                    base_model = "SD2"
                elif base_model.startswith("SDXL"):
                    base_model = "SDXL"
                else:
                    base_model = "Other"

                if isinstance(trained_words, list):
                    # Join trigger words, drop embedded "<x:y>" weight tags and
                    # normalize comma spacing.
                    trained_tags = ",".join(trained_words)
                    trained_tags = re.sub(r'<[^>]*:[^>]*>', '', trained_tags)
                    trained_tags = re.sub(r', ?', ', ', trained_tags)
                    trained_tags = trained_tags.strip(', ')
                else:
                    trained_tags = trained_words

                # Load the existing sidecar, tolerating corrupt/missing JSON.
                if os.path.exists(json_file):
                    with open(json_file, 'r', encoding="utf-8") as f:
                        try:
                            content = json.load(f)
                        except Exception:
                            content = {}
                else:
                    content = {}

                changed = False
                if not overwrite_toggle:
                    # Only fill keys that are missing or empty.
                    if "activation text" not in content or not content["activation text"]:
                        content["activation text"] = trained_tags
                        changed = True
                    if "description" not in content or not content["description"]:
                        content["description"] = description
                        changed = True
                    if "sd version" not in content or not content["sd version"]:
                        content["sd version"] = base_model
                        changed = True
                else:
                    content["activation text"] = trained_tags
                    content["description"] = description
                    content["sd version"] = base_model
                    changed = True

                with open(json_file, 'w', encoding="utf-8") as f:
                    json.dump(content, f, indent=4)

                if changed: print(f"Model info saved to \"{json_file}\"")
                return "found"

    return "not found"
|
|
|
def get_models(file_path, gen_hash=None):
    """Resolve the CivitAI model ID for a local model file.

    Reads ``modelId``/``sha256`` from the sidecar ``.json`` when present.  If
    the ID is missing (or previously recorded as "Model not found") and
    *gen_hash* is truthy, hashes the file and queries the by-hash API, then
    caches the result back into the sidecar.  Returns the model ID, "offline"
    when the API is unreachable, or None.
    """
    modelId = None
    sha256 = None
    json_file = os.path.splitext(file_path)[0] + ".json"
    if os.path.exists(json_file):
        try:
            with open(json_file, 'r', encoding="utf-8") as f:
                data = json.load(f)

            if 'modelId' in data:
                modelId = data['modelId']
            if 'sha256' in data and data['sha256']:
                sha256 = data['sha256']
        except Exception as e:
            print(f"Failed to open {json_file}: {e}")

    if not modelId or not sha256 or modelId == "Model not found":
        if gen_hash:
            # Compute (and sidecar-cache) the hash, then prepare the lookup URL.
            if not sha256:
                sha256 = gen_sha256(file_path)
            by_hash = f"https://civitai.com/api/v1/model-versions/by-hash/{sha256}"
        else:
            # Without hash generation we can only return what the sidecar had.
            if modelId:
                return modelId
            else:
                return None

    try:
        if not modelId or modelId == "Model not found":
            response = requests.get(by_hash, timeout=(10,30))
            if response.status_code == 200:
                api_response = response.json()
                if 'error' in api_response:
                    print(f"\"{file_path}\": {api_response['error']}")
                    return None
                else:
                    modelId = api_response.get("modelId", "")
            elif response.status_code == 503:
                return "offline"
            elif response.status_code == 404:
                # Cache the "Model not found" error string as the modelId so
                # future scans can skip re-querying this file.
                api_response = response.json()
                modelId = api_response.get("error", "")

        # Persist the resolved ID and (uppercased) hash into the sidecar.
        if os.path.exists(json_file):
            try:
                with open(json_file, 'r', encoding="utf-8") as f:
                    data = json.load(f)

                data['modelId'] = modelId
                data['sha256'] = sha256.upper()

                with open(json_file, 'w', encoding="utf-8") as f:
                    json.dump(data, f, indent=4)
            except Exception as e:
                print(f"Failed to open {json_file}: {e}")
        else:
            data = {
                'modelId': modelId,
                'sha256': sha256.upper()
            }
            with open(json_file, 'w', encoding="utf-8") as f:
                json.dump(data, f, indent=4)

        return modelId
    except requests.exceptions.Timeout:
        print(f"Request timed out for {file_path}. Skipping...")
        return "offline"
    except requests.exceptions.ConnectionError:
        print("Failed to connect to the API. The CivitAI servers might be offline.")
        return "offline"
    except Exception as e:
        print(f"An error occurred for {file_path}: {str(e)}")
        return None
|
|
|
def version_match(file_paths, api_response):
    """Classify local models as up-to-date or outdated against API data.

    A model matches a version when any of that version's files shares both the
    extension-less name and (possibly empty) SHA-256 with a local file. The
    first version of an item counts as current; any later one as outdated.
    Returns ``(updated_models, outdated_models)`` as lists of
    ``("&ids=<id>", name)`` tuples.
    """
    # Load cached hashes from each model's sidecar JSON, keyed by basename.
    known_hashes = {}
    for path in file_paths:
        sidecar = f"{os.path.splitext(path)[0]}.json"
        if not os.path.exists(sidecar):
            continue
        with open(sidecar, 'r', encoding="utf-8") as fh:
            try:
                payload = json.load(fh)
                digest = payload.get('sha256')
                if digest:
                    known_hashes[os.path.basename(path)] = digest.upper()
            except Exception as err:
                print(f"Failed to open {sidecar}: {err}")

    # Set of (stem, uppercase-hash-or-empty) pairs describing local files.
    local_models = set()
    for path in file_paths:
        basename = os.path.basename(path)
        stem = os.path.splitext(basename)[0]
        digest = known_hashes.get(basename, "")
        if digest:
            digest = digest.upper()
        local_models.add((stem, digest))

    updated_models = []
    outdated_models = []

    for item in api_response.get('items', []):
        versions = item.get('modelVersions', [])
        if not versions:
            continue

        for idx, version in enumerate(versions):
            hit = False
            for entry in version.get('files', []):
                stem = os.path.splitext(entry.get('name', ''))[0]
                digest = entry.get('hashes', {}).get('SHA256', "")
                if digest:
                    digest = digest.upper()
                if (stem, digest) in local_models:
                    hit = True
                    break

            if hit:
                record = (f"&ids={item['id']}", item["name"])
                if idx == 0:
                    updated_models.append(record)
                else:
                    outdated_models.append(record)
                break

    return updated_models, outdated_models
|
|
|
def get_content_choices(scan_choices=False):
    """Return the list of content-type choices for the UI.

    With the "use_LORA" option enabled, LORA and LoCon are merged into one
    combined entry. When *scan_choices* is True an 'All' option is prepended
    (used by the scan tab).
    """
    lora_entries = ["LORA & LoCon"] if getattr(opts, "use_LORA", False) else ["LORA", "LoCon"]
    content_list = (
        ["Checkpoint", "TextualInversion"]
        + lora_entries
        + ["Poses", "Controlnet", "Hypernetwork", "AestheticGradient", "VAE",
           "Upscaler", "MotionModule", "Wildcards", "Workflows", "Other"]
    )
    if scan_choices:
        content_list.insert(0, 'All')
    return content_list
|
|
|
def file_scan(folders, ver_finish, tag_finish, installed_finish, preview_finish, overwrite_toggle, tile_count, gen_hash, progress=gr.Progress() if queue else None): |
|
global from_ver, from_installed, no_update |
|
update_log = getattr(opts, "update_log", True) |
|
gl.scan_files = True |
|
no_update = False |
|
if from_ver: |
|
number = _download.random_number(ver_finish) |
|
elif from_tag: |
|
number = _download.random_number(tag_finish) |
|
elif from_installed: |
|
number = _download.random_number(installed_finish) |
|
elif from_preview: |
|
number = _download.random_number(preview_finish) |
|
|
|
if not folders: |
|
if progress != None: |
|
progress(0, desc=f"No folder selected.") |
|
no_update = True |
|
gl.scan_files = False |
|
from_ver, from_installed = False, False |
|
time.sleep(2) |
|
return (gr.HTML.update(value='<div style="min-height: 0px;"></div>'), |
|
gr.Textbox.update(value=number)) |
|
|
|
folders_to_check = [] |
|
if 'All' in folders: |
|
folders = _file.get_content_choices() |
|
|
|
for item in folders: |
|
if item == "LORA & LoCon": |
|
folder = _api.contenttype_folder("LORA") |
|
if folder: |
|
folders_to_check.append(folder) |
|
folder = _api.contenttype_folder("LoCon", fromCheck=True) |
|
if folder: |
|
folders_to_check.append(folder) |
|
elif item == "Upscaler": |
|
folder = _api.contenttype_folder(item, "SwinIR") |
|
if folder: |
|
folders_to_check.append(folder) |
|
folder = _api.contenttype_folder(item, "RealESRGAN") |
|
if folder: |
|
folders_to_check.append(folder) |
|
folder = _api.contenttype_folder(item, "GFPGAN") |
|
if folder: |
|
folders_to_check.append(folder) |
|
folder = _api.contenttype_folder(item, "BSRGAN") |
|
if folder: |
|
folders_to_check.append(folder) |
|
folder = _api.contenttype_folder(item, "ESRGAN") |
|
if folder: |
|
folders_to_check.append(folder) |
|
else: |
|
folder = _api.contenttype_folder(item) |
|
if folder: |
|
folders_to_check.append(folder) |
|
|
|
total_files = 0 |
|
files_done = 0 |
|
|
|
files = list_files(folders_to_check) |
|
total_files += len(files) |
|
|
|
if total_files == 0: |
|
if progress != None: |
|
progress(1, desc=f"No files in selected folder.") |
|
no_update = True |
|
gl.scan_files = False |
|
from_ver, from_installed = False, False |
|
time.sleep(2) |
|
return (gr.HTML.update(value='<div style="min-height: 0px;"></div>'), |
|
gr.Textbox.update(value=number)) |
|
|
|
updated_models = [] |
|
outdated_models = [] |
|
all_model_ids = [] |
|
file_paths = [] |
|
all_ids = [] |
|
|
|
for file_path in files: |
|
if gl.cancel_status: |
|
if progress != None: |
|
progress(files_done / total_files, desc=f"Processing files cancelled.") |
|
no_update = True |
|
gl.scan_files = False |
|
from_ver, from_installed = False, False |
|
time.sleep(2) |
|
return (gr.HTML.update(value='<div style="min-height: 0px;"></div>'), |
|
gr.Textbox.update(value=number)) |
|
file_name = os.path.basename(file_path) |
|
if progress != None: |
|
progress(files_done / total_files, desc=f"Processing file: {file_name}") |
|
|
|
model_id = get_models(file_path, gen_hash) |
|
if model_id == "offline": |
|
print("The CivitAI servers did not respond, unable to retrieve Model ID") |
|
elif model_id == "Model not found" and update_log: |
|
print(f"model: \"{file_name}\" not found on CivitAI servers.") |
|
elif model_id != None: |
|
all_model_ids.append(f"&ids={model_id}") |
|
all_ids.append(model_id) |
|
file_paths.append(file_path) |
|
elif not model_id and update_log: |
|
print(f"model ID not found for: \"{file_name}\"") |
|
files_done += 1 |
|
|
|
all_items = [] |
|
|
|
all_model_ids = list(set(all_model_ids)) |
|
|
|
if not all_model_ids: |
|
progress(1, desc=f"No model IDs could be retrieved.") |
|
print("Could not retrieve any Model IDs, please make sure to turn on the \"One-Time Hash Generation for externally downloaded models.\" option if you haven't already.") |
|
no_update = True |
|
gl.scan_files = False |
|
from_ver, from_installed = False, False |
|
time.sleep(2) |
|
return (gr.HTML.update(value='<div style="min-height: 0px;"></div>'), |
|
gr.Textbox.update(value=number)) |
|
|
|
def chunks(lst, n): |
|
for i in range(0, len(lst), n): |
|
yield lst[i:i + n] |
|
|
|
if not from_installed: |
|
model_chunks = list(chunks(all_model_ids, 500)) |
|
|
|
base_url = "https://civitai.com/api/v1/models?limit=100" |
|
url_list = [f"{base_url}{''.join(chunk)}" for chunk in model_chunks] |
|
|
|
url_count = len(all_model_ids) // 100 |
|
if len(all_model_ids) % 100 != 0: |
|
url_count += 1 |
|
url_done = 0 |
|
api_response = {} |
|
for url in url_list: |
|
while url: |
|
try: |
|
if progress is not None: |
|
progress(url_done / url_count, desc=f"Sending API request... {url_done}/{url_count}") |
|
response = requests.get(url, timeout=(10, 30)) |
|
if response.status_code == 200: |
|
api_response_json = response.json() |
|
|
|
all_items.extend(api_response_json['items']) |
|
metadata = api_response_json.get('metadata', {}) |
|
url = metadata.get('nextPage', None) |
|
elif response.status_code == 503: |
|
return ( |
|
gr.HTML.update(value=offlineHTML), |
|
gr.Textbox.update(value=number) |
|
) |
|
else: |
|
print(f"Error: Received status code {response.status_code} with URL: {url}") |
|
url = None |
|
url_done += 1 |
|
except requests.exceptions.Timeout: |
|
print(f"Request timed out for {url}. Skipping...") |
|
url = None |
|
except requests.exceptions.ConnectionError: |
|
print("Failed to connect to the API. The servers might be offline.") |
|
url = None |
|
except Exception as e: |
|
print(f"An unexpected error occurred: {e}") |
|
url = None |
|
|
|
api_response['items'] = all_items |
|
if api_response['items'] == []: |
|
return ( |
|
gr.HTML.update(value=offlineHTML), |
|
gr.Textbox.update(value=number) |
|
) |
|
|
|
if progress != None: |
|
progress(1, desc="Processing final results...") |
|
|
|
if from_ver: |
|
updated_models, outdated_models = version_match(file_paths, api_response) |
|
|
|
updated_set = set(updated_models) |
|
outdated_set = set(outdated_models) |
|
outdated_set = {model for model in outdated_set if model[0] not in {updated_model[0] for updated_model in updated_set}} |
|
|
|
all_model_ids = [model[0] for model in outdated_set] |
|
all_model_names = [model[1] for model in outdated_set] |
|
|
|
if update_log: |
|
for model_name in all_model_names: |
|
print(f'"{model_name}" is currently outdated.') |
|
|
|
if len(all_model_ids) == 0: |
|
no_update = True |
|
gl.scan_files = False |
|
from_ver = False |
|
return ( |
|
gr.HTML.update(value='<div style="font-size: 24px; text-align: center; margin: 50px !important;">No updates found for selected models.</div>'), |
|
gr.Textbox.update(value=number) |
|
) |
|
|
|
model_chunks = list(chunks(all_model_ids, tile_count)) |
|
|
|
base_url = "https://civitai.com/api/v1/models?limit=100" |
|
gl.url_list_with_numbers = {i+1: f"{base_url}{''.join(chunk)}" for i, chunk in enumerate(model_chunks)} |
|
|
|
url_error = False |
|
api_url = gl.url_list_with_numbers.get(1) |
|
|
|
if not url_error: |
|
response = requests.get(api_url, timeout=(10,30)) |
|
try: |
|
if response.status_code == 200: |
|
response.encoding = "utf-8" |
|
gl.ver_json = json.loads(response.text) |
|
|
|
highest_number = max(gl.url_list_with_numbers.keys()) |
|
gl.ver_json["metadata"]["totalPages"] = highest_number |
|
|
|
if highest_number > 1: |
|
gl.ver_json["metadata"]["nextPage"] = gl.url_list_with_numbers.get(2) |
|
else: |
|
print(f"Error: Received status code {response.status_code} for URL: {url}") |
|
url_error = True |
|
except requests.exceptions.Timeout: |
|
print(f"Request timed out for {url}. Skipping...") |
|
url_error = True |
|
except requests.exceptions.ConnectionError: |
|
print("Failed to connect to the API. The servers might be offline.") |
|
url_error = True |
|
except Exception as e: |
|
print(f"An unexpected error occurred: {e}") |
|
url_error = True |
|
|
|
if url_error: |
|
gl.scan_files = False |
|
return ( |
|
gr.HTML.update(value=offlineHTML), |
|
gr.Textbox.update(value=number) |
|
) |
|
elif from_ver: |
|
gl.scan_files = False |
|
return ( |
|
gr.HTML.update(value='<div style="font-size: 24px; text-align: center; margin: 50px !important;">Outdated models have been found.<br>Please press the button above to load the models into the browser tab</div>'), |
|
gr.Textbox.update(value=number) |
|
) |
|
|
|
elif from_installed: |
|
gl.scan_files = False |
|
return ( |
|
gr.HTML.update(value='<div style="font-size: 24px; text-align: center; margin: 50px !important;">Installed models have been loaded.<br>Please press the button above to load the models into the browser tab</div>'), |
|
gr.Textbox.update(value=number) |
|
) |
|
|
|
elif from_tag: |
|
for file_path, id_value in zip(file_paths, all_ids): |
|
install_path, file_name = os.path.split(file_path) |
|
model_versions = _api.update_model_versions(id_value, api_response) |
|
preview_html = _api.update_model_info(None, model_versions.get('value'), True, id_value, api_response) |
|
sub_folder = os.path.normpath(os.path.relpath(install_path, gl.main_folder)) |
|
save_model_info(install_path, file_name, sub_folder, preview_html=preview_html, api_response=api_response, overwrite_toggle=overwrite_toggle) |
|
if progress != None: |
|
progress(1, desc=f"All tags succesfully saved!") |
|
gl.scan_files = False |
|
time.sleep(2) |
|
return ( |
|
gr.HTML.update(value='<div style="min-height: 0px;"></div>'), |
|
gr.Textbox.update(value=number) |
|
) |
|
|
|
elif from_preview: |
|
completed_preview = 0 |
|
preview_count = len(file_paths) |
|
for file in file_paths: |
|
_, file_name = os.path.split(file) |
|
name = os.path.splitext(file_name)[0] |
|
if progress != None: |
|
progress(completed_preview / preview_count, desc=f"Saving preview images... {completed_preview}/{preview_count} | {name}") |
|
save_preview(file, api_response, overwrite_toggle) |
|
completed_preview += 1 |
|
gl.scan_files = False |
|
return ( |
|
gr.HTML.update(value='<div style="min-height: 0px;"></div>'), |
|
gr.Textbox.update(value=number) |
|
) |
|
|
|
def save_tag_start(tag_start):
    """Arm a tag-saving scan: set the from_tag mode flag, clear the others,
    and return gradio updates that swap the start button for a cancel button
    while the remaining action buttons are disabled.

    Returns a 7-tuple: trigger Textbox, hidden start button, enabled cancel
    button, three disabled action buttons, and a placeholder HTML spacer.
    """
    global from_tag, from_ver, from_installed, from_preview
    from_tag = True
    from_ver = from_installed = from_preview = False
    number = _download.random_number(tag_start)
    # The three sibling action buttons stay visible but inert during the scan.
    disabled_actions = [gr.Button.update(interactive=False, visible=True) for _ in range(3)]
    return (
        gr.Textbox.update(value=number),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=True, visible=True),
        *disabled_actions,
        gr.HTML.update(value='<div style="min-height: 100px;"></div>')
    )
|
|
|
def save_preview_start(preview_start):
    """Arm a preview-image scan: set the from_preview mode flag, clear the
    others, and return gradio updates that swap the start button for a
    cancel button while the remaining action buttons are disabled.

    Returns a 7-tuple: trigger Textbox, hidden start button, enabled cancel
    button, three disabled action buttons, and a placeholder HTML spacer.
    """
    global from_tag, from_ver, from_installed, from_preview
    from_preview = True
    from_tag = from_ver = from_installed = False
    number = _download.random_number(preview_start)
    # The three sibling action buttons stay visible but inert during the scan.
    disabled_actions = [gr.Button.update(interactive=False, visible=True) for _ in range(3)]
    return (
        gr.Textbox.update(value=number),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=True, visible=True),
        *disabled_actions,
        gr.HTML.update(value='<div style="min-height: 100px;"></div>')
    )
|
|
|
def installed_models_start(installed_start):
    """Arm an installed-models scan: set the from_installed mode flag, clear
    the others, and return gradio updates that swap the start button for a
    cancel button while the remaining action buttons are disabled.

    Returns a 7-tuple: trigger Textbox, hidden start button, enabled cancel
    button, three disabled action buttons, and a placeholder HTML spacer.
    """
    global from_installed, from_ver, from_tag, from_preview
    from_installed = True
    from_ver = from_tag = from_preview = False
    number = _download.random_number(installed_start)
    # The three sibling action buttons stay visible but inert during the scan.
    disabled_actions = [gr.Button.update(interactive=False, visible=True) for _ in range(3)]
    return (
        gr.Textbox.update(value=number),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=True, visible=True),
        *disabled_actions,
        gr.HTML.update(value='<div style="min-height: 100px;"></div>')
    )
|
|
|
def ver_search_start(ver_start):
    """Arm a version-update scan: set the from_ver mode flag, clear the
    others, and return gradio updates that swap the start button for a
    cancel button while the remaining action buttons are disabled.

    Returns a 7-tuple: trigger Textbox, hidden start button, enabled cancel
    button, three disabled action buttons, and a placeholder HTML spacer.
    """
    global from_ver, from_tag, from_installed, from_preview
    from_ver = True
    from_tag = from_installed = from_preview = False
    number = _download.random_number(ver_start)
    # The three sibling action buttons stay visible but inert during the scan.
    disabled_actions = [gr.Button.update(interactive=False, visible=True) for _ in range(3)]
    return (
        gr.Textbox.update(value=number),
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=True, visible=True),
        *disabled_actions,
        gr.HTML.update(value='<div style="min-height: 100px;"></div>')
    )
|
|
|
def save_tag_finish():
    """Clear the from_tag mode flag after a tag scan completes.

    Returns a 5-tuple of gradio Button updates: the four action buttons
    restored (interactive + visible) and the cancel button hidden.
    """
    global from_tag
    from_tag = False
    restored = [gr.Button.update(interactive=True, visible=True) for _ in range(4)]
    return (
        *restored,
        gr.Button.update(interactive=False, visible=False)
    )
|
|
|
def save_preview_finish():
    """Clear the from_preview mode flag after a preview scan completes.

    Returns a 5-tuple of gradio Button updates: the four action buttons
    restored (interactive + visible) and the cancel button hidden.
    """
    global from_preview
    from_preview = False
    restored = [gr.Button.update(interactive=True, visible=True) for _ in range(4)]
    return (
        *restored,
        gr.Button.update(interactive=False, visible=False)
    )
|
|
|
def scan_finish():
    """Finalize the button row once a scan ends.

    Driven by the module-level `no_update` flag: when True (nothing to load)
    the four action buttons come back; when False the "load to browser"
    button is shown instead. The cancel button is always hidden.

    Returns a 6-tuple of gradio Button updates.
    """
    action_buttons = [
        gr.Button.update(interactive=no_update, visible=no_update)
        for _ in range(4)
    ]
    return (
        *action_buttons,
        gr.Button.update(interactive=False, visible=False),
        gr.Button.update(interactive=not no_update, visible=not no_update)
    )
|
|
|
def load_to_browser(content_type, sort_type, period_type, use_search_term, search_term, tile_count, base_filter, nsfw):
    """Push the results of a version/installed scan into the browser tab.

    Fetches the model list for whichever mode flag is active, remembers the
    current filter inputs in `gl.previous_inputs`, marks `gl.file_scan`, and
    resets the mode flags.

    NOTE(review): `model_list_return` is only bound when `from_ver` or
    `from_installed` is set — callers are expected to invoke this right
    after a *_start call; confirm no other call path exists.

    Returns the unpacked model-list updates followed by seven gradio
    component updates restoring the button row.
    """
    global from_ver, from_installed

    if from_ver:
        model_list_return = _api.update_model_list(from_ver=True, tile_count=tile_count)
    if from_installed:
        model_list_return = _api.update_model_list(from_installed=True, tile_count=tile_count)

    # When the LORA/LoCon merge option is on, expand the combined
    # 'LORA & LoCon' entry into its two concrete types (in place).
    if getattr(opts, "use_LORA", False) and content_type and 'LORA & LoCon' in content_type:
        content_type.remove('LORA & LoCon')
        for expanded in ('LORA', 'LoCon'):
            if expanded not in content_type:
                content_type.append(expanded)

    # Remember the filters so the browser tab can reproduce this query.
    gl.previous_inputs = (content_type, sort_type, period_type, use_search_term, search_term, tile_count, base_filter, nsfw)

    gl.file_scan = True
    from_ver = False
    from_installed = False

    restored = [gr.Button.update(interactive=True, visible=True) for _ in range(4)]
    hidden = [gr.Button.update(interactive=False, visible=False) for _ in range(2)]
    return (
        *model_list_return,
        *restored,
        *hidden,
        gr.HTML.update(value='<div style="min-height: 0px;"></div>')
    )
|
|
|
def cancel_scan():
    """Request cancellation of the running file scan and wait for it to stop.

    Sets `gl.cancel_status`, which the scan loop checks, then polls
    `gl.scan_files` every half second until the scan acknowledges and
    clears it, finally resetting the cancel flag.
    """
    gl.cancel_status = True

    # Block until the scan loop notices the flag and winds down.
    while gl.scan_files:
        time.sleep(0.5)
    gl.cancel_status = False
    return
|
|