Spaces:
Paused
Paused
File size: 9,023 Bytes
e9d377a 93bc402 e9d377a def4fb5 e9d377a 1b5bd7b e9d377a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
import json
import os
import magic
from dotenv import load_dotenv
from docx import Document
from docx.shared import Inches
from docx.enum.text import WD_ALIGN_PARAGRAPH
from flask_cors import CORS
from flask import Flask, request, jsonify
from supabase import create_client, Client
# load_dotenv(dotenv_path='.env.local')
load_dotenv()
app = Flask(__name__)
CORS(app)
url: str = 'https://dtzuqtvroalrjhgdcowq.supabase.co/'
key: str = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImR0enVxdHZyb2FscmpoZ2Rjb3dxIiwicm9sZSI6ImFub24iLCJpYXQiOjE3MjU0NDk3MzIsImV4cCI6MjA0MTAyNTczMn0.WrIvwEOq4CqCb8IkU8G4jiWkf9DM1JxGd2_aTN4vlV4'
supabase: Client = create_client(url, key)
def get_file_by_id(file_id):
try:
response = supabase.table("files").select("*").eq("id", file_id).single().execute()
file = response.data
if not file:
raise ValueError(response.error.message if response.error else "File not found.")
file_path = file.get("file_path")
file_name = file.get("name")
if not file_path:
raise ValueError("File path is missing in the metadata.")
# Fetch the actual file content from Supabase storage
file_data = supabase.storage.from_('files').download(file_path)
return file_name, file_data
except Exception as e:
print("Error fetching file:", e)
return jsonify({"error": str(e)}), 500
def get_file_type(file_path):
try:
# Use python-magic to detect the MIME type of the file
mime = magic.Magic(mime=True)
file_type = mime.from_file(file_path)
return file_type
except Exception as e:
print("Error fetching file:", e)
return jsonify({"error": str(e)}), 500
def insert_file_record(user_id, doc):
try:
file_type = get_file_type(doc)
file_record = {
"user_id": user_id,
"description": "",
"file_path": "",
"name": "letterhead-" + os.path.basename(doc),
"size": os.path.getsize(doc),
"tokens": 0,
"type": file_type,
}
response = supabase.table("files").insert(file_record).execute()
return response
except Exception as e:
print("Error fetching file:", e)
return jsonify({"error": str(e)}), 500
def upload_file_to_storage(file, metadata):
# Replace with the actual upload implementation
file_path = f"{metadata['user_id']}/{metadata['file_id']}"
# file_content = file.read() # Read the file content as bytes
file_type = get_file_type(file)
try:
with open(file, 'rb') as f:
response = supabase.storage.from_("files").upload(
file=f,
path=file_path,
file_options={"cache-control": "3600", "content-type": file_type, "upsert": "false"},
)
file_path = response.path
return file_path
except Exception as e:
print("Error uploading file:", e)
return jsonify({"error": str(e)}), 500
def update_file_record(file_id, updates):
try:
response = supabase.table("files").update(updates).eq("id", file_id).execute()
return response
except Exception as e:
print("Error while updating record:", e)
return jsonify({"error": str(e)}), 500
def insert_text_and_image_at_end(full_path, full_image_path, text_to_insert, include_signature, signature_position, letterhead_address):
try:
doc = Document(full_path)
# Replace placeholder <<SENDER_ADDRESS>> with the letterhead address
for paragraph in doc.paragraphs:
if ("<<SENDER_ADDRESS>>" in paragraph.text and letterhead_address):
for run in paragraph.runs:
run.text = run.text.replace("<<SENDER_ADDRESS>>", letterhead_address)
# Add the new text at the end of the document
doc.add_paragraph(text_to_insert)
# Add the image at the end of the document with position adjustment
if (include_signature and full_image_path):
image_paragraph = doc.add_paragraph()
run = image_paragraph.add_run()
run.add_picture(full_image_path, width=Inches(1), height=Inches(1))
# Adjust the alignment based on signature_position
if signature_position == 'left':
image_paragraph.alignment = WD_ALIGN_PARAGRAPH.LEFT
elif signature_position == 'right':
image_paragraph.alignment = WD_ALIGN_PARAGRAPH.RIGHT
elif signature_position == 'center':
image_paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
# Save the document with the inserted text and image
doc.save(full_path)
return full_path
except Exception as e:
print("Error while inserting text:", e)
return jsonify({"error": str(e)}), 500
def fetch_image(bucket_name: str, image_path: str):
try:
# Download file from Supabase storage
file_data = supabase.storage.from_(bucket_name).download(image_path)
# Use python-magic to detect MIME type from the file data
mime_type = magic.Magic(mime=True).from_buffer(file_data)
current_directory = os.path.dirname(os.path.abspath(__file__))
os.makedirs('letterhead', exist_ok=True)
letterhead_image_path = image_path.split('/')[-1] + "." + mime_type.split('/')[-1]
full_letterhead_image_path = os.path.join(current_directory, "letterhead", letterhead_image_path)
with open(full_letterhead_image_path, 'wb') as f:
f.write(file_data)
return full_letterhead_image_path
except Exception as e:
print(f"Error: {e}")
return jsonify({"error": str(e)}), 500
def delete_all_files(directory):
keep_file = "WARNING-DO-NOT-DELETE.txt"
try:
# Loop through each file in the directory
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
# Check if it's a file and not the one to keep
if os.path.isfile(file_path) and filename != keep_file:
os.remove(file_path)
except Exception as e:
print(f"An error occurred: {e}")
return jsonify({"error": str(e)}), 500
@app.route("/api/letterhead", methods=["POST"])
def letterhead():
data = request.get_json() # Get JSON data from the request (if provided)
# Extract data
chat_settings = data.get("chatSettings")
profile = data.get("profile")
letterhead_data = data.get("letterheadData")
try:
file_name, file_data = get_file_by_id(chat_settings["letterheadFileId"])
current_directory = os.getcwd()
full_letterhead_file_path = os.path.join(current_directory, "letterhead", file_name)
full_letterhead_signature_path = None
if (letterhead_data["includeSignature"] and (chat_settings["letterheadSignatureImagePath"])):
full_letterhead_signature_path = fetch_image("assistant_images", (chat_settings["letterheadSignatureImagePath"]))
text_to_insert = letterhead_data["letterheadContent"]
include_signature = letterhead_data["includeSignature"]
signature_position = letterhead_data["signaturePosition"]
letterhead_address = letterhead_data["letterheadAddress"]
with open(full_letterhead_file_path, "wb") as f:
if hasattr(file_data, "read"):
f.write(file_data.read())
else: # If it's raw bytes
f.write(file_data)
modified_doc = insert_text_and_image_at_end(full_letterhead_file_path, full_letterhead_signature_path, text_to_insert, include_signature,signature_position, letterhead_address)
created_file = insert_file_record(profile["user_id"], modified_doc)
file_data = created_file.json()
file_data_json = json.loads(file_data)
file_path = upload_file_to_storage(modified_doc, {
"name": file_data_json["data"][0]["name"],
"user_id": file_data_json["data"][0]["user_id"],
"file_id": file_data_json["data"][0]["id"],
})
update_file_record(file_data_json["data"][0]["id"], {"file_path": file_path})
current_directory = os.getcwd()
file_deleting_directory_path = os.path.join(current_directory, "letterhead")
delete_all_files(file_deleting_directory_path)
message = f"letterheadFileId:{file_data_json['data'][0]['id']} Your letterhead is successfully created."
return jsonify({ "message": message }), 200
except ValueError as e:
return jsonify({"error": str(e)}), 404
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
app.run(debug=True) |