# ComfyUI custom nodes wrapping the Google Gemini API (text, vision, chat, file).
import os
import io
import json
import requests
import torch
import google.generativeai as genai
from io import BytesIO
from PIL import Image
# Directory containing this module; config.json is expected alongside it.
p = os.path.dirname(os.path.realpath(__file__))


def get_gemini_api_key():
    """Read GEMINI_API_KEY from the config.json next to this file.

    Returns:
        The API key string, or "" (after printing an error) when the
        config file is missing, unreadable, not valid JSON, or lacks
        the "GEMINI_API_KEY" entry.
    """
    config_path = os.path.join(p, 'config.json')
    try:
        with open(config_path, 'r') as f:
            config = json.load(f)
        api_key = config["GEMINI_API_KEY"]
    except (OSError, json.JSONDecodeError, KeyError):
        # Was a bare `except:` — narrowed to the actual failure modes so
        # unrelated errors (e.g. KeyboardInterrupt) are not swallowed.
        print("出错啦 Error: API key is required")
        return ""
    return api_key
class Gemini_API_Zho:
    """ComfyUI node: Gemini text / text+image generation.

    The API key comes from the node input (or, as a fallback, from the
    value given at construction time).
    """

    def __init__(self, api_key=None):
        # Key may also be supplied later through the node's api_key input.
        self.api_key = api_key
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "prompt": ("STRING", {"default": "What is the meaning of life?", "multiline": True}),
                "model_name": (["gemini-pro", "gemini-1.5-flash", "gemini-1.5-pro-latest"],),
                "stream": ("BOOLEAN", {"default": False}),
                "api_key": ("STRING", {"default": ""})  # Add api_key as an input
            },
            "optional": {
                "image": ("IMAGE",),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "generate_content"
    CATEGORY = "Zho模块组/✨Gemini"

    def tensor_to_image(self, tensor):
        """Convert an H x W x C float tensor (assumed 0-1 range) to a PIL RGB image."""
        # Make sure the tensor lives on the CPU before converting to numpy.
        tensor = tensor.cpu()
        # Scale to 0-255 and clamp; assumes H x W x C layout — TODO confirm upstream.
        image_np = tensor.squeeze().mul(255).clamp(0, 255).byte().numpy()
        return Image.fromarray(image_np, mode='RGB')

    def _run(self, model, parts, stream):
        """Call the model; collapse a streamed response into one newline-joined string."""
        if stream:
            response = model.generate_content(parts, stream=True)
            return "\n".join(chunk.text for chunk in response)
        return model.generate_content(parts).text

    def generate_content(self, prompt, model_name, stream, api_key, image=None):
        """Generate text from a prompt (and optional image tensor).

        Raises:
            ValueError: when no API key is available, when gemini-1.5-flash
                is selected without an image, or on an unsupported model.
        """
        if api_key:
            self.api_key = api_key
            genai.configure(api_key=self.api_key, transport='rest')
        if not self.api_key:
            raise ValueError("API key is required")

        model = genai.GenerativeModel(model_name)

        if model_name == 'gemini-pro':
            # Text-only model: the optional image is ignored, as before.
            textoutput = self._run(model, prompt, stream)
        elif model_name == 'gemini-1.5-flash':
            if image is None:  # was `image == None`
                raise ValueError("gemini-1.5-flash needs image")
            textoutput = self._run(model, [prompt, self.tensor_to_image(image)], stream)
        elif model_name == 'gemini-1.5-pro-latest':
            if image is None:
                textoutput = self._run(model, prompt, stream)
            else:
                textoutput = self._run(model, [prompt, self.tensor_to_image(image)], stream)
        else:
            # INPUT_TYPES restricts model_name; fail loudly instead of
            # hitting NameError on an undefined textoutput.
            raise ValueError(f"Unsupported model: {model_name}")
        return (textoutput,)
class Gemini_API_Vsion_ImgURL_Zho:
    """ComfyUI node: run a Gemini vision prompt on an image fetched from a URL."""

    def __init__(self, api_key=None):
        self.api_key = api_key
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "prompt": ("STRING", {"default": "Describe this image", "multiline": True}),
                "image_url": ("STRING", {"default": ""}),
                "model_name": (["gemini-1.5-flash", "gemini-1.5-pro-latest"],),
                "stream": ("BOOLEAN", {"default": False}),
                "api_key": ("STRING", {"default": ""})  # Add api_key as an input
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "generate_content"
    CATEGORY = "Zho模块组/✨Gemini"

    def generate_content(self, prompt, model_name, stream, api_key, image_url):
        """Fetch the image from image_url and run the prompt against it.

        Raises:
            ValueError: when no API key is available or the URL fetch fails.
        """
        if api_key:
            self.api_key = api_key
            genai.configure(api_key=self.api_key, transport='rest')
        if not self.api_key:
            raise ValueError("API key is required")
        # Load the image from the URL; a timeout keeps a dead host from
        # hanging the ComfyUI queue forever (original had none).
        response = requests.get(image_url, timeout=60)
        if response.status_code != 200:
            raise ValueError("Failed to load image from URL")
        img = Image.open(BytesIO(response.content))
        model = genai.GenerativeModel(model_name)
        if stream:
            stream_response = model.generate_content([prompt, img], stream=True)
            textoutput = "\n".join(chunk.text for chunk in stream_response)
        else:
            textoutput = model.generate_content([prompt, img]).text
        return (textoutput,)
#chat
class Gemini_API_Chat_Zho:
    """ComfyUI node: multi-turn Gemini chat (API key supplied as node input).

    Keeps one chat session per node instance; each call appends a turn and
    returns the full formatted history.
    """

    def __init__(self, api_key=None):
        self.api_key = api_key
        self.chat = None  # chat session is created lazily on the first call
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "prompt": ("STRING", {"default": "What is the meaning of life?", "multiline": True}),
                "model_name": (["gemini-pro", "gemini-1.5-pro-latest"],),
                "api_key": ("STRING", {"default": ""})  # Add api_key as an input
            },
            "optional": {
                "image": ("IMAGE",),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("response",)
    FUNCTION = "generate_chat"
    CATEGORY = "Zho模块组/✨Gemini"

    def tensor_to_image(self, tensor):
        """Convert an H x W x C float tensor (assumed 0-1 range) to a PIL RGB image.

        Fix: generate_chat called self.tensor_to_image but this class had
        no such method (NameError at runtime).
        """
        tensor = tensor.cpu()
        image_np = tensor.squeeze().mul(255).clamp(0, 255).byte().numpy()
        return Image.fromarray(image_np, mode='RGB')

    def generate_chat(self, prompt, model_name, api_key, image=None):
        """Send one chat turn and return the formatted history.

        Fix: `image` was referenced in the body but missing from the
        signature — it is declared optional in INPUT_TYPES, so connecting
        an image previously raised NameError. Added as `image=None`
        (backward-compatible).
        """
        if api_key:
            self.api_key = api_key
            genai.configure(api_key=self.api_key, transport='rest')
        if not self.api_key:
            raise ValueError("API key is required")
        if not self.chat:
            model = genai.GenerativeModel(model_name)
            self.chat = model.start_chat(history=[])
        if model_name == 'gemini-pro':
            self.chat.send_message(prompt)
        elif model_name == 'gemini-1.5-pro-latest':
            if image is None:
                self.chat.send_message(prompt)
            else:
                self.chat.send_message([prompt, self.tensor_to_image(image)])
        else:
            # Fail loudly instead of returning an undefined history.
            raise ValueError(f"Unsupported model: {model_name}")
        chat_history = self.format_chat_history(self.chat)
        return (chat_history,)

    def format_chat_history(self, chat):
        """Render the history as 'role: text' lines, each followed by a rule."""
        formatted_history = []
        for message in chat.history:
            formatted_history.append(f"{message.role}: {message.parts[0].text}")
            formatted_history.append("-" * 40)  # separator line
        return "\n".join(formatted_history)
class Gemini_API_S_Zho:
    """ComfyUI node: Gemini text / vision generation using the key from config.json."""

    def __init__(self):
        # Key is loaded once from config.json next to this module.
        self.api_key = get_gemini_api_key()
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "prompt": ("STRING", {"default": "What is the meaning of life?", "multiline": True}),
                "model_name": (["gemini-pro", "gemini-1.5-flash", "gemini-1.5-pro-latest"],),
                "stream": ("BOOLEAN", {"default": False}),
            },
            "optional": {
                "image": ("IMAGE",),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "generate_content"
    CATEGORY = "Zho模块组/✨Gemini"

    def tensor_to_image(self, tensor):
        """Convert an H x W x C float tensor (assumed 0-1 range) to a PIL RGB image."""
        # Work on a CPU copy; scale to 0-255 uint8 before handing to PIL.
        tensor = tensor.cpu()
        image_np = tensor.squeeze().mul(255).clamp(0, 255).byte().numpy()
        return Image.fromarray(image_np, mode='RGB')

    def _run(self, model, parts, stream):
        """Call the model; collapse a streamed response into one newline-joined string."""
        if stream:
            response = model.generate_content(parts, stream=True)
            return "\n".join(chunk.text for chunk in response)
        return model.generate_content(parts).text

    def generate_content(self, prompt, model_name, stream, image=None):
        """Generate text from a prompt (and optional image tensor).

        Raises:
            ValueError: when no API key is configured, when
                gemini-1.5-flash is selected without an image, or on an
                unsupported model.
        """
        if not self.api_key:
            raise ValueError("API key is required")

        model = genai.GenerativeModel(model_name)

        if model_name == 'gemini-pro':
            # Text-only model: the optional image is ignored, as before.
            textoutput = self._run(model, prompt, stream)
        elif model_name == 'gemini-1.5-flash':
            if image is None:  # was `image == None`
                raise ValueError("gemini-1.5-flash needs image")
            textoutput = self._run(model, [prompt, self.tensor_to_image(image)], stream)
        elif model_name == 'gemini-1.5-pro-latest':
            if image is None:
                textoutput = self._run(model, prompt, stream)
            else:
                textoutput = self._run(model, [prompt, self.tensor_to_image(image)], stream)
        else:
            # Defensive: avoids NameError on undefined textoutput.
            raise ValueError(f"Unsupported model: {model_name}")
        return (textoutput,)
class Gemini_API_S_Vsion_ImgURL_Zho:
    """ComfyUI node: vision prompt on a URL image, key taken from config.json."""

    def __init__(self):
        self.api_key = get_gemini_api_key()
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "prompt": ("STRING", {"default": "Describe this image", "multiline": True}),
                "image_url": ("STRING", {"default": ""}),
                "model_name": (["gemini-1.5-flash", "gemini-1.5-pro-latest"],),
                "stream": ("BOOLEAN", {"default": False}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "generate_content"
    CATEGORY = "Zho模块组/✨Gemini"

    def generate_content(self, prompt, model_name, stream, image_url):
        """Fetch the image from image_url and run the prompt against it.

        Raises:
            ValueError: when no API key is configured or the URL fetch fails.
        """
        if not self.api_key:
            raise ValueError("API key is required")
        # Load the image from the URL; a timeout keeps a dead host from
        # hanging the ComfyUI queue forever (original had none).
        response = requests.get(image_url, timeout=60)
        if response.status_code != 200:
            raise ValueError("Failed to load image from URL")
        img = Image.open(BytesIO(response.content))
        model = genai.GenerativeModel(model_name)
        if stream:
            stream_response = model.generate_content([prompt, img], stream=True)
            textoutput = "\n".join(chunk.text for chunk in stream_response)
        else:
            textoutput = model.generate_content([prompt, img]).text
        return (textoutput,)
#chat
class Gemini_API_S_Chat_Zho:
    """ComfyUI node: multi-turn Gemini chat, key taken from config.json.

    Keeps one chat session per node instance; each call appends a turn and
    returns the full formatted history.
    """

    def __init__(self):
        self.api_key = get_gemini_api_key()
        self.chat = None  # chat session is created lazily on the first call
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "prompt": ("STRING", {"default": "What is the meaning of life?", "multiline": True}),
                "model_name": (["gemini-pro", "gemini-1.5-pro-latest"],),
            },
            "optional": {
                "image": ("IMAGE",),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("response",)
    FUNCTION = "generate_chat"
    CATEGORY = "Zho模块组/✨Gemini"

    def tensor_to_image(self, tensor):
        """Convert an H x W x C float tensor (assumed 0-1 range) to a PIL RGB image."""
        tensor = tensor.cpu()
        image_np = tensor.squeeze().mul(255).clamp(0, 255).byte().numpy()
        return Image.fromarray(image_np, mode='RGB')

    def generate_chat(self, prompt, model_name, image=None):
        """Send one chat turn and return the formatted history.

        Fix: `image` was referenced in the body but missing from the
        signature — it is declared optional in INPUT_TYPES, so connecting
        an image previously raised NameError. Added as `image=None`
        (backward-compatible).
        """
        if not self.api_key:
            raise ValueError("API key is required")
        if not self.chat:
            model = genai.GenerativeModel(model_name)
            self.chat = model.start_chat(history=[])
        if model_name == 'gemini-pro':
            self.chat.send_message(prompt)
        elif model_name == 'gemini-1.5-pro-latest':
            if image is None:
                self.chat.send_message(prompt)
            else:
                self.chat.send_message([prompt, self.tensor_to_image(image)])
        else:
            # Fail loudly instead of returning an undefined history.
            raise ValueError(f"Unsupported model: {model_name}")
        chat_history = self.format_chat_history(self.chat)
        return (chat_history,)

    def format_chat_history(self, chat):
        """Render the history as 'role: text' lines, each followed by a rule."""
        formatted_history = []
        for message in chat.history:
            formatted_history.append(f"{message.role}: {message.parts[0].text}")
            formatted_history.append("-" * 40)  # separator line
        return "\n".join(formatted_history)
# System instructions
class Gemini_15P_API_S_Advance_Zho:
    """ComfyUI node: Gemini 1.5 Pro generation with a custom system instruction."""

    def __init__(self):
        self.api_key = get_gemini_api_key()
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "prompt": ("STRING", {"default": "What is the meaning of life?", "multiline": True}),
                "system_instruction": ("STRING", {"default": "You are creating a prompt for Stable Diffusion to generate an image. First step: describe this image, then put description into text. Second step: generate a text prompt for %s based on first step. Only respond with the prompt itself, but embellish it as needed but keep it under 80 tokens.", "multiline": True}),
                "model_name": (["gemini-1.5-pro-latest"],),
                "stream": ("BOOLEAN", {"default": False}),
            },
            "optional": {
                "image": ("IMAGE",),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "generate_content"
    CATEGORY = "Zho模块组/✨Gemini"

    def tensor_to_image(self, tensor):
        """Convert an H x W x C float tensor (assumed 0-1 range) to a PIL RGB image."""
        tensor = tensor.cpu()
        image_np = tensor.squeeze().mul(255).clamp(0, 255).byte().numpy()
        return Image.fromarray(image_np, mode='RGB')

    def generate_content(self, prompt, system_instruction, model_name, stream, image=None):
        """Generate text from a prompt under a system instruction.

        The optional image tensor is converted to PIL and sent alongside
        the prompt. Raises ValueError when no API key is configured.
        """
        if not self.api_key:
            raise ValueError("API key is required")
        model = genai.GenerativeModel(model_name, system_instruction=system_instruction)
        # Build the request parts once; was `image == None` with the
        # stream/non-stream branches duplicated in both arms.
        if image is None:
            parts = prompt
        else:
            parts = [prompt, self.tensor_to_image(image)]
        if stream:
            response = model.generate_content(parts, stream=True)
            textoutput = "\n".join(chunk.text for chunk in response)
        else:
            textoutput = model.generate_content(parts).text
        return (textoutput,)
class Gemini_15P_API_S_Chat_Advance_Zho:
    """ComfyUI node: Gemini 1.5 Pro chat with a custom system instruction.

    Keeps one chat session per node instance; each call appends a turn and
    returns the full formatted history.
    """

    def __init__(self):
        self.api_key = get_gemini_api_key()
        self.chat = None  # chat session is created lazily on the first call
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "prompt": ("STRING", {"default": "What is the meaning of life?", "multiline": True}),
                "system_instruction": ("STRING", {"default": "You are creating a prompt for Stable Diffusion to generate an image. First step: describe this image, then put description into text. Second step: generate a text prompt for %s based on first step. Only respond with the prompt itself, but embellish it as needed but keep it under 80 tokens.", "multiline": True}),
                "model_name": (["gemini-1.5-pro-latest"],),
            },
            "optional": {
                "image": ("IMAGE",),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("response",)
    FUNCTION = "generate_chat"
    CATEGORY = "Zho模块组/✨Gemini"

    def tensor_to_image(self, tensor):
        """Convert an H x W x C float tensor (assumed 0-1 range) to a PIL RGB image."""
        tensor = tensor.cpu()
        image_np = tensor.squeeze().mul(255).clamp(0, 255).byte().numpy()
        return Image.fromarray(image_np, mode='RGB')

    def generate_chat(self, prompt, system_instruction, model_name, image=None):
        """Send one chat turn and return the formatted history.

        The system instruction only takes effect on the first call, when
        the chat session is created. Was gated on a redundant model_name
        check (the only option) that would have left chat_history
        undefined — and raised NameError — had it ever failed.
        """
        if not self.api_key:
            raise ValueError("API key is required")
        if not self.chat:
            model = genai.GenerativeModel(model_name, system_instruction=system_instruction)
            self.chat = model.start_chat(history=[])
        if image is None:  # was `image == None`
            self.chat.send_message(prompt)
        else:
            self.chat.send_message([prompt, self.tensor_to_image(image)])
        chat_history = self.format_chat_history(self.chat)
        return (chat_history,)

    def format_chat_history(self, chat):
        """Render the history as 'role: text' lines, each followed by a rule."""
        formatted_history = []
        for message in chat.history:
            formatted_history.append(f"{message.role}: {message.parts[0].text}")
            formatted_history.append("-" * 40)  # separator line
        return "\n".join(formatted_history)
# File API
class Gemini_FileUpload_API_S_Zho:
    """ComfyUI node: upload a local file through the Gemini File API."""

    def __init__(self):
        self.api_key = get_gemini_api_key()
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "file": ("STRING", {"default": "./sample.mp3", "multiline": False}),
            }
        }

    RETURN_TYPES = ("FILE",)
    RETURN_NAMES = ("file",)
    FUNCTION = "file_upload"
    CATEGORY = "Zho模块组/✨Gemini"

    def file_upload(self, file):
        """Upload `file` (a path string) and return the Gemini file handle.

        Raises:
            ValueError: when no API key is configured.
        """
        if not self.api_key:
            raise ValueError("API key is required")
        your_file = genai.upload_file(file)
        # Return a tuple for consistency with every other node here
        # (the original returned a list).
        return (your_file,)
