Spaces:
Running
on
Zero
Running
on
Zero
import spaces | |
import os | |
import gradio as gr | |
import json | |
import logging | |
logging.getLogger("diffusers").setLevel(logging.ERROR) | |
import diffusers | |
diffusers.utils.logging.set_verbosity(40) | |
import warnings | |
warnings.filterwarnings(action="ignore", category=FutureWarning, module="diffusers") | |
warnings.filterwarnings(action="ignore", category=UserWarning, module="diffusers") | |
warnings.filterwarnings(action="ignore", category=FutureWarning, module="transformers") | |
from pathlib import Path | |
from huggingface_hub import HfApi | |
from env import (HF_TOKEN, hf_read_token, # to use only for private repos | |
CIVITAI_API_KEY, HF_LORA_PRIVATE_REPOS1, HF_LORA_PRIVATE_REPOS2, HF_LORA_ESSENTIAL_PRIVATE_REPO, | |
HF_VAE_PRIVATE_REPO, directory_models, directory_loras, directory_vaes, | |
download_model_list, download_lora_list, download_vae_list) | |
from modutils import (to_list, list_uniq, list_sub, get_lora_model_list, download_private_repo, | |
safe_float, escape_lora_basename, to_lora_key, to_lora_path, get_local_model_list, | |
get_private_lora_model_lists, get_valid_lora_name, get_valid_lora_path, get_valid_lora_wt, | |
get_lora_info, normalize_prompt_list, get_civitai_info, search_lora_on_civitai, MODEL_TYPE_DICT) | |
def download_things(directory, url, hf_token="", civitai_api_key=""): | |
url = url.strip() | |
if "drive.google.com" in url: | |
original_dir = os.getcwd() | |
os.chdir(directory) | |
os.system(f"gdown --fuzzy {url}") | |
os.chdir(original_dir) | |
elif "huggingface.co" in url: | |
url = url.replace("?download=true", "") | |
# url = urllib.parse.quote(url, safe=':/') # fix encoding | |
if "/blob/" in url: | |
url = url.replace("/blob/", "/resolve/") | |
user_header = f'"Authorization: Bearer {hf_token}"' | |
if hf_token: | |
os.system(f"aria2c --console-log-level=error --summary-interval=10 --header={user_header} -c -x 16 -k 1M -s 16 {url} -d {directory} -o {url.split('/')[-1]}") | |
else: | |
os.system(f"aria2c --optimize-concurrent-downloads --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 {url} -d {directory} -o {url.split('/')[-1]}") | |
elif "civitai.com" in url: | |
if "?" in url: | |
url = url.split("?")[0] | |
if civitai_api_key: | |
url = url + f"?token={civitai_api_key}" | |
os.system(f"aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d {directory} {url}") | |
else: | |
print("\033[91mYou need an API key to download Civitai models.\033[0m") | |
else: | |
os.system(f"aria2c --console-log-level=error --summary-interval=10 -c -x 16 -k 1M -s 16 -d {directory} {url}") | |
def get_model_list(directory_path): | |
model_list = [] | |
valid_extensions = {'.ckpt' , '.pt', '.pth', '.safetensors', '.bin'} | |
for filename in os.listdir(directory_path): | |
if os.path.splitext(filename)[1] in valid_extensions: | |
name_without_extension = os.path.splitext(filename)[0] | |
file_path = os.path.join(directory_path, filename) | |
# model_list.append((name_without_extension, file_path)) | |
model_list.append(file_path) | |
print('\033[34mFILE: ' + file_path + '\033[0m') | |
return model_list | |
# - **Download Models** | |
download_model = ", ".join(download_model_list) | |
# - **Download VAEs** | |
download_vae = ", ".join(download_vae_list) | |
# - **Download LoRAs** | |
download_lora = ", ".join(download_lora_list) | |
#download_private_repo(HF_LORA_ESSENTIAL_PRIVATE_REPO, directory_loras, True) | |
#download_private_repo(HF_VAE_PRIVATE_REPO, directory_vaes, False) | |
CIVITAI_API_KEY = os.environ.get("CIVITAI_API_KEY") | |
hf_token = os.environ.get("HF_TOKEN") | |
# Download stuffs | |
for url in [url.strip() for url in download_model.split(',')]: | |
if not os.path.exists(f"./models/{url.split('/')[-1]}"): | |
download_things(directory_models, url, hf_token, CIVITAI_API_KEY) | |
for url in [url.strip() for url in download_vae.split(',')]: | |
if not os.path.exists(f"./vaes/{url.split('/')[-1]}"): | |
download_things(directory_vaes, url, hf_token, CIVITAI_API_KEY) | |
for url in [url.strip() for url in download_lora.split(',')]: | |
if not os.path.exists(f"./loras/{url.split('/')[-1]}"): | |
download_things(directory_loras, url, hf_token, CIVITAI_API_KEY) | |
lora_model_list = get_lora_model_list() | |
vae_model_list = get_model_list(directory_vaes) | |
vae_model_list.insert(0, "None") | |
def is_repo_name(s): | |
import re | |
return re.fullmatch(r'^[^/]+?/[^/]+?$', s) | |
def get_t2i_model_info(repo_id: str): | |
api = HfApi(token=HF_TOKEN) | |
try: | |
if not is_repo_name(repo_id): return "" | |
model = api.model_info(repo_id=repo_id, timeout=5.0) | |
except Exception as e: | |
print(f"Error: Failed to get {repo_id}'s info.") | |
print(e) | |
return "" | |
if model.private or model.gated: return "" | |
tags = model.tags | |
info = [] | |
url = f"https://huggingface.co/{repo_id}/" | |
if not 'diffusers' in tags: return "" | |
for k, v in MODEL_TYPE_DICT.items(): | |
if k in tags: info.append(v) | |
if model.card_data and model.card_data.tags: | |
info.extend(list_sub(model.card_data.tags, ['text-to-image', 'stable-diffusion', 'stable-diffusion-api', 'safetensors', 'stable-diffusion-xl'])) | |
info.append(f"DLs: {model.downloads}") | |
info.append(f"likes: {model.likes}") | |
info.append(model.last_modified.strftime("lastmod: %Y-%m-%d")) | |
md = f"Model Info: {', '.join(info)}, [Model Repo]({url})" | |
return gr.update(value=md) | |
private_lora_dict = {"": ["", "", "", "", ""]} | |
try: | |
with open('lora_dict.json', encoding='utf-8') as f: | |
d = json.load(f) | |
for k, v in d.items(): | |
private_lora_dict[escape_lora_basename(k)] = v | |
except Exception: | |
pass | |
private_lora_model_list = get_private_lora_model_lists() | |
loras_dict = {"None": ["", "", "", "", ""], "": ["", "", "", "", ""]} | private_lora_dict.copy() | |
loras_url_to_path_dict = {} # {"URL to download": "local filepath", ...} | |
civitai_lora_last_results = {} # {"URL to download": {search results}, ...} | |
all_lora_list = [] | |
def get_all_lora_list(): | |
global all_lora_list | |
loras = get_lora_model_list() | |
all_lora_list = loras.copy() | |
return loras | |
def get_all_lora_tupled_list(): | |
global loras_dict | |
models = get_all_lora_list() | |
if not models: return [] | |
tupled_list = [] | |
for model in models: | |
#if not model: continue # to avoid GUI-related bug | |
basename = Path(model).stem | |
key = to_lora_key(model) | |
items = None | |
if key in loras_dict.keys(): | |
items = loras_dict.get(key, None) | |
else: | |
items = get_civitai_info(model) | |
if items != None: | |
loras_dict[key] = items | |
name = basename | |
value = model | |
if items and items[2] != "": | |
if items[1] == "Pony": | |
name = f"{basename} (for {items[1]}🐴, {items[2]})" | |
else: | |
name = f"{basename} (for {items[1]}, {items[2]})" | |
tupled_list.append((name, value)) | |
return tupled_list | |
def update_lora_dict(path: str): | |
global loras_dict | |
key = to_lora_key(path) | |
if key in loras_dict.keys(): return | |
items = get_civitai_info(path) | |
if items == None: return | |
loras_dict[key] = items | |
def download_lora(dl_urls: str): | |
global loras_url_to_path_dict | |
dl_path = "" | |
before = get_local_model_list(directory_loras) | |
urls = [] | |
for url in [url.strip() for url in dl_urls.split(',')]: | |
local_path = f"{directory_loras}/{url.split('/')[-1]}" | |
if not Path(local_path).exists(): | |
download_things(directory_loras, url, hf_token, CIVITAI_API_KEY) | |
urls.append(url) | |
after = get_local_model_list(directory_loras) | |
new_files = list_sub(after, before) | |
for i, file in enumerate(new_files): | |
path = Path(file) | |
if path.exists(): | |
new_path = Path(f'{path.parent.name}/{escape_lora_basename(path.stem)}{path.suffix}') | |
path.resolve().rename(new_path.resolve()) | |
loras_url_to_path_dict[urls[i]] = str(new_path) | |
update_lora_dict(str(new_path)) | |
dl_path = str(new_path) | |
return dl_path | |
def copy_lora(path: str, new_path: str): | |
import shutil | |
if path == new_path: return new_path | |
cpath = Path(path) | |
npath = Path(new_path) | |
if cpath.exists(): | |
try: | |
shutil.copy(str(cpath.resolve()), str(npath.resolve())) | |
except Exception: | |
return None | |
update_lora_dict(str(npath)) | |
return new_path | |
else: | |
return None | |
def download_my_lora(dl_urls: str, lora): | |
path = download_lora(dl_urls) | |
if path: lora = path | |
choices = get_all_lora_tupled_list() | |
return gr.update(value=lora, choices=choices) | |
def apply_lora_prompt(lora_info: str): | |
if lora_info == "None": return "" | |
lora_tag = lora_info.replace("/",",") | |
lora_tags = lora_tag.split(",") if str(lora_info) != "None" else [] | |
lora_prompts = normalize_prompt_list(lora_tags) | |
prompt = ", ".join(list_uniq(lora_prompts)) | |
return prompt | |
def update_loras(prompt, lora, lora_wt): | |
on, label, tag, md = get_lora_info(lora) | |
choices = get_all_lora_tupled_list() | |
return gr.update(value=prompt), gr.update(value=lora, choices=choices), gr.update(value=lora_wt),\ | |
gr.update(value=tag, label=label, visible=on), gr.update(value=md, visible=on) | |
def search_civitai_lora(query, base_model, sort="Highest Rated", period="AllTime", tag=""): | |
global civitai_lora_last_results | |
items = search_lora_on_civitai(query, base_model, 100, sort, period, tag) | |
if not items: return gr.update(choices=[("", "")], value="", visible=False),\ | |
gr.update(value="", visible=False), gr.update(visible=True), gr.update(visible=True) | |
civitai_lora_last_results = {} | |
choices = [] | |
for item in items: | |
base_model_name = "Pony🐴" if item['base_model'] == "Pony" else item['base_model'] | |
name = f"{item['name']} (for {base_model_name} / By: {item['creator']} / Tags: {', '.join(item['tags'])})" | |
value = item['dl_url'] | |
choices.append((name, value)) | |
civitai_lora_last_results[value] = item | |
if not choices: return gr.update(choices=[("", "")], value="", visible=False),\ | |
gr.update(value="", visible=False), gr.update(visible=True), gr.update(visible=True) | |
result = civitai_lora_last_results.get(choices[0][1], "None") | |
md = result['md'] if result else "" | |
return gr.update(choices=choices, value=choices[0][1], visible=True), gr.update(value=md, visible=True),\ | |
gr.update(visible=True), gr.update(visible=True) | |
def select_civitai_lora(search_result): | |
if not "http" in search_result: return gr.update(value=""), gr.update(value="None", visible=True) | |
result = civitai_lora_last_results.get(search_result, "None") | |
md = result['md'] if result else "" | |
return gr.update(value=search_result), gr.update(value=md, visible=True) | |
def search_civitai_lora_json(query, base_model): | |
results = {} | |
items = search_lora_on_civitai(query, base_model) | |
if not items: return gr.update(value=results) | |
for item in items: | |
results[item['dl_url']] = item | |
return gr.update(value=results) | |