# import os import subprocess import importlib.util import sys,json import urllib import hashlib import datetime import folder_paths import logging import base64,io,re from PIL import Image from comfy.cli_args import args python = sys.executable #修复 sys.stdout.isatty() object has no attribute 'isatty' try: sys.stdout.isatty() except: print('#fix sys.stdout.isatty') sys.stdout.isatty = lambda: False llama_port=None llama_model="" llama_chat_format="" try: from .nodes.ChatGPT import get_llama_models,get_llama_model_path,llama_cpp_client llama_cpp_client("") except: print("##nodes.ChatGPT ImportError") from .nodes.RembgNode import get_rembg_models,U2NET_HOME,run_briarmbg,run_rembg from server import PromptServer try: import aiohttp from aiohttp import web except ImportError: print("Module 'aiohttp' not installed. Please install it via:") print("pip install aiohttp") print("or") print("pip install -r requirements.txt") sys.exit() def is_installed(package, package_overwrite=None): try: spec = importlib.util.find_spec(package) except ModuleNotFoundError: pass package = package_overwrite or package if spec is None: print(f"Installing {package}...") # 清华源 -i https://pypi.tuna.tsinghua.edu.cn/simple command = f'"{python}" -m pip install {package}' result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, env=os.environ) if result.returncode != 0: print(f"Couldn't install\nCommand: {command}\nError code: {result.returncode}") else: print(package+'## OK') try: import OpenSSL except ImportError: print("Module 'pyOpenSSL' not installed. Please install it via:") print("pip install pyOpenSSL") print("or") print("pip install -r requirements.txt") is_installed('pyOpenSSL') sys.exit() try: import watchdog except ImportError: print("Module 'watchdog' not installed. Please install it via:") print("pip install watchdog") print("or") print("pip install -r requirements.txt") is_installed('watchdog') sys.exit() def install_openai(): # Helper function to install the OpenAI module if not already installed try: importlib.import_module('openai') except ImportError: import pip pip.main(['install', 'openai']) install_openai() current_path = os.path.abspath(os.path.dirname(__file__)) def remove_base64_prefix(base64_str): """ 去除 base64 字符串中的 data:image/*;base64, 前缀 Args: base64_str: base64 编码的字符串 Returns: 去除前缀后的 base64 字符串 """ # 使用正则表达式匹配常见的前缀 pattern = r'^data:image\/(.*);base64,(.+)$' match = re.match(pattern, base64_str) if match: # 如果匹配到常见的前缀,则去除前缀并返回 return match.group(2) else: # 如果不匹配到常见的前缀,则直接返回 return base64_str def calculate_md5(string): encoded_string = string.encode() md5_hash = hashlib.md5(encoded_string).hexdigest() return md5_hash def create_key(key_p,crt_p): import OpenSSL # 生成自签名证书 # 生成私钥 private_key = OpenSSL.crypto.PKey() private_key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) # 生成CSR csr = OpenSSL.crypto.X509Req() csr.get_subject().CN = "mixlab.com" # 设置证书的通用名称 csr.set_pubkey(private_key) csr.sign(private_key, "sha256") # 生成证书 certificate = OpenSSL.crypto.X509() certificate.set_serial_number(1) certificate.gmtime_adj_notBefore(0) certificate.gmtime_adj_notAfter(365 * 24 * 60 * 60) # 设置证书的有效期 certificate.set_issuer(csr.get_subject()) certificate.set_subject(csr.get_subject()) certificate.set_pubkey(csr.get_pubkey()) certificate.sign(private_key, "sha256") # 保存私钥到文件 with open(key_p, "wb") as f: f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, private_key)) # 保存证书到文件 with open(crt_p, "wb") as f: f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, certificate)) return def create_for_https(): # print("#####path::", current_path) https_key_path=os.path.join(current_path, "https") crt=os.path.join(https_key_path, "certificate.crt") key=os.path.join(https_key_path, "private.key") # print("##https_key_path", crt,key) if not os.path.exists(https_key_path): # 使用mkdir()方法创建新目录 os.mkdir(https_key_path) if not os.path.exists(crt): create_key(key,crt) print('https_key OK: ', crt,key) return (crt,key) # workflow 目录下的所有json def read_workflow_json_files_all(folder_path): print('#read_workflow_json_files_all',folder_path) json_files = [] for root, dirs, files in os.walk(folder_path): for file in files: if file.endswith('.json'): json_files.append(os.path.join(root, file)) data = [] for file_path in json_files: try: with open(file_path) as json_file: json_data = json.load(json_file) creation_time = datetime.datetime.fromtimestamp(os.path.getctime(file_path)) numeric_timestamp = creation_time.timestamp() file_info = { 'filename': os.path.basename(file_path), 'category': os.path.dirname(file_path), 'data': json_data, 'date': numeric_timestamp } data.append(file_info) except Exception as e: print(e) sorted_data = sorted(data, key=lambda x: x['date'], reverse=True) return sorted_data # workflow def read_workflow_json_files(folder_path ): json_files = [] for filename in os.listdir(folder_path): if filename.endswith('.json'): json_files.append(filename) data = [] for file in json_files: file_path = os.path.join(folder_path, file) try: with open(file_path) as json_file: json_data = json.load(json_file) creation_time=datetime.datetime.fromtimestamp(os.path.getctime(file_path)) numeric_timestamp = creation_time.timestamp() file_info = { 'filename': file, 'data': json_data, 'date': numeric_timestamp } data.append(file_info) except Exception as e: print(e) sorted_data = sorted(data, key=lambda x: x['date'], reverse=True) return sorted_data def get_workflows(): # print("#####path::", current_path) workflow_path=os.path.join(current_path, "workflow") # print('workflow_path: ',workflow_path) if not os.path.exists(workflow_path): # 使用mkdir()方法创建新目录 os.mkdir(workflow_path) workflows=read_workflow_json_files(workflow_path) return workflows def get_my_workflow_for_app(filename="my_workflow_app.json",category="",is_all=False): app_path=os.path.join(current_path, "app") if not os.path.exists(app_path): os.mkdir(app_path) category_path=os.path.join(app_path,category) if not os.path.exists(category_path): os.mkdir(category_path) apps=[] if filename==None: #TODO 支持目录内遍历 if is_all: data=read_workflow_json_files_all(category_path) else: data=read_workflow_json_files(category_path) i=0 for item in data: # print(item) try: x=item["data"] # 管理员模式,读取全部数据 if i==0 or is_all: apps.append({ "filename":item["filename"], # "category":item['category'], "data":x, "date":item["date"], }) else: category='' input=None output=None if 'category' in x['app']: category=x['app']['category'] if 'input' in x['app']: input=x['app']['input'] if 'output' in x['app']: output=x['app']['output'] apps.append({ "filename":item["filename"], "category":category, "data":{ "app":{ "category":category, "description":x['app']['description'], "filename":(x['app']['filename'] if 'filename' in x['app'] else "") , "icon":(x['app']['icon'] if 'icon' in x['app'] else None), "name":x['app']['name'], "version":x['app']['version'], "input":input, "output":output, "id":x['app']['id'] } }, "date":item["date"] }) i+=1 except Exception as e: print("发生异常:", str(e)) else: app_workflow_path=os.path.join(category_path, filename) print('app_workflow_path: ',app_workflow_path) try: with open(app_workflow_path) as json_file: apps = [{ 'filename':filename, 'data':json.load(json_file) }] except Exception as e: print("发生异常:", str(e)) # 这个代码不需要 # if len(apps)==1 and category!='' and category!=None: data=read_workflow_json_files(category_path) for item in data: x=item["data"] # print(apps[0]['filename'] ,item["filename"]) if apps[0]['filename']!=item["filename"]: category='' input=None output=None if 'category' in x['app']: category=x['app']['category'] if 'input' in x['app']: input=x['app']['input'] if 'output' in x['app']: output=x['app']['output'] apps.append({ "filename":item["filename"], # "category":category, "data":{ "app":{ "category":category, "description":x['app']['description'], "filename":(x['app']['filename'] if 'filename' in x['app'] else "") , "icon":(x['app']['icon'] if 'icon' in x['app'] else None), "name":x['app']['name'], "version":x['app']['version'], "input":input, "output":output, "id":x['app']['id'] } }, "date":item["date"] }) return apps # 历史记录 def save_prompt_result(id,data): prompt_result_path=os.path.join(current_path, "workflow/prompt_result.json") prompt_result={} if os.path.exists(prompt_result_path): with open(prompt_result_path) as json_file: prompt_result = json.load(json_file) prompt_result[id]=data with open(prompt_result_path, 'w') as file: json.dump(prompt_result, file) return prompt_result_path def get_prompt_result(): prompt_result_path=os.path.join(current_path, "workflow/prompt_result.json") prompt_result={} if os.path.exists(prompt_result_path): with open(prompt_result_path) as json_file: prompt_result = json.load(json_file) res=list(prompt_result.values()) # print(res) return res def save_workflow_json(data): workflow_path=os.path.join(current_path, "workflow/my_workflow.json") with open(workflow_path, 'w') as file: json.dump(data, file) return workflow_path def save_workflow_for_app(data,filename="my_workflow_app.json",category=""): app_path=os.path.join(current_path, "app") if not os.path.exists(app_path): os.mkdir(app_path) category_path=os.path.join(app_path,category) if not os.path.exists(category_path): os.mkdir(category_path) app_workflow_path=os.path.join(category_path, filename) try: output_str = json.dumps(data['output']) data['app']['id']=calculate_md5(output_str) # id=data['app']['id'] except Exception as e: print("发生异常:", str(e)) with open(app_workflow_path, 'w') as file: json.dump(data, file) return filename def get_nodes_map(): # print("#####path::", current_path) data_path=os.path.join(current_path, "data") print('data_path: ',data_path) # if not os.path.exists(data_path): # # 使用mkdir()方法创建新目录 # os.mkdir(data_path) json_data={} nodes_map=os.path.join(current_path, "data/extension-node-map.json") if os.path.exists(nodes_map): with open(nodes_map) as json_file: json_data = json.load(json_file) return json_data # 保存原始的 get 方法 _original_request = aiohttp.ClientSession._request # 定义新的 get 方法 async def new_request(self, method, url, *args, **kwargs): # 检查环境变量以确定是否使用代理 proxy = os.environ.get('HTTP_PROXY') or os.environ.get('HTTPS_PROXY') or os.environ.get('http_proxy') or os.environ.get('https_proxy') # print('Proxy Config:',proxy) if proxy and 'proxy' not in kwargs: kwargs['proxy'] = proxy print('Use Proxy:',proxy) # 调用原始的 _request 方法 return await _original_request(self, method, url, *args, **kwargs) # 应用 Monkey Patch aiohttp.ClientSession._request = new_request import socket async def check_port_available(address, port): #检查端口是否可用 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: sock.bind((address, port)) return True except socket.error: return False # https async def new_start(self, address, port, verbose=True, call_on_start=None): try: runner = web.AppRunner(self.app, access_log=None) await runner.setup() # if not await check_port_available(address, port): # raise RuntimeError(f"Port {port} is already in use.") http_success = False http_port=port for i in range(11): # 尝试最多11次 if await check_port_available(address, port + i): http_port = port + i site = web.TCPSite(runner, address, http_port) await site.start() http_success = True break if not http_success: raise RuntimeError(f"Ports {port} to {port + 10} are all in use.") # site = web.TCPSite(runner, address, port) # await site.start() ssl_context = None scheme = "http" try: # 跟着本体修改 if args.tls_keyfile and args.tls_certfile: scheme = "https" ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER, verify_mode=ssl.CERT_NONE) ssl_context.load_cert_chain(certfile=args.tls_certfile, keyfile=args.tls_keyfile) else: # 如果没传,则自动创建 import ssl crt, key = create_for_https() ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.load_cert_chain(crt, key) except: import ssl crt, key = create_for_https() ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.load_cert_chain(crt, key) success = False for i in range(11): # 尝试最多11次 if await check_port_available(address, http_port + 1 + i): https_port = http_port + 1 + i site2 = web.TCPSite(runner, address, https_port, ssl_context=ssl_context) await site2.start() success = True break if not success: raise RuntimeError(f"Ports {http_port + 1} to {http_port + 10} are all in use.") if address == '': address = '127.0.0.1' if address=='0.0.0.0': address = '127.0.0.1' if verbose: logging.info("\n") logging.info("\n\nStarting server") # print("\033[93mStarting server\n") logging.info("\033[93mTo see the GUI go to: http://{}:{}".format(address, http_port)) logging.info("\033[93mTo see the GUI go to: https://{}:{}\033[0m".format(address, https_port)) # print("\033[93mTo see the GUI go to: http://{}:{}".format(address, http_port)) # print("\033[93mTo see the GUI go to: https://{}:{}\033[0m".format(address, https_port)) if call_on_start is not None: try: if scheme=='https': call_on_start(scheme,address, https_port) else: call_on_start(scheme,address, http_port) except: call_on_start(address,http_port) except Exception as e: print(f"Error starting the server: {e}") # import webbrowser # if os.name == 'nt' and address == '0.0.0.0': # address = '127.0.0.1' # webbrowser.open(f"https://{address}") # webbrowser.open(f"http://{address}:{port}") PromptServer.start=new_start # 创建路由表 routes = PromptServer.instance.routes @routes.post('/mixlab') async def mixlab_hander(request): config=os.path.join(current_path, "nodes/config.json") data={} try: if os.path.exists(config): with open(config, 'r') as f: data = json.load(f) # print(data) except Exception as e: print(e) return web.json_response(data) @routes.get('/mixlab/app') async def mixlab_app_handler(request): html_file = os.path.join(current_path, "web/index.html") if os.path.exists(html_file): with open(html_file, 'r', encoding='utf-8', errors='ignore') as f: html_data = f.read() return web.Response(text=html_data, content_type='text/html') else: return web.Response(text="HTML file not found", status=404) @routes.post('/mixlab/workflow') async def mixlab_workflow_hander(request): data = await request.json() result={} try: if 'task' in data: if data['task']=='save': file_path=save_workflow_json(data['data']) result={ 'status':'success', 'file_path':file_path } elif data['task']=='save_app': category="" if "category" in data: category=data['category'] file_path=save_workflow_for_app(data['data'],data['filename'],category) result={ 'status':'success', 'file_path':file_path } elif data['task']=='my_app': filename=None category="" admin=False if 'filename' in data: filename=data['filename'] if 'category' in data: category=data['category'] if 'admin' in data: admin=data['admin'] result={ 'data':get_my_workflow_for_app(filename,category,admin), 'status':'success', } elif data['task']=='list': result={ 'data':get_workflows(), 'status':'success', } except Exception as e: print(e) return web.json_response(result) @routes.post('/mixlab/nodes_map') async def nodes_map_hander(request): data = await request.json() result={} try: result={ 'data':get_nodes_map(), 'status':'success', } except Exception as e: print(e) return web.json_response(result) @routes.post("/mixlab/folder_paths") async def get_checkpoints(request): data = await request.json() t="checkpoints" names=[] try: t=data['type'] names = folder_paths.get_filename_list(t) except Exception as e: print('/mixlab/folder_paths',False,e) try: if data['type']=='llamafile': names=get_llama_models() except: print("llamafile none") try: if data['type']=='rembg': names=get_rembg_models(U2NET_HOME) except: print("rembg none") return web.json_response({"names":names,"types":list(folder_paths.folder_names_and_paths.keys())}) @routes.post('/mixlab/rembg') async def rembg_hander(request): data = await request.json() model=data['model'] result={} data_base64=remove_base64_prefix(data['base64']) image_data = base64.b64decode(data_base64) # 创建一个BytesIO对象 image_stream = io.BytesIO(image_data) # 使用PIL Image模块读取图像 image = Image.open(image_stream) if model=='briarmbg': _,rgba_images,_=run_briarmbg([image]) else: _,rgba_images,_=run_rembg(model,[image]) with io.BytesIO() as buf: rgba_images[0].save(buf, format='PNG') img_bytes = buf.getvalue() img_base64 = base64.b64encode(img_bytes).decode('utf-8') try: result={ 'data':img_base64, 'model':model, 'status':'success', } except Exception as e: print(e) return web.json_response(result) @routes.post("/mixlab/prompt_result") async def post_prompt_result(request): data = await request.json() res=None # print(data) try: action=data['action'] if action=='save': result=data['data'] res=save_prompt_result(result['prompt_id'],result) elif action=='all': res=get_prompt_result() except Exception as e: print('/mixlab/prompt_result',False,e) return web.json_response({"result":res}) async def start_local_llm(data): global llama_port,llama_model,llama_chat_format if llama_port and llama_model and llama_chat_format: return {"port":llama_port,"model":llama_model,"chat_format":llama_chat_format} import threading import uvicorn from llama_cpp.server.app import create_app from llama_cpp.server.settings import ( Settings, ServerSettings, ModelSettings, ConfigFileSettings, ) if not "model" in data and "model_path" in data: data['model']= os.path.basename(data["model_path"]) model=data["model_path"] elif "model" in data: model=get_llama_model_path(data['model']) n_gpu_layers=-1 if "n_gpu_layers" in data: n_gpu_layers=data['n_gpu_layers'] chat_format="chatml" model_alias=os.path.basename(model) # 多模态 clip_model_path=None prefix = "llava-phi-3-mini" file_name = prefix+"-mmproj-" if model_alias.startswith(prefix): for file in os.listdir(os.path.dirname(model)): if file.startswith(file_name): clip_model_path=os.path.join(os.path.dirname(model),file) chat_format='llava-1-5' print('#clip_model_path',chat_format,clip_model_path) address="127.0.0.1" port=9090 success = False for i in range(11): # 尝试最多11次 if await check_port_available(address, port + i): port = port + i success = True break if success == False: return {"port":None,"model":""} server_settings=ServerSettings(host=address,port=port) name, ext = os.path.splitext(os.path.basename(model)) print('#model',name) app = create_app( server_settings=server_settings, model_settings=[ ModelSettings( model=model, model_alias=name, n_gpu_layers=n_gpu_layers, n_ctx=4098, chat_format=chat_format, embedding=False, clip_model_path=clip_model_path )]) def run_uvicorn(): uvicorn.run( app, host=os.getenv("HOST", server_settings.host), port=int(os.getenv("PORT", server_settings.port)), ssl_keyfile=server_settings.ssl_keyfile, ssl_certfile=server_settings.ssl_certfile, ) # 创建一个子线程 thread = threading.Thread(target=run_uvicorn) # 启动子线程 thread.start() llama_port=port llama_model=data['model'] llama_chat_format=chat_format return {"port":llama_port,"model":llama_model,"chat_format":llama_chat_format} # llam服务的开启 @routes.post('/mixlab/start_llama') async def my_hander_method(request): data =await request.json() # print(data) if llama_port and llama_model and llama_chat_format: return web.json_response({"port":llama_port,"model":llama_model,"chat_format":llama_chat_format} ) try: result=await start_local_llm(data) except: result= {"port":None,"model":"","llama_cpp_error":True} print('start_local_llm error') return web.json_response(result) # 重启服务 @routes.post('/mixlab/re_start') def re_start(request): try: sys.stdout.close_log() except Exception as e: pass return os.execv(sys.executable, [sys.executable] + sys.argv) # 导入节点 from .nodes.PromptNode import GLIGENTextBoxApply_Advanced,EmbeddingPrompt,RandomPrompt,PromptSlide,PromptSimplification,PromptImage,JoinWithDelimiter from .nodes.ImageNode import ImageListToBatch_,ComparingTwoFrames,LoadImages_,CompositeImages,GridDisplayAndSave,GridInput,ImagesPrompt,SaveImageAndMetadata,SaveImageToLocal,SplitImage,GridOutput,GetImageSize_,MirroredImage,ImageColorTransfer,NoiseImage,TransparentImage,GradientImage,LoadImagesFromPath,LoadImagesFromURL,ResizeImage,TextImage,SvgImage,Image3D,ShowLayer,NewLayer,MergeLayers,CenterImage,AreaToMask,SmoothMask,SplitLongMask,ImageCropByAlpha,EnhanceImage,FaceToMask # from .nodes.Vae import VAELoader,VAEDecode from .nodes.ScreenShareNode import ScreenShareNode,FloatingVideo from .nodes.Audio import AudioPlayNode,SpeechRecognition,SpeechSynthesis from .nodes.Utils import IncrementingListNode,ListSplit,CreateLoraNames,CreateSampler_names,CreateCkptNames,CreateSeedNode,TESTNODE_,TESTNODE_TOKEN,AppInfo,IntNumber,FloatSlider,TextInput,ColorInput,FontInput,TextToNumber,DynamicDelayProcessor,LimitNumber,SwitchByIndex,MultiplicationNode from .nodes.Mask import PreviewMask_,MaskListReplace,MaskListMerge,OutlineMask,FeatheredMask from .nodes.Style import ApplyVisualStylePrompting,StyleAlignedReferenceSampler,StyleAlignedBatchAlign,StyleAlignedSampleReferenceLatents # 要导出的所有节点及其名称的字典 # 注意:名称应全局唯一 NODE_CLASS_MAPPINGS = { "AppInfo":AppInfo, "TESTNODE_":TESTNODE_, "TESTNODE_TOKEN":TESTNODE_TOKEN, "RandomPrompt":RandomPrompt, # "LoraPrompt":LoraPrompt, "EmbeddingPrompt":EmbeddingPrompt, "PromptSlide":PromptSlide, "GLIGENTextBoxApply_Advanced":GLIGENTextBoxApply_Advanced, "PromptSimplification":PromptSimplification, "PromptImage":PromptImage, "MirroredImage":MirroredImage, "NoiseImage":NoiseImage, "GradientImage":GradientImage, "TransparentImage":TransparentImage, "ResizeImageMixlab":ResizeImage, "LoadImagesFromPath":LoadImagesFromPath, "LoadImagesFromURL":LoadImagesFromURL, "LoadImagesToBatch":LoadImages_, "TextImage":TextImage, "EnhanceImage":EnhanceImage, "SvgImage":SvgImage, "3DImage":Image3D, "ImageColorTransfer":ImageColorTransfer, "ShowLayer":ShowLayer, "NewLayer":NewLayer, "ImageListToBatch_":ImageListToBatch_, "CompositeImages_":CompositeImages, "SplitImage":SplitImage, "CenterImage":CenterImage, "GridOutput":GridOutput, "GridDisplayAndSave":GridDisplayAndSave, "GridInput":GridInput, "MergeLayers":MergeLayers, "SplitLongMask":SplitLongMask, "FeatheredMask":FeatheredMask, "SmoothMask":SmoothMask, "FaceToMask":FaceToMask, "AreaToMask":AreaToMask, "ImageCropByAlpha":ImageCropByAlpha, "ImagesPrompt_":ImagesPrompt, # "VAELoaderConsistencyDecoder":VAELoader, "SaveImageToLocal":SaveImageToLocal, "SaveImageAndMetadata_":SaveImageAndMetadata, "ComparingTwoFrames_":ComparingTwoFrames, # "VAEDecodeConsistencyDecoder":VAEDecode, "ScreenShare":ScreenShareNode, "FloatingVideo":FloatingVideo, "SpeechRecognition":SpeechRecognition, "SpeechSynthesis":SpeechSynthesis, "Color":ColorInput, "FloatSlider":FloatSlider, "IntNumber":IntNumber, "TextInput_":TextInput, "Font":FontInput, "TextToNumber":TextToNumber, "DynamicDelayProcessor":DynamicDelayProcessor, "MultiplicationNode":MultiplicationNode, "GetImageSize_":GetImageSize_, "SwitchByIndex":SwitchByIndex, "LimitNumber":LimitNumber, "OutlineMask":OutlineMask, "MaskListMerge_":MaskListMerge, "JoinWithDelimiter":JoinWithDelimiter, "Seed_":CreateSeedNode, "CkptNames_":CreateCkptNames, "SamplerNames_":CreateSampler_names, "LoraNames_":CreateLoraNames, "ApplyVisualStylePrompting_":ApplyVisualStylePrompting, "StyleAlignedReferenceSampler_": StyleAlignedReferenceSampler, "StyleAlignedSampleReferenceLatents_": StyleAlignedSampleReferenceLatents, "StyleAlignedBatchAlign_": StyleAlignedBatchAlign, "ListSplit_":ListSplit, "MaskListReplace_":MaskListReplace, "IncrementingListNode_":IncrementingListNode, "PreviewMask_":PreviewMask_, "AudioPlay":AudioPlayNode } # 一个包含节点友好/可读的标题的字典 NODE_DISPLAY_NAME_MAPPINGS = { "AppInfo":"App Info ♾️MixlabApp", "ScreenShare":"Screen Share ♾️Mixlab", "FloatingVideo":"Floating Video ♾️Mixlab", "TextImage":"Text Image ♾️Mixlab", "Color":"Color Input ♾️MixlabApp", "TextInput_":"Text Input ♾️MixlabApp", "FloatSlider":"Float Slider Input ♾️MixlabApp", "IntNumber":"Int Input ♾️MixlabApp", "ImagesPrompt_":"Images Input ♾️MixlabApp", "SaveImageAndMetadata_":"Save Image Output ♾️MixlabApp", "ComparingTwoFrames_":"Comparing Two Frames ♾️MixlabApp", "ResizeImageMixlab":"Resize Image ♾️Mixlab", "RandomPrompt": "Random Prompt ♾️Mixlab", "PromptImage":"Output Prompt and Image ♾️Mixlab", "SplitLongMask":"Splitting a long image into sections", "VAELoaderConsistencyDecoder":"Consistency Decoder Loader", "VAEDecodeConsistencyDecoder":"Consistency Decoder Decode", "MergeLayers":"Merge Layers ♾️Mixlab", "SpeechSynthesis":"SpeechSynthesis ♾️Mixlab", "SpeechRecognition":"SpeechRecognition ♾️Mixlab", "3DImage":"3DImage ♾️Mixlab", "ImageListToBatch_":"Image List To Batch", "CompositeImages_":"Composite Images ♾️Mixlab", "DynamicDelayProcessor":"DynamicDelayByText ♾️Mixlab", "LaMaInpainting":"LaMaInpainting ♾️Mixlab", "PromptSlide":"Prompt Slide ♾️Mixlab", "PromptGenerate_Mix":"Prompt Generate ♾️Mixlab", "ChinesePrompt_Mix":"Chinese Prompt ♾️Mixlab", "GamePal":"GamePal ♾️Mixlab", "RembgNode_Mix":"Remove Background ♾️Mixlab", "LoraNames_":"LoraName ♾️Mixlab", "ApplyVisualStylePrompting_":"Apply VisualStyle Prompting ♾️Mixlab", "StyleAlignedReferenceSampler_": "StyleAligned Reference Sampler ♾️Mixlab", "StyleAlignedSampleReferenceLatents_": "StyleAligned Sample Reference Latents ♾️Mixlab", "StyleAlignedBatchAlign_": "StyleAligned Batch Align ♾️Mixlab", "LoadVideoAndSegment_":"Load Video And Segment ♾️Mixlab", "VideoCombine_Adv":"Video Combine ♾️Mixlab", "MaskListMerge_":"MaskList to Mask ♾️Mixlab", "ListSplit_":"Split List ♾️Mixlab", "MaskListReplace_":"MaskList Replace ♾️Mixlab", "ImageListReplace_":"ImageList Replace ♾️Mixlab", "SwitchByIndex":"List Switch By Index ♾️Mixlab", "GLIGENTextBoxApply_Advanced":"GLIGEN TextBox Apply ♾️Mixlab", "GridDisplayAndSave":"Grid Display And Save ♾️Mixlab", "GridInput":"Grid Input ♾️Mixlab", "GridOutput":"Grid Output ♾️Mixlab", "GetImageSize_":"Get Image Size ♾️Mixlab", "IncrementingListNode_":"Create Incrementing Number List ♾️Mixlab", "LoadImagesToBatch":"Load Images(base64) ♾️Mixlab", "PreviewMask_":"Preview Mask", "AudioPlay":"Audio Play ♾️Mixlab", "MultiplicationNode":"Math Operation ♾️Mixlab", } # web ui的节点功能 WEB_DIRECTORY = "./web" logging.info('--------------') logging.info('\033[91m ### Mixlab Nodes: \033[93mLoaded') # print('\033[91m ### Mixlab Nodes: \033[93mLoaded') try: from .nodes.ChatGPT import ChatGPTNode,ShowTextForGPT,CharacterInText,TextSplitByDelimiter logging.info('ChatGPT.available True') NODE_CLASS_MAPPINGS_V = { "ChatGPTOpenAI":ChatGPTNode, "ShowTextForGPT":ShowTextForGPT, "CharacterInText":CharacterInText, "TextSplitByDelimiter":TextSplitByDelimiter, } # 一个包含节点友好/可读的标题的字典 NODE_DISPLAY_NAME_MAPPINGS_V = { "ChatGPTOpenAI":"ChatGPT & Local LLM ♾️Mixlab", "ShowTextForGPT":"Show Text ♾️MixlabApp", "CharacterInText":"Character In Text", "TextSplitByDelimiter":"Text Split By Delimiter", } NODE_CLASS_MAPPINGS.update(NODE_CLASS_MAPPINGS_V) NODE_DISPLAY_NAME_MAPPINGS.update(NODE_DISPLAY_NAME_MAPPINGS_V) except Exception as e: logging.info('ChatGPT.available False') try: from .nodes.edit_mask import EditMask logging.info('edit_mask.available True') NODE_CLASS_MAPPINGS['EditMask']=EditMask NODE_DISPLAY_NAME_MAPPINGS['EditMask']="Edit Mask ♾️Mixlab" except Exception as e: logging.info('edit_mask.available False') try: from .nodes.Lama import LaMaInpainting logging.info('LaMaInpainting.available {}'.format(LaMaInpainting.available)) if LaMaInpainting.available: NODE_CLASS_MAPPINGS['LaMaInpainting']=LaMaInpainting except Exception as e: logging.info('LaMaInpainting.available False') try: from .nodes.ClipInterrogator import ClipInterrogator logging.info('ClipInterrogator.available {}'.format(ClipInterrogator.available)) if ClipInterrogator.available: NODE_CLASS_MAPPINGS['ClipInterrogator']=ClipInterrogator except Exception as e: logging.info('ClipInterrogator.available False') try: from .nodes.TextGenerateNode import PromptGenerate,ChinesePrompt logging.info('PromptGenerate.available {}'.format(PromptGenerate.available)) if PromptGenerate.available: NODE_CLASS_MAPPINGS['PromptGenerate_Mix']=PromptGenerate logging.info('ChinesePrompt.available {}'.format(ChinesePrompt.available)) if ChinesePrompt.available: NODE_CLASS_MAPPINGS['ChinesePrompt_Mix']=ChinesePrompt except Exception as e: logging.info('TextGenerateNode.available False') try: from .nodes.RembgNode import RembgNode_ logging.info('RembgNode_.available {}'.format(RembgNode_.available)) if RembgNode_.available: NODE_CLASS_MAPPINGS['RembgNode_Mix']=RembgNode_ except Exception as e: logging.info('RembgNode_.available False' ) try: from .nodes.Video import GenerateFramesByCount,scenesNode_,CombineAudioVideo,VideoCombine_Adv,LoadVideoAndSegment,ImageListReplace,VAEEncodeForInpaint_Frames,LoadAndCombinedAudio_ NODE_CLASS_MAPPINGS_V = { "VAEEncodeForInpaint_Frames":VAEEncodeForInpaint_Frames, "ImageListReplace_":ImageListReplace, "LoadVideoAndSegment_":LoadVideoAndSegment, "VideoCombine_Adv":VideoCombine_Adv, "LoadAndCombinedAudio_":LoadAndCombinedAudio_, "CombineAudioVideo":CombineAudioVideo, "ScenesNode_":scenesNode_, "GenerateFramesByCount":GenerateFramesByCount } # 一个包含节点友好/可读的标题的字典 NODE_DISPLAY_NAME_MAPPINGS_V = { "VAEEncodeForInpaint_Frames":"VAE Encode For Inpaint Frames ♾️Mixlab", "ImageListReplace_":"Image List Replace", "LoadVideoAndSegment_":"Load Video And Segment", "VideoCombine_Adv":"Video Combine", "LoadAndCombinedAudio_":"Load And Combined Audio", "CombineAudioVideo":"Combine Audio Video", "ScenesNode_":"Scenes Node", "GenerateFramesByCount":"Generate Frames By Count" } NODE_CLASS_MAPPINGS.update(NODE_CLASS_MAPPINGS_V) NODE_DISPLAY_NAME_MAPPINGS.update(NODE_DISPLAY_NAME_MAPPINGS_V) except: logging.info('Video.available False') try: from .nodes.TripoSR import LoadTripoSRModel,TripoSRSampler,SaveTripoSRMesh logging.info('TripoSR.available') NODE_CLASS_MAPPINGS['LoadTripoSRModel_']=LoadTripoSRModel NODE_DISPLAY_NAME_MAPPINGS["LoadTripoSRModel_"]= "Load TripoSR Model" NODE_CLASS_MAPPINGS['TripoSRSampler_']=TripoSRSampler NODE_DISPLAY_NAME_MAPPINGS["TripoSRSampler_"]= "TripoSR Sampler" NODE_CLASS_MAPPINGS['SaveTripoSRMesh']=SaveTripoSRMesh NODE_DISPLAY_NAME_MAPPINGS["SaveTripoSRMesh"]= "Save TripoSR Mesh" except Exception as e: logging.info('TripoSR.available False' ) logging.info('\033[93m -------------- \033[0m')