class Gemini_File_API_S_Zho:
    """ComfyUI node: run a prompt against a previously uploaded Gemini file."""

    def __init__(self):
        self.api_key = get_gemini_api_key()
        if self.api_key is not None:
            genai.configure(api_key=self.api_key, transport='rest')

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "file": ("FILE",),
                "prompt": ("STRING", {"default": "Listen carefully to the following audio file. Provide a brief summary.", "multiline": True}),
                "model_name": (["gemini-1.5-pro-latest"],),
                "stream": ("BOOLEAN", {"default": False}),
            }
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "generate_content"
    CATEGORY = "Zho模块组/✨Gemini"

    def generate_content(self, prompt, model_name, stream, file):
        """Send the prompt together with the uploaded file handle.

        Raises:
            ValueError: when no API key is configured.
        """
        if not self.api_key:
            raise ValueError("API key is required")
        model = genai.GenerativeModel(model_name)
        request_parts = [prompt, file]
        if stream:
            chunks = model.generate_content(request_parts, stream=True)
            textoutput = "\n".join(piece.text for piece in chunks)
        else:
            textoutput = model.generate_content(request_parts).text
        return (textoutput,)
class ConcatText_Zho:
    """Utility node: join any number of text inputs into one string."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "text_1": ("STRING", {"multiline": True}),
                "text_2": ("STRING", {"multiline": True}),
                # More text inputs can be added here as needed.
            },
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    FUNCTION = "concat_texts"
    CATEGORY = "Zho模块组/✨Gemini"

    def concat_texts(self, **kwargs):
        """Merge every input whose name starts with 'text', comma-separated."""
        selected = (value for name, value in kwargs.items() if name.startswith('text'))
        return (', '.join(selected),)
# DisplayText node is forked from AlekPet,thanks to AlekPet!
class DisplayText_Zho:
    """Output node: show the incoming text in the UI and pass it through."""

    def __init__(self):
        pass

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "text": ("STRING", {"forceInput": True}),
            },
            "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
        }

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("text",)
    OUTPUT_NODE = True
    FUNCTION = "display_text"
    CATEGORY = "Zho模块组/✨Gemini"

    def display_text(self, text, prompt=None, extra_pnginfo=None):
        """Return the text both as a UI payload and as the node result."""
        ui_payload = {"string": [text]}
        return {"ui": ui_payload, "result": (text,)}
# Registry consumed by ComfyUI at load time: internal node name -> class.
NODE_CLASS_MAPPINGS = {
    "Gemini_API_Zho": Gemini_API_Zho,
    "Gemini_API_Vsion_ImgURL_Zho": Gemini_API_Vsion_ImgURL_Zho,
    "Gemini_API_Chat_Zho": Gemini_API_Chat_Zho,
    "Gemini_API_S_Zho": Gemini_API_S_Zho,
    "Gemini_API_S_Vsion_ImgURL_Zho": Gemini_API_S_Vsion_ImgURL_Zho,
    "Gemini_API_S_Chat_Zho": Gemini_API_S_Chat_Zho,
    "Gemini_15P_API_S_Advance_Zho": Gemini_15P_API_S_Advance_Zho,
    "Gemini_15P_API_S_Chat_Advance_Zho": Gemini_15P_API_S_Chat_Advance_Zho,
    "Gemini_FileUpload_API_S_Zho": Gemini_FileUpload_API_S_Zho,
    "Gemini_File_API_S_Zho": Gemini_File_API_S_Zho,
    "ConcatText_Zho": ConcatText_Zho,
    "DisplayText_Zho": DisplayText_Zho
}

# Human-readable labels shown in the ComfyUI node picker for each
# internal name above.
NODE_DISPLAY_NAME_MAPPINGS = {
    "Gemini_API_Zho": "✨Gemini_API_Zho",
    "Gemini_API_Vsion_ImgURL_Zho": "✨Gemini_API_Vsion_ImgURL_Zho",
    "Gemini_API_Chat_Zho": "✨Gemini_API_Chat_Zho",
    "Gemini_API_S_Zho": "㊙️Gemini_Zho",
    "Gemini_API_S_Vsion_ImgURL_Zho": "㊙️Gemini_ImgURL_Zho",
    "Gemini_API_S_Chat_Zho": "㊙️Gemini_Chat_Zho",
    "Gemini_15P_API_S_Advance_Zho": "🆕Gemini_15P_Advance_Zho",
    "Gemini_15P_API_S_Chat_Advance_Zho": "🆕Gemini_15P_Chat_Advance_Zho",
    "Gemini_FileUpload_API_S_Zho": "📄Gemini_FileUpload_Zho",
    "Gemini_File_API_S_Zho": "📄Gemini_File_Zho",
    "ConcatText_Zho": "✨ConcatText_Zho",
    "DisplayText_Zho": "✨DisplayText_Zho"
}