|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import base64 |
|
from io import BytesIO |
|
|
|
import gradio as gr |
|
import torch |
|
from app_modules.gradio_utils import ( |
|
cancel_outputing, |
|
delete_last_conversation, |
|
reset_state, |
|
reset_textbox, |
|
transfer_input, |
|
wrap_gen_fn, |
|
) |
|
from app_modules.overwrites import reload_javascript |
|
from app_modules.presets import CONCURRENT_COUNT, description, description_top, title |
|
from app_modules.utils import configure_logger, is_variable_assigned, strip_stop_words |
|
|
|
from deepseek_vl.serve.inference import ( |
|
convert_conversation_to_prompts, |
|
deepseek_generate, |
|
load_model, |
|
) |
|
from deepseek_vl.utils.conversation import SeparatorStyle |
|
|
|
|
|
def load_models(): |
|
models = { |
|
"DeepSeek-VL 7B": "deepseek-ai/deepseek-vl-7b-chat", |
|
} |
|
|
|
for model_name in models: |
|
models[model_name] = load_model(models[model_name]) |
|
|
|
return models |
|
|
|
|
|
logger = configure_logger() |
|
models = load_models() |
|
MODELS = sorted(list(models.keys())) |
|
|
|
|
|
def generate_prompt_with_history( |
|
text, image, history, vl_chat_processor, tokenizer, max_length=2048 |
|
): |
|
""" |
|
Generate a prompt with history for the deepseek application. |
|
|
|
Args: |
|
text (str): The text prompt. |
|
image (str): The image prompt. |
|
history (list): List of previous conversation messages. |
|
tokenizer: The tokenizer used for encoding the prompt. |
|
max_length (int): The maximum length of the prompt. |
|
|
|
Returns: |
|
tuple: A tuple containing the generated prompt, image list, conversation, and conversation copy. If the prompt could not be generated within the max_length limit, returns None. |
|
""" |
|
|
|
sft_format = "deepseek" |
|
user_role_ind = 0 |
|
bot_role_ind = 1 |
|
|
|
|
|
conversation = vl_chat_processor.new_chat_template() |
|
|
|
if history: |
|
conversation.messages = history |
|
|
|
if image is not None: |
|
if "<image_placeholder>" not in text: |
|
text = ( |
|
"<image_placeholder>" + "\n" + text |
|
) |
|
text = (text, image) |
|
|
|
conversation.append_message(conversation.roles[user_role_ind], text) |
|
conversation.append_message(conversation.roles[bot_role_ind], "") |
|
|
|
|
|
conversation_copy = conversation.copy() |
|
logger.info("=" * 80) |
|
logger.info(get_prompt(conversation)) |
|
|
|
rounds = len(conversation.messages) // 2 |
|
|
|
for _ in range(rounds): |
|
current_prompt = get_prompt(conversation) |
|
current_prompt = ( |
|
current_prompt.replace("</s>", "") |
|
if sft_format == "deepseek" |
|
else current_prompt |
|
) |
|
|
|
if torch.tensor(tokenizer.encode(current_prompt)).size(-1) <= max_length: |
|
return conversation_copy |
|
|
|
if len(conversation.messages) % 2 != 0: |
|
gr.Error("The messages between user and assistant are not paired.") |
|
return |
|
|
|
try: |
|
for _ in range(2): |
|
conversation.messages.pop(0) |
|
except IndexError: |
|
gr.Error("Input text processing failed, unable to respond in this round.") |
|
return None |
|
|
|
gr.Error("Prompt could not be generated within max_length limit.") |
|
return None |
|
|
|
|
|
def to_gradio_chatbot(conv): |
|
"""Convert the conversation to gradio chatbot format.""" |
|
ret = [] |
|
for i, (role, msg) in enumerate(conv.messages[conv.offset :]): |
|
if i % 2 == 0: |
|
if type(msg) is tuple: |
|
msg, image = msg |
|
if isinstance(image, str): |
|
with open(image, "rb") as f: |
|
data = f.read() |
|
img_b64_str = base64.b64encode(data).decode() |
|
image_str = f'<video src="data:video/mp4;base64,{img_b64_str}" controls width="426" height="240"></video>' |
|
msg = msg.replace("\n".join(["<image_placeholder>"] * 4), image_str) |
|
else: |
|
max_hw, min_hw = max(image.size), min(image.size) |
|
aspect_ratio = max_hw / min_hw |
|
max_len, min_len = 800, 400 |
|
shortest_edge = int(min(max_len / aspect_ratio, min_len, min_hw)) |
|
longest_edge = int(shortest_edge * aspect_ratio) |
|
W, H = image.size |
|
if H > W: |
|
H, W = longest_edge, shortest_edge |
|
else: |
|
H, W = shortest_edge, longest_edge |
|
image = image.resize((W, H)) |
|
buffered = BytesIO() |
|
image.save(buffered, format="JPEG") |
|
img_b64_str = base64.b64encode(buffered.getvalue()).decode() |
|
img_str = f'<img src="data:image/png;base64,{img_b64_str}" alt="user upload image" />' |
|
msg = msg.replace("<image_placeholder>", img_str) |
|
ret.append([msg, None]) |
|
else: |
|
ret[-1][-1] = msg |
|
return ret |
|
|
|
|
|
def to_gradio_history(conv): |
|
"""Convert the conversation to gradio history state.""" |
|
return conv.messages[conv.offset :] |
|
|
|
|
|
def get_prompt(conv) -> str: |
|
"""Get the prompt for generation.""" |
|
system_prompt = conv.system_template.format(system_message=conv.system_message) |
|
if conv.sep_style == SeparatorStyle.DeepSeek: |
|
seps = [conv.sep, conv.sep2] |
|
if system_prompt == "" or system_prompt is None: |
|
ret = "" |
|
else: |
|
ret = system_prompt + seps[0] |
|
for i, (role, message) in enumerate(conv.messages): |
|
if message: |
|
if type(message) is tuple: |
|
message, _ = message |
|
ret += role + ": " + message + seps[i % 2] |
|
else: |
|
ret += role + ":" |
|
return ret |
|
else: |
|
return conv.get_prompt |
|
|
|
|
|
@wrap_gen_fn |
|
def predict( |
|
text, |
|
image, |
|
chatbot, |
|
history, |
|
top_p, |
|
temperature, |
|
repetition_penalty, |
|
max_length_tokens, |
|
max_context_length_tokens, |
|
model_select_dropdown, |
|
): |
|
""" |
|
Function to predict the response based on the user's input and selected model. |
|
|
|
Parameters: |
|
user_text (str): The input text from the user. |
|
user_image (str): The input image from the user. |
|
chatbot (str): The chatbot's name. |
|
history (str): The history of the chat. |
|
top_p (float): The top-p parameter for the model. |
|
temperature (float): The temperature parameter for the model. |
|
max_length_tokens (int): The maximum length of tokens for the model. |
|
max_context_length_tokens (int): The maximum length of context tokens for the model. |
|
model_select_dropdown (str): The selected model from the dropdown. |
|
|
|
Returns: |
|
generator: A generator that yields the chatbot outputs, history, and status. |
|
""" |
|
print("running the prediction function") |
|
try: |
|
tokenizer, vl_gpt, vl_chat_processor = models[model_select_dropdown] |
|
|
|
if text == "": |
|
yield chatbot, history, "Empty context." |
|
return |
|
except KeyError: |
|
yield [[text, "No Model Found"]], [], "No Model Found" |
|
return |
|
|
|
conversation = generate_prompt_with_history( |
|
text, |
|
image, |
|
history, |
|
vl_chat_processor, |
|
tokenizer, |
|
max_length=max_context_length_tokens, |
|
) |
|
prompts = convert_conversation_to_prompts(conversation) |
|
|
|
stop_words = conversation.stop_str |
|
gradio_chatbot_output = to_gradio_chatbot(conversation) |
|
|
|
full_response = "" |
|
with torch.no_grad(): |
|
for x in deepseek_generate( |
|
prompts=prompts, |
|
vl_gpt=vl_gpt, |
|
vl_chat_processor=vl_chat_processor, |
|
tokenizer=tokenizer, |
|
stop_words=stop_words, |
|
max_length=max_length_tokens, |
|
temperature=temperature, |
|
repetition_penalty=repetition_penalty, |
|
top_p=top_p, |
|
): |
|
full_response += x |
|
response = strip_stop_words(full_response, stop_words) |
|
conversation.update_last_message(response) |
|
gradio_chatbot_output[-1][1] = response |
|
yield gradio_chatbot_output, to_gradio_history( |
|
conversation |
|
), "Generating..." |
|
|
|
print("flushed result to gradio") |
|
torch.cuda.empty_cache() |
|
|
|
if is_variable_assigned("x"): |
|
print(f"{model_select_dropdown}:\n{text}\n{'-' * 80}\n{x}\n{'=' * 80}") |
|
print( |
|
f"temperature: {temperature}, top_p: {top_p}, repetition_penalty: {repetition_penalty}, max_length_tokens: {max_length_tokens}" |
|
) |
|
|
|
yield gradio_chatbot_output, to_gradio_history(conversation), "Generate: Success" |
|
|
|
|
|
def retry( |
|
text, |
|
image, |
|
chatbot, |
|
history, |
|
top_p, |
|
temperature, |
|
repetition_penalty, |
|
max_length_tokens, |
|
max_context_length_tokens, |
|
model_select_dropdown, |
|
): |
|
if len(history) == 0: |
|
yield (chatbot, history, "Empty context") |
|
return |
|
|
|
chatbot.pop() |
|
history.pop() |
|
text = history.pop()[-1] |
|
if type(text) is tuple: |
|
text, image = text |
|
|
|
yield from predict( |
|
text, |
|
image, |
|
chatbot, |
|
history, |
|
top_p, |
|
temperature, |
|
repetition_penalty, |
|
max_length_tokens, |
|
max_context_length_tokens, |
|
model_select_dropdown, |
|
) |
|
|
|
|
|
def build_demo(MODELS): |
|
with open("deepseek_vl/serve/assets/custom.css", "r", encoding="utf-8") as f: |
|
customCSS = f.read() |
|
|
|
with gr.Blocks(theme=gr.themes.Soft()) as demo: |
|
history = gr.State([]) |
|
input_text = gr.State() |
|
input_image = gr.State() |
|
|
|
with gr.Row(): |
|
gr.HTML(title) |
|
status_display = gr.Markdown("Success", elem_id="status_display") |
|
gr.Markdown(description_top) |
|
|
|
with gr.Row(equal_height=True): |
|
with gr.Column(scale=4): |
|
with gr.Row(): |
|
chatbot = gr.Chatbot( |
|
elem_id="deepseek_chatbot", |
|
show_share_button=True, |
|
likeable=True, |
|
bubble_full_width=False, |
|
height=600, |
|
) |
|
with gr.Row(): |
|
with gr.Column(scale=4): |
|
text_box = gr.Textbox( |
|
show_label=False, placeholder="Enter text", container=False |
|
) |
|
with gr.Column( |
|
min_width=70, |
|
): |
|
submitBtn = gr.Button("Send") |
|
with gr.Column( |
|
min_width=70, |
|
): |
|
cancelBtn = gr.Button("Stop") |
|
with gr.Row(): |
|
emptyBtn = gr.Button( |
|
"🧹 New Conversation", |
|
) |
|
retryBtn = gr.Button("🔄 Regenerate") |
|
delLastBtn = gr.Button("🗑️ Remove Last Turn") |
|
|
|
with gr.Column(): |
|
image_box = gr.Image(type="pil") |
|
|
|
with gr.Tab(label="Parameter Setting") as parameter_row: |
|
top_p = gr.Slider( |
|
minimum=-0, |
|
maximum=1.0, |
|
value=0.95, |
|
step=0.05, |
|
interactive=True, |
|
label="Top-p", |
|
) |
|
temperature = gr.Slider( |
|
minimum=0, |
|
maximum=1.0, |
|
value=0.1, |
|
step=0.1, |
|
interactive=True, |
|
label="Temperature", |
|
) |
|
repetition_penalty = gr.Slider( |
|
minimum=0.0, |
|
maximum=2.0, |
|
value=1.1, |
|
step=0.1, |
|
interactive=True, |
|
label="Repetition penalty", |
|
) |
|
max_length_tokens = gr.Slider( |
|
minimum=0, |
|
maximum=4096, |
|
value=2048, |
|
step=8, |
|
interactive=True, |
|
label="Max Generation Tokens", |
|
) |
|
max_context_length_tokens = gr.Slider( |
|
minimum=0, |
|
maximum=4096, |
|
value=4096, |
|
step=128, |
|
interactive=True, |
|
label="Max History Tokens", |
|
) |
|
model_select_dropdown = gr.Dropdown( |
|
label="Select Models", |
|
choices=MODELS, |
|
multiselect=False, |
|
value=MODELS[0], |
|
interactive=True, |
|
) |
|
|
|
examples_list = [ |
|
[ |
|
"deepseek_vl/serve/examples/rap.jpeg", |
|
"Can you write me a master rap song that rhymes very well based on this image?", |
|
], |
|
[ |
|
"deepseek_vl/serve/examples/app.png", |
|
"What is this app about?", |
|
], |
|
[ |
|
"deepseek_vl/serve/examples/pipeline.png", |
|
"Help me write a python code based on the image.", |
|
], |
|
[ |
|
"deepseek_vl/serve/examples/chart.png", |
|
"Could you help me to re-draw this picture with python codes?", |
|
], |
|
[ |
|
"deepseek_vl/serve/examples/mirror.png", |
|
"How many people are there in the image. Why?", |
|
], |
|
[ |
|
"deepseek_vl/serve/examples/puzzle.png", |
|
"Can this 2 pieces combine together?", |
|
], |
|
] |
|
gr.Examples(examples=examples_list, inputs=[image_box, text_box]) |
|
gr.Markdown(description) |
|
|
|
input_widgets = [ |
|
input_text, |
|
input_image, |
|
chatbot, |
|
history, |
|
top_p, |
|
temperature, |
|
repetition_penalty, |
|
max_length_tokens, |
|
max_context_length_tokens, |
|
model_select_dropdown, |
|
] |
|
output_widgets = [chatbot, history, status_display] |
|
|
|
transfer_input_args = dict( |
|
fn=transfer_input, |
|
inputs=[text_box, image_box], |
|
outputs=[input_text, input_image, text_box, image_box, submitBtn], |
|
show_progress=True, |
|
) |
|
|
|
predict_args = dict( |
|
fn=predict, |
|
inputs=input_widgets, |
|
outputs=output_widgets, |
|
show_progress=True, |
|
) |
|
|
|
retry_args = dict( |
|
fn=retry, |
|
inputs=input_widgets, |
|
outputs=output_widgets, |
|
show_progress=True, |
|
) |
|
|
|
reset_args = dict( |
|
fn=reset_textbox, inputs=[], outputs=[text_box, status_display] |
|
) |
|
|
|
predict_events = [ |
|
text_box.submit(**transfer_input_args).then(**predict_args), |
|
submitBtn.click(**transfer_input_args).then(**predict_args), |
|
] |
|
|
|
emptyBtn.click(reset_state, outputs=output_widgets, show_progress=True) |
|
emptyBtn.click(**reset_args) |
|
retryBtn.click(**retry_args) |
|
|
|
delLastBtn.click( |
|
delete_last_conversation, |
|
[chatbot, history], |
|
output_widgets, |
|
show_progress=True, |
|
) |
|
|
|
cancelBtn.click(cancel_outputing, [], [status_display], cancels=predict_events) |
|
|
|
return demo |
|
|
|
|
|
if __name__ == "__main__": |
|
demo = build_demo(MODELS) |
|
demo.title = "DeepSeek-VL Chatbot" |
|
|
|
reload_javascript() |
|
demo.queue(concurrency_count=CONCURRENT_COUNT).launch( |
|
share=False, |
|
favicon_path="deepseek_vl/serve/assets/favicon.ico", |
|
inbrowser=False, |
|
server_name="0.0.0.0", |
|
server_port=8122, |
|
) |
|
|