Spaces:
Running
Running
File size: 10,972 Bytes
2367901 711583c 963ca45 2367901 b0229d6 2367901 04b10c1 2367901 04b10c1 2367901 04b10c1 2367901 963ca45 711583c 2367901 711583c 2367901 711583c 2367901 04b10c1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# Copyright (c) Microsoft
# Modified from Visual ChatGPT Project https://github.com/microsoft/TaskMatrix/blob/main/visual_chatgpt.py
import os
import gradio as gr
import re
import uuid
from PIL import Image, ImageDraw, ImageOps
import numpy as np
import argparse
import inspect
from langchain.agents.initialize import initialize_agent
from langchain.agents.tools import Tool
from langchain.chains.conversation.memory import ConversationBufferMemory
from langchain_community.chat_models import ChatOpenAI
import torch
from PIL import Image, ImageDraw, ImageOps
from transformers import pipeline, BlipProcessor, BlipForConditionalGeneration, BlipForQuestionAnswering
# openai.api_version = '2020-11-07'
# Export the API version through the process environment at import time.
# NOTE(review): presumably read by the OpenAI client used via ChatOpenAI — confirm.
os.environ["OPENAI_API_VERSION"] = '2020-11-07'
# Agent prompt prefix (passed as `prefix` in ConversationBot's agent_kwargs):
# sets the assistant persona to an art connoisseur.
VISUAL_CHATGPT_PREFIX = """
I want you to act as an art connoisseur, providing in-depth and insightful analysis on various artworks. Your responses should reflect a deep understanding of art history, techniques, and cultural contexts, offering users a rich and nuanced perspective.
You can engage in natural-sounding conversations, generate human-like text based on input, and provide relevant, coherent responses on art-related topics."""
# The tool-listing tail below is commented out, so it is NOT part of the prefix string:
# TOOLS:
# ------
# Visual ChatGPT has access to the following tools:"""
# Format instructions (passed as `format_instructions`); contains the
# `{tool_names}` and `{ai_prefix}` placeholders.
VISUAL_CHATGPT_FORMAT_INSTRUCTIONS = """To use a tool, please use the following format:
"Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [{tool_names}], remember the action must to be one tool
Action Input: the input to the action
Observation: the result of the action"
When you have a response to say to the Human, or if you do not need to use a tool, you MUST use the format:
"Thought: Do I need to use a tool? No
{ai_prefix}: [your response here]"
"""
# Prompt suffix (passed as `suffix`); contains the `{chat_history}`, `{input}`
# and `{agent_scratchpad}` placeholders.
VISUAL_CHATGPT_SUFFIX = """
Begin Chatting!
Previous conversation history:
{chat_history}
New input: {input}
As a language model, you must repeatly to use VQA tools to observe images. You response should be consistent with the outputs of the VQA tool instead of imagination. Do not repeat asking the same question.
Thought: Do I need to use a tool? {agent_scratchpad} (You are strictly to use the aforementioned "Thought/Action/Action Input/Observation" format as the answer.)"""
# Create the working image folder at import time; it is also the default
# `folder` of get_new_image_name().
os.makedirs('chat_image', exist_ok=True)
def prompts(name, description):
    """Return a decorator that tags a callable with tool metadata.

    The decorated callable is returned unchanged apart from two new
    attributes, ``name`` and ``description``, which ``build_chatbot_tools``
    reads when it wraps the callable in a ``Tool``.

    Args:
        name: Tool name to store on the callable.
        description: Tool description to store on the callable.

    Returns:
        A decorator that sets both attributes and returns the same callable.
    """
    metadata = (("name", name), ("description", description))

    def decorator(func):
        for attribute, value in metadata:
            setattr(func, attribute, value)
        return func

    return decorator
