QISU / app.py
ahalamora's picture
Upload 3 files
42dd9e2
raw
history blame
4.52 kB
import streamlit as st
import os
import shutil
import zipfile
from zipfile import ZipFile
from doc_process import doc_process
extract_folder = 'documents'
compressed_file_name = "output"
input_folder = None
# 压缩目录到zip文件
def compress_directory(directory_path, output_zip):
try:
shutil.make_archive(output_zip, 'zip', directory_path)
return True
except Exception as e:
return str(e)
def recode(raw: str) -> str:
'''
编码修正
'''
try:
return raw.encode('cp437').decode('utf-8')
except:
return raw.encode('utf-8').decode('utf-8')
def zip_extract_all(src_zip_file: ZipFile, target_path: str) -> None:
# 遍历压缩包内所有内容,创建所有目录
for file_or_path in file.namelist():
# 若当前节点是文件夹
if file_or_path.endswith('/'):
try:
# 基于当前文件夹节点创建多层文件夹
os.makedirs(os.path.join(target_path, recode(file_or_path)))
except FileExistsError:
# 若已存在则跳过创建过程
pass
# 否则视作文件进行写出
else:
pass
# 遍历压缩包内所有内容,解压文件
for file_or_path in file.namelist():
# 若当前节点是文件夹
if file_or_path.endswith('/'):
pass
# 否则视作文件进行写出
else:
# 利用shutil.copyfileobj,从压缩包io流中提取目标文件内容写出到目标路径
with open(os.path.join(target_path, recode(file_or_path)), 'wb') as z:
# 这里基于Zipfile.open()提取文件内容时需要使用原始的乱码文件名
shutil.copyfileobj(src_zip_file.open(file_or_path), z)
st.header('起诉书 & 委托书 - 自动处理程序')
with open('readme.txt', 'r') as f:
readme = f.read()
if st.toggle('显示说明文档'):
st.markdown(readme.split('[img]')[0])
st.image('docs_format.jpg')
st.markdown(readme.split('[img]')[1])
# 添加一个文件上传组件
uploaded_file = st.file_uploader("选择要上传的文件", type=["zip"])
# 如果有文件上传
if uploaded_file:
# 保存上传的ZIP文件到本地临时目录
with open("temp.zip", "wb") as f:
f.write(uploaded_file.read())
# 创建文档目录
os.makedirs(extract_folder, exist_ok=True)
# 解压ZIP文件中的文件并处理文件名和内容
with zipfile.ZipFile("temp.zip", "r") as file:
# for file_or_path in file.namelist():
# print(file_or_path, ' -------> ' , recode(file_or_path))
zip_extract_all(file, extract_folder)
# 显示解压缩完成的消息
st.success(f"ZIP文件已成功解压缩到目录 {extract_folder}")
input_folder = os.listdir(extract_folder)[0] if os.listdir(extract_folder)[0] != 'output' else os.listdir(extract_folder)[1]
# 删除临时文件
os.remove("temp.zip")
if st.button('自动处理并生成ZIP文件'):
if input_folder:
input_path = os.path.join(extract_folder, input_folder)
output_path = os.path.join(extract_folder, 'output')
result = doc_process(input_path=input_path, output_path=output_path)
else:
result = '请先上传ZIP文件'
st.markdown(result)
if input_folder:
# 在Streamlit中压缩目录
if compress_directory(os.path.join(extract_folder, 'output'), compressed_file_name):
st.success("导出目录已成功压缩为ZIP文件")
# 创建下载链接
with open(f"{compressed_file_name}.zip", "rb") as file:
st.download_button("点击此处下载ZIP文件", file.read(), f"{compressed_file_name}.zip")
else:
st.error("目录压缩失败。")
if st.button('清空输出文档', type='primary'):
try:
shutil.rmtree(extract_folder)
except Exception as e:
# st.write(f"删除文件夹 {extract_folder} 时发生错误:{str(e)}")
pass
try:
os.remove(f"{compressed_file_name}.zip")
except Exception as e:
# st.write(f"删除文件 f'{compressed_file_name}.zip' 时发生错误:{str(e)}")
pass
if extract_folder in os.listdir() or f"{compressed_file_name}.zip" in os.listdir():
st.markdown(':red[完成任务后请点击“清空输出文档”]')
else:
st.markdown('所有输出文档已清空')