from datetime import datetime from fastapi import FastAPI import cv2 from utils.load_input import * from utils.operate_csv import * from utils.operate_json import * from utils.operate_s3 import * from algs.alg0.utils.mama_utils import * from utils.operate_tx import * from segment import * from eval_fashion_person import * from utils.app import * from name_entity_recognition import * app = FastAPI() # 从模型结果中注册用户 def register_user(out, is_csv=False, is_app=False): for box in out[0].boxes: # 根据框截取人物图像 person_img = get_one_person(box.xyxy, out[0].orig_img) # 获取图片名称 img_name = get_one_name() # 将图片放到腾讯云 save_tx(file_data=person_img, file_name=img_name) # 获取评分 data = eval_fashion_person_tx_url(img_name) # 进行被动用户注册 if is_csv: append_to_csv(data) if is_app: app_register_user(data) print(f'人物{data["username"]}注册完成时间{datetime.now()}') # 使用摄像头发现行人并注册 def person_register_camera(camera_num=0, is_csv=False, is_app=False): # 加载摄像头 cap = load_camera(camera_num) _, img = cap.read() # 调用模型推理 out = segmentation(img) # 注册用户 register_user(out, is_csv, is_app) # 使用视频流发现行人并注册 def person_register_frames(capture_frames, is_csv=False, is_app=False): # 从实时视频流截取一帧 img = next(capture_frames) # 调用模型推理 out = segmentation(img) # 获取图片名称 img_name = get_one_name() # 保存原始图片 save_tx(file_data=out[0].orig_img, file_name=img_name, bucket_name='fashion-imgs-1254150807') # 保存画框图片 perd_out = out[0].plot() person_name = img_name.split('.')[0] pred_img_name = f'{person_name}_prediction.png' save_tx(file_data=perd_out, file_name=pred_img_name, bucket_name='fashion-imgs-1254150807') # 注册用户 register_user(out, is_csv, is_app) # 从图片路径发现行人并注册 def person_register_imgpath(imgs_path="./data/suanfamama-fashion-guangzhou-dataset/20240722", is_csv=False, is_app=False): # 从图片路径加载图片 for i, filename in enumerate(os.listdir(imgs_path)): # 检查是否已该处理过图片 if check_image_name_in_tx(filename): continue # 从图片路径读取图片 img = cv2.imread(os.path.join(imgs_path, filename)) # 保存原始图片 save_tx(file_data=img, file_name=filename, bucket_name='fashion-imgs-1254150807') # 调用模型推理 out = segmentation(img) # 保存画框图片 perd_out = out[0].plot() person_name = filename.split('.')[0] pred_img_name = f'{person_name}_prediction.png' save_tx(file_data=perd_out, file_name=pred_img_name, bucket_name='fashion-imgs-1254150807') # 注册用户 register_user(out, is_csv, is_app) # 从S3下载图片后发现行人并注册(停止维护) def person_register_s3(bucket_name='fashion-guangzhou-dataset', is_csv=False, is_app=False): # 暂存图片地址 file_path = './data/temp/temp.png' # 创建S3资源对象 from algs.alg0.utils.operate_s3 import config_S3 s3 = config_S3() # 获取桶对象 bucket = s3.Bucket(bucket_name) # 从S3桶加载图片 for i, obj in enumerate(bucket.objects.all()): # 检查是否已经注册 from algs.alg0.utils.operate_s3 import check_image_name_in_s3 if check_image_name_in_s3(obj.key): continue print(f'本图片开始时间{datetime.now()}') # 用图片名字获取图片 from algs.alg0.utils.operate_s3 import take_img_S3 take_img_S3(obj.key) img = cv2.imread('data/suanfamama-fashion-guangzhou-dataset/temp.jpeg', cv2.IMREAD_COLOR) print(f'读取图片完成时间{datetime.now()}') # 调用模型推理 out = segmentation(img) # 保存带框图片 pro_out = out[0].plot() # cv2.imwrite(f'./data/exp3.in/result/{filename}.png', pro_out) save_img_as_png(pro_out, file_path) image_name = obj.key.split('.')[0] metadata = {'Content-Type': 'image/png'} bucket.upload_file(file_path, f'{image_name}.png', ExtraArgs={'ContentType': 'image/png'}) print(f'保存图片完成时间{datetime.now()}') for box in out[0].boxes: # 根据框截取人物图像 person_img = get_one_person(box.xyxy, out[0].orig_img) # 将图片存为PNG文件 save_img_as_png(person_img, file_path) # 获取图片名称 img_name, person_name = get_one_name() # 将图片放到S3 from algs.alg0.utils.operate_s3 import save_S3 save_S3(img_name) # 从S3取出图片URL from algs.alg0.utils.operate_s3 import take_url_S3 img_url = take_url_S3(img_name) # 获取图片的评分和评价 score, conclusion = get_score_conclusion(img_url) # 将数据追加写入csv data = [image_name, person_name, img_url, score, conclusion] append_to_csv(data, csv_file_path="./data/temp/temp.csv") print(f'保存单个人物图片完成时间{datetime.now()}') # 进行被动用户注册 result = { "username": person_name, "avatar_url": img_url, "fashion_score_true": -1, "fashion_score_predict": score, "fashion_eval_reason": conclusion } register_user(result) print(f'单个人物注册完成时间{datetime.now()}') # 跟据标签分割人物并注册 def person_register_label(imgs_path='./data/suanfamama-fashion-guangzhou-dataset/20240730', label_path='./data/suanfamama-fashion-guangzhou-dataset/20240730_label', is_csv=False, is_app=False): # 从标签路径加载标签并读取相应图片 for i, filename in enumerate(os.listdir(label_path)): # 从标签获取标签值和box以及图片路径 print(f'本标签{filename}开始时间{datetime.now()}') img_path, boxes = read_json_label(os.path.join(label_path, filename)) for box in boxes: # 读取原始图片 img_path = img_path.split('\\')[-1] orig_img = cv2.imread(os.path.join(imgs_path, img_path), cv2.IMREAD_COLOR) # 根据框截取人物图像 xyxy = box['xyxy'] person_img = orig_img[int(xyxy[1]):int(xyxy[3]), int(xyxy[0]):int(xyxy[2]), ::-1] person_img = output_to_binary(person_img) # 获取图片名称 img_name = get_one_name() # 将图片放到腾讯云 save_tx(file_data=person_img, file_name=img_name) # 获取评分 data = eval_fashion_person_tx_url(img_name) # 进行被动用户注册 data["fashion_score_true"] = box['label'], if is_csv: append_to_csv(data) if is_app: app_register_user(data) print(f'人物{data["username"]}注册完成时间{datetime.now()}') # 从CSV文件中读取人物信息并注册 def person_register_csv(csv_file_path='./data/temp/temp.csv'): data = read_csv_to_dicts(csv_file_path) for i, t in enumerate(data): app_register_user(t) print(f'单人注册完成时间{datetime.now()}') # 更新数据库中每个人的评分 def update_user(): # 获取数据库中被动注册的用户数据 data_list = app_get_user() for person in data_list: # 获取评分 person_name = person[0] img_name = person_name + '.png' data = eval_fashion_person_tx_url(img_name) # 进行被动用户更新 result = { "gender": data[4] } app_register_user(result, url="http://localhost:8000/users/updataRegByCamera") print(f'人物{data["username"]}更新完成时间{datetime.now()}') # 更新csv中每个人的评分 def update_user_csv(csv_file_path='./data/temp/temp.csv'): # 创建一个临时文件来保存更新后的数据 temp_file_path = csv_file_path + '.tmp' # 获取CSV中的数据 result = [] data_list = read_csv_to_dicts(csv_file_path) for row in data_list: img_name = row["username"] + '.png' data = eval_fashion_person_tx_url(img_name) result.append(data) print(f'人物{data["username"]}更新完成时间{datetime.now()}') dict_to_csv(result, temp_file_path) # 替换原始文件 os.replace(temp_file_path, csv_file_path) # 从数据库获取用户到csv def export_user_to_csv(csv_file_path='./data/temp/temp.csv'): users = app_get_user() for user in users: json = {} if check_in_csv(user['username'], csv_file_path, 'username'): continue while not json: data = name_entity_recognition(user['fashion_eval_reason']) json = text_to_json(data) user.update(json) user['base_model'] = 'LINKAI-3.5' user['scene'] = '沙面' append_to_csv(user, csv_file_path) print(f"{user['username']}获取完成时间:{datetime.now()}") if __name__ == '__main__': # register_user() # 从模型结果中注册用户 # person_register_camera() # 使用摄像头发现行人并注册 # person_register_frames(capture_frames()) # 使用视频流发现行人并注册 # person_register_imgpath() # 从图片路径发现行人并注册 # person_register_s3() # 从S3下载图片后发现行人并注册(停止维护) # person_register_label() # 跟据标签分割人物并注册 # person_register_csv() # 从CSV注册进数据库 # update_user() # 更新数据库中的被动用户数据 # update_user_csv() # 更新CSV中的被动用户数据 # export_user_to_csv() # 从数据库获取被动用户信息 print(datetime.now()) export_user_to_csv(csv_file_path='./data/suanfamama-fashion-guangzhou-dataset/20240731.csv') print(datetime.now())