Spaces:
Running
Running
from io import BytesIO | |
import base64 | |
from PIL import Image | |
from flask import jsonify | |
import logging | |
import json | |
import re | |
logger = logging.getLogger(__name__) | |
def authenticate_request(password, request): | |
auth_header = request.headers.get('Authorization') | |
if not auth_header: | |
return False, jsonify({'error': 'Authorization header is missing'}), 401 | |
try: | |
auth_type, pass_word = auth_header.split(' ', 1) | |
except ValueError: | |
return False, jsonify({'error': 'Invalid Authorization header format'}), 401 | |
if auth_type.lower() != 'bearer': | |
return False, jsonify({'error': 'Authorization type must be Bearer'}), 401 | |
if pass_word != password: | |
return False, jsonify({'error': 'Unauthorized'}), 401 | |
return True, None, None | |
def process_messages_for_gemini(messages): | |
gemini_history = [] | |
for message in messages: | |
role = message.get('role') | |
content = message.get('content') | |
if isinstance(content, str): # 纯文本 | |
if role == 'system': | |
gemini_history.append({"role": "user", "parts": [content]}) | |
elif role == 'user': | |
gemini_history.append({"role": "user", "parts": [content]}) | |
elif role == 'assistant': | |
gemini_history.append({"role": "model", "parts": [content]}) | |
elif isinstance(content, list): # 图文 | |
parts = [] | |
for item in content: | |
if item.get('type') == 'text': | |
parts.append(item.get('text')) | |
elif item.get('type') == 'image_url': | |
image_data = item.get('image_url', {}).get('url', '') | |
if image_data.startswith('data:image/'): | |
try: | |
# 提取 base64 编码和图片类型 | |
image_type = image_data.split(';')[0].split('/')[1].upper() # 提取图片类型并转为大写 | |
base64_image = image_data.split(';base64,')[1] | |
image = Image.open(BytesIO(base64.b64decode(base64_image))) | |
# 将图片转换为 RGB 模式 | |
if image.mode != 'RGB': | |
image = image.convert('RGB') | |
# 压缩图像 | |
if image.width > 2048 or image.height > 2048: | |
image.thumbnail((2048, 2048)) | |
output_buffer = BytesIO() | |
image.save(output_buffer, format=image_type) # 使用原始图片类型保存 | |
output_buffer.seek(0) | |
parts.append(image) | |
except Exception as e: | |
logger.error(f"Error processing image: {e}") | |
return [], None, (jsonify({'error': 'Invalid image data'}), 400) | |
else: | |
return [], None, (jsonify({'error': 'Invalid image URL format'}), 400) | |
# 根据 role 添加到 gemini_history | |
if role in ['user', 'system']: | |
gemini_history.append({"role": "user", "parts": parts}) | |
elif role == 'assistant': | |
gemini_history.append({"role": "model", "parts": parts}) | |
else: | |
return [], None, (jsonify({'error': f'Invalid role: {role}'}), 400) | |
# 用户最后一条消息 | |
if gemini_history: | |
user_message = gemini_history[-1] | |
gemini_history = gemini_history[:-1] # 历史记录不包含最后一条消息 | |
else: | |
user_message = {"role": "user", "parts": [""]} | |
return gemini_history, user_message, None |