def cut_dialogue_history(history_memory, keep_last_n_words=500):
    """Drop the oldest lines of a dialogue until it is under a word budget.

    Words are whitespace-separated tokens. If the history already holds fewer
    than ``keep_last_n_words`` words it is returned unchanged; otherwise whole
    lines are removed from the front until the remaining word count is below
    the budget (or no lines are left).

    Args:
        history_memory: Dialogue text with one utterance per line, or ``None``.
        keep_last_n_words: Exclusive upper bound on the words to keep.

    Returns:
        ``history_memory`` unchanged when it is ``None``, empty or already
        short enough; otherwise ``'\\n'`` followed by the retained lines.
    """
    if not history_memory:
        return history_memory
    n_tokens = len(history_memory.split())
    print(f"history_memory:{history_memory}, n_tokens: {n_tokens}")
    if n_tokens < keep_last_n_words:
        return history_memory
    paragraphs = history_memory.split('\n')
    remaining = n_tokens
    dropped = 0
    # Count each line with the same tokenisation as the total (str.split()),
    # so repeated spaces or blank lines do not over-subtract, and stop once
    # every line is gone instead of indexing past the end.
    while dropped < len(paragraphs) and remaining >= keep_last_n_words:
        remaining -= len(paragraphs[dropped].split())
        dropped += 1
    return '\n' + '\n'.join(paragraphs[dropped:])
def get_new_image_name(folder='chat_image', func_name="update"):
    """Build a fresh PNG path of the form ``<folder>/<func_name>_<id>.png``.

    Args:
        folder: Directory component of the returned path.
        func_name: Prefix placed before the random identifier.

    Returns:
        The joined path; ``<id>`` is the first 8 hex characters of a random
        UUID4. Nothing is created on disk.
    """
    # The first 8 characters of a UUID4 are hex digits in both its str() and
    # .hex forms, so this is the same identifier as str(uuid.uuid4())[:8].
    short_id = uuid.uuid4().hex[:8]
    return os.path.join(folder, f'{func_name}_{short_id}.png')
class VisualQuestionAnswering:
    """Visual question answering tool backed by ``Salesforce/blip-vqa-base``."""

    def __init__(self, device):
        """Load the BLIP VQA processor and model onto ``device``.

        Args:
            device: Torch device string. The model is loaded in float16 when
                the string contains ``'cuda'`` and in float32 otherwise.
        """
        print(f"Initializing VisualQuestionAnswering to {device}")
        self.torch_dtype = torch.float16 if 'cuda' in device else torch.float32
        self.device = device
        # Alternative checkpoint kept for reference: "Salesforce/blip-vqa-capfilt-large".
        self.processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
        self.model = BlipForQuestionAnswering.from_pretrained(
            "Salesforce/blip-vqa-base", torch_dtype=self.torch_dtype).to(self.device)

    @prompts(name="Answer Question About The Image",
             description="VQA tool is useful when you need an answer for a question based on an image. "
                         "like: what is the color of an object, how many cats in this figure, where is the child sitting, what does the cat doing, why is he laughing."
                         "The input to this tool should be a comma separated string of two, representing the image path and the question.")
    def inference(self, inputs):
        """Answer a question about an image.

        Args:
            inputs: ``"<image path>,<question>"``. Only the first comma
                separates the two parts, so the question may contain commas.

        Returns:
            The decoded answer string generated by the model.
        """
        # The agent usually writes "path, question"; strip the surrounding
        # whitespace so neither the file lookup nor the question carries it.
        image_path, _, question = inputs.partition(",")
        image_path, question = image_path.strip(), question.strip()
        # Close the file handle as soon as the RGB copy has been made.
        with Image.open(image_path) as image_file:
            raw_image = image_file.convert('RGB')
        model_inputs = self.processor(raw_image, question, return_tensors="pt").to(self.device, self.torch_dtype)
        out = self.model.generate(**model_inputs)
        answer = self.processor.decode(out[0], skip_special_tokens=True)
        print(f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input Question: {question}, "
              f"Output Answer: {answer}")
        return answer
