"""Streamlit chat app: DeepSeek-R1 via Together, with Llama Vision for image uploads."""

import base64
import os
from typing import Iterator

import streamlit as st
from PyPDF2 import PdfReader
from together import Together

API_KEY = os.getenv("TOGETHER_API_KEY")
if not API_KEY:
    raise ValueError("API key is missing! Make sure TOGETHER_API_KEY is set in the Secrets.")


@st.cache_resource
def get_client() -> Together:
    """Return the shared Together API client."""
    return Together(api_key=API_KEY)
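
# st.cache_resource caches the client across reruns and sessions, so every
# interaction reuses one Together client rather than constructing a new one.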


def process_file(file) -> str:
    """Process an uploaded file and return its content.

    PDFs are reduced to their extracted text, images to a base64 string
    (consumed later as a data URL by the vision model), and anything else
    is decoded as UTF-8 text.
    """
    if file is None:
        return ""

    try:
        if file.type == "application/pdf":
            text = ""
            pdf_reader = PdfReader(file)
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
            return text
        elif file.type.startswith("image/"):
            # Raw image bytes are base64-encoded here, not decoded or OCR'd.
            return base64.b64encode(file.getvalue()).decode("utf-8")
        else:
            return file.getvalue().decode("utf-8")
    except Exception as e:
        return f"Error processing file: {str(e)}"


def generate_response(
    message: str,
    history: list[tuple[str, str]],
    system_message: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
    files=None,
) -> Iterator[str]:
    """Stream a reply, routing to Llama Vision when an image is attached."""
    client = get_client()

    has_images = False
    content_blocks = []
    image_content = None
    image_mime_type = None

    if files:
        for file in files:
            content = process_file(file)
            if file.type.startswith("image/"):
                # Only the most recently processed image is kept.
                has_images = True
                image_content = content
                image_mime_type = file.type
            else:
                content_blocks.append({
                    "type": "text",
                    "text": f"File content:\n{content}",
                })

    messages = [{"role": "system", "content": system_message}]
    for user_msg, assistant_msg in history:
        messages.append({"role": "user", "content": user_msg})
        messages.append({"role": "assistant", "content": assistant_msg})

    try:
        if has_images:
            # Vision path: the prompt plus the image as a base64 data URL.
            # The system message and prior history are not forwarded here.
            vision_messages = [{
                "role": "user",
                "content": [
                    {"type": "text", "text": message},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{image_mime_type};base64,{image_content}",
                        },
                    },
                ],
            }]
            stream = client.chat.completions.create(
                model="meta-llama/Llama-3.2-11B-Vision-Instruct-Turbo",
                messages=vision_messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                stream=True,
            )
        else:
            # Text path: the prompt plus any non-image file contents.
            current_message = {
                "role": "user",
                "content": [{"type": "text", "text": message}] + content_blocks,
            }
            messages.append(current_message)
            stream = client.chat.completions.create(
                model="deepseek-ai/DeepSeek-R1",
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                stream=True,
            )

        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    except Exception as e:
        yield f"Error: {str(e)}"


def main():
    st.set_page_config(page_title="DeepSeek Chat", page_icon="🐳", layout="wide")

    if "messages" not in st.session_state:
        st.session_state.messages = []

    st.title("DeepSeek/Llama Vision Chat with File Upload")
    st.markdown("Chat with the DeepSeek AI model. You can optionally upload files for the model to analyze.")
    st.markdown("Feel free to upload images too; in that case Llama Vision will be used.")

    with st.sidebar:
        st.header("Settings")
        system_message = st.text_area(
            "System Message",
            value="You are a friendly Chatbot.",
            height=100,
        )
        max_tokens = st.slider(
            "Max Tokens",
            min_value=1,
            max_value=8192,
            value=8192,
            step=1,
        )
        temperature = st.slider(
            "Temperature",
            min_value=0.0,
            max_value=4.0,
            value=0.0,
            step=0.1,
        )
        top_p = st.slider(
            "Top-p (nucleus sampling)",
            min_value=0.1,
            max_value=1.0,
            value=0.95,
            step=0.05,
        )
        # Returns a list of UploadedFile objects because accept_multiple_files=True.
        uploaded_files = st.file_uploader(
            "Upload File (optional)",
            type=['txt', 'py', 'md', 'swift', 'java', 'js', 'ts', 'rb', 'go',
                  'php', 'c', 'cpp', 'h', 'hpp', 'cs', 'html', 'css', 'kt', 'svelte',
                  'pdf', 'png', 'jpg', 'jpeg'],
            accept_multiple_files=True,
        )

    # Replay the conversation so far.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])

    if prompt := st.chat_input("What would you like to know?"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.write(prompt)

        with st.chat_message("assistant"):
            response_placeholder = st.empty()
            full_response = ""

            # Pair alternating user/assistant turns into (user, assistant) tuples;
            # zip truncates, dropping the just-appended, not-yet-answered prompt.
            history = [(msg["content"], next_msg["content"])
                       for msg, next_msg in zip(st.session_state.messages[::2],
                                                st.session_state.messages[1::2])]

            for response_chunk in generate_response(
                prompt,
                history,
                system_message,
                max_tokens,
                temperature,
                top_p,
                uploaded_files,
            ):
                full_response += response_chunk
                response_placeholder.markdown(full_response + "▌")

            response_placeholder.markdown(full_response)

        st.session_state.messages.append({"role": "assistant", "content": full_response})


if __name__ == "__main__":
    main()