def build_chatbot_tools(load_dict):
    """Instantiate the requested models and expose their inference methods as tools.

    Args:
        load_dict: Mapping of class name (looked up in this module's globals)
            to the device string passed to that class as ``device=``.

    Returns:
        A list with one ``Tool`` per attribute whose name starts with
        ``inference`` on every instantiated model, built from the attribute's
        ``name`` and ``description``.
    """
    print(f"Initializing ChatBot, load_dict={load_dict}")
    namespace = globals()

    # Basic foundation models: one instance per requested class.
    models = {}
    for requested_name, target_device in load_dict.items():
        models[requested_name] = namespace[requested_name](device=target_device)

    # Template models: any module-level object flagged with `template_model`
    # whose constructor parameters are all names of already loaded classes.
    for candidate_name, candidate in namespace.items():
        if not getattr(candidate, 'template_model', False):
            continue
        required = {
            param for param in inspect.signature(candidate.__init__).parameters.keys()
            if param != 'self'
        }
        available = {type(loaded).__name__ for loaded in models.values()}
        if required.issubset(available):
            dependencies = {dep: models[dep] for dep in required}
            models[candidate_name] = namespace[candidate_name](**dependencies)

    tools = []
    for model in models.values():
        for attribute in filter(lambda attr: attr.startswith('inference'), dir(model)):
            method = getattr(model, attribute)
            tools.append(Tool(name=method.name, description=method.description, func=method))
    return tools
class ConversationBot:
    """Chat front end around a LangChain ``conversational-react-description`` agent.

    Attributes set in ``__init__``:
        llm: The ``ChatOpenAI`` model driving the agent.
        memory: ``ConversationBufferMemory`` shared with the agent.
        tools: The tools handed to the agent.
        point_prompt: Pending text that ``run_text`` records into memory as a
            Human turn (answered with ``Ok``) before the next user message.
        current_image, global_prompt: Initialised here; not read by the
            methods of this class.
    """

    def __init__(self, tools, api_key=""):
        """Create the LLM, the memory and the agent.

        Args:
            tools: Tools the agent may call.
            api_key: OpenAI API key forwarded to ``ChatOpenAI``.
        """
        # Report only whether a key was supplied; never write the secret to logs.
        print(f"chatbot api key provided: {bool(api_key)}")
        llm = ChatOpenAI(model_name="gpt-4o", temperature=0.7, openai_api_key=api_key)
        self.llm = llm
        self.memory = ConversationBufferMemory(memory_key="chat_history", output_key='output')
        self.tools = tools
        self.current_image = None
        self.point_prompt = ""
        self.global_prompt = ""
        self.agent = initialize_agent(
            self.tools,
            self.llm,
            agent="conversational-react-description",
            verbose=True,
            memory=self.memory,
            return_intermediate_steps=True,
            agent_kwargs={'prefix': VISUAL_CHATGPT_PREFIX, 'format_instructions': VISUAL_CHATGPT_FORMAT_INSTRUCTIONS,
                          'suffix': VISUAL_CHATGPT_SUFFIX}, )

    def constructe_intermediate_steps(self, agent_res):
        """Render the agent's intermediate steps as chat rows.

        Args:
            agent_res: Iterable of ``(action, output)`` pairs.

        Returns:
            A list of two-item lists. Actions that have a ``tool_input``
            attribute become a Thought/Action/Action Input text paired with
            the Observation; anything else becomes a "no tool" Thought paired
            with the AI output. Newlines are replaced by ``<br>``.
        """
        ans = []
        for action, output in agent_res:
            if hasattr(action, "tool_input"):
                act = (f"Thought: Do I need to use a tool? Yes\nAction: {action.tool}\nAction Input: {action.tool_input}",
                       f"Observation: {output}")
            else:
                act = ("Thought: Do I need to use a tool? No", f"AI: {output}")
            ans.append([part.replace('\n', '<br>') for part in act])
        return ans

    def _trim_memory(self, keep_last_n_words=500):
        """Drop the oldest exchanges until the stored history is under the word budget."""
        messages = list(self.memory.chat_memory.messages)
        word_counts = [len(str(message.content).split()) for message in messages]
        total_words = sum(word_counts)
        start = 0
        # save_context stores one input and one output message per turn, so
        # remove two messages at a time to keep the turns paired.
        while start < len(messages) and total_words >= keep_last_n_words:
            total_words -= sum(word_counts[start:start + 2])
            start += 2
        self.memory.chat_memory.messages = messages[start:]

    def run_text(self, text, state, aux_state):
        """Send one user message to the agent and update both chat windows.

        Args:
            text: The user's message.
            state: Main chat history as a list of ``(user, bot)`` pairs.
            aux_state: Auxiliary-window history (user inputs and agent steps).

        Returns:
            ``(state, state, aux_state, aux_state)`` with the new rows appended.
        """
        # Keep the newest turns: trim from the front of the message list
        # rather than slicing it by a word count.
        self._trim_memory(keep_last_n_words=500)
        if self.point_prompt != "":
            human_prompt = f'\nHuman: {self.point_prompt}\n'
            self.agent.memory.save_context({'input': human_prompt}, {'output': 'Ok'})
            self.point_prompt = ""
        res = self.agent({"input": text})
        res['output'] = res['output'].replace("\\", "/")
        # Show generated images inline, followed by their path in italics.
        # NOTE(review): the markdown image prefix was reconstructed as
        # `![](file=...)`; confirm against the deployed Gradio version.
        response = re.sub(r'(chat_image/\S*png)',
                          lambda m: f'![](file={m.group(0)})*{m.group(0)}*',
                          res['output'])
        state = state + [(text, response)]
        aux_state = aux_state + [(f"User Input: {text}", None)]
        aux_state = aux_state + self.constructe_intermediate_steps(res['intermediate_steps'])
        print(f"\nProcessed run_text, Input text: {text}\nCurrent state: {state}\n"
              f"Current Memory: {self.agent.memory.buffer}\n"
              f"Aux state: {aux_state}\n"
              )
        return state, state, aux_state, aux_state
# Script entry point: load the requested models and serve a two-pane Gradio demo.
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # --load is a comma separated list of "<ClassName>_<device>" entries.
    parser.add_argument('--load', type=str, default="VisualQuestionAnswering_cuda:0")
    parser.add_argument('--port', type=int, default=1015)
    args = parser.parse_args()
    load_dict = {e.split('_')[0].strip(): e.split('_')[1].strip() for e in args.load.split(',')}
    tools = build_chatbot_tools(load_dict)
    bot = ConversationBot(tools)
    # ConversationBot does not define run_image in this file; look it up
    # defensively so building the UI does not raise AttributeError.
    run_image = getattr(bot, "run_image", None)
    with gr.Blocks(css="#chatbot .overflow-y-auto{height:500px}") as demo:
        with gr.Row():
            chatbot = gr.Chatbot(elem_id="chatbot", label="CATchat").style(height=1000,scale=0.5)
            auxwindow = gr.Chatbot(elem_id="chatbot", label="Aux Window").style(height=1000,scale=0.5)
        state = gr.State([])
        aux_state = gr.State([])
        with gr.Row():
            with gr.Column(scale=0.7):
                txt = gr.Textbox(show_label=False, placeholder="Enter text and press enter, or upload an image").style(
                    container=False)
            with gr.Column(scale=0.15, min_width=0):
                clear = gr.Button("Clear")
            with gr.Column(scale=0.15, min_width=0):
                # Hide the upload button when there is no handler to serve it.
                btn = gr.UploadButton("Upload", file_types=["image"], visible=run_image is not None)
        txt.submit(bot.run_text, [txt, state, aux_state], [chatbot, state, aux_state, auxwindow])
        txt.submit(lambda: "", None, txt)
        if run_image is not None:
            btn.upload(run_image, [btn, state, txt, aux_state], [chatbot, state, txt, aux_state, auxwindow])
        # "Clear" resets the agent memory and both chat panes with their states.
        clear.click(bot.memory.clear)
        clear.click(lambda: [], None, chatbot)
        clear.click(lambda: [], None, auxwindow)
        clear.click(lambda: [], None, state)
        clear.click(lambda: [], None, aux_state)
    demo.launch(server_name="0.0.0.0", server_port=args.port, share=True)
|