|
from datetime import datetime |
|
from fastapi import FastAPI |
|
import cv2 |
|
|
|
from utils.load_input import * |
|
from utils.operate_csv import * |
|
from utils.operate_json import * |
|
from utils.operate_s3 import * |
|
from utils.utils import * |
|
from utils.operate_tx import * |
|
from segment import * |
|
from eval_fashion_person import * |
|
from utils.app import * |
|
from name_entity_recognition import * |
|
|
|
app = FastAPI() |
|
|
|
|
|
|
|
def register_user(out, |
|
is_csv=False, |
|
is_app=False): |
|
for box in out[0].boxes: |
|
|
|
person_img = get_one_person(box.xyxy, out[0].orig_img) |
|
|
|
|
|
img_name = get_one_name() |
|
|
|
|
|
save_tx(file_data=person_img, |
|
file_name=img_name) |
|
|
|
|
|
data = eval_fashion_person_tx_url(img_name) |
|
|
|
|
|
if is_csv: |
|
append_to_csv(data) |
|
if is_app: |
|
app_register_user(data) |
|
print(f'人物{data["username"]}注册完成时间{datetime.now()}') |
|
|
|
|
|
|
|
def person_register_camera(camera_num=0, |
|
is_csv=False, |
|
is_app=False): |
|
|
|
cap = load_camera(camera_num) |
|
_, img = cap.read() |
|
|
|
|
|
out = segmentation(img) |
|
|
|
|
|
register_user(out, is_csv, is_app) |
|
|
|
|
|
|
|
def person_register_frames(capture_frames, |
|
is_csv=False, |
|
is_app=False): |
|
|
|
img = next(capture_frames) |
|
|
|
|
|
out = segmentation(img) |
|
|
|
|
|
img_name = get_one_name() |
|
|
|
|
|
save_tx(file_data=out[0].orig_img, |
|
file_name=img_name, |
|
bucket_name='fashion-imgs-1254150807') |
|
|
|
|
|
perd_out = out[0].plot() |
|
person_name = img_name.split('.')[0] |
|
pred_img_name = f'{person_name}_prediction.png' |
|
save_tx(file_data=perd_out, |
|
file_name=pred_img_name, |
|
bucket_name='fashion-imgs-1254150807') |
|
|
|
|
|
register_user(out, is_csv, is_app) |
|
|
|
|
|
|
|
def person_register_imgpath(imgs_path="./data/suanfamama-fashion-guangzhou-dataset/20240722", |
|
is_csv=False, |
|
is_app=False): |
|
|
|
for i, filename in enumerate(os.listdir(imgs_path)): |
|
|
|
if check_image_name_in_tx(filename): |
|
continue |
|
|
|
|
|
img = cv2.imread(os.path.join(imgs_path, filename)) |
|
|
|
|
|
save_tx(file_data=img, |
|
file_name=filename, |
|
bucket_name='fashion-imgs-1254150807') |
|
|
|
|
|
out = segmentation(img) |
|
|
|
|
|
perd_out = out[0].plot() |
|
person_name = filename.split('.')[0] |
|
pred_img_name = f'{person_name}_prediction.png' |
|
save_tx(file_data=perd_out, |
|
file_name=pred_img_name, |
|
bucket_name='fashion-imgs-1254150807') |
|
|
|
|
|
register_user(out, is_csv, is_app) |
|
|
|
|
|
|
|
def person_register_s3(bucket_name='fashion-guangzhou-dataset', |
|
is_csv=False, |
|
is_app=False): |
|
|
|
file_path = './data/temp/temp.png' |
|
|
|
|
|
from algs.alg0.utils.operate_s3 import config_S3 |
|
s3 = config_S3() |
|
|
|
|
|
bucket = s3.Bucket(bucket_name) |
|
|
|
|
|
for i, obj in enumerate(bucket.objects.all()): |
|
|
|
|
|
from algs.alg0.utils.operate_s3 import check_image_name_in_s3 |
|
if check_image_name_in_s3(obj.key): |
|
continue |
|
print(f'本图片开始时间{datetime.now()}') |
|
|
|
|
|
from algs.alg0.utils.operate_s3 import take_img_S3 |
|
take_img_S3(obj.key) |
|
img = cv2.imread('data/suanfamama-fashion-guangzhou-dataset/temp.jpeg', cv2.IMREAD_COLOR) |
|
print(f'读取图片完成时间{datetime.now()}') |
|
|
|
|
|
out = segmentation(img) |
|
|
|
|
|
pro_out = out[0].plot() |
|
|
|
save_img_as_png(pro_out, file_path) |
|
image_name = obj.key.split('.')[0] |
|
metadata = {'Content-Type': 'image/png'} |
|
bucket.upload_file(file_path, f'{image_name}.png', ExtraArgs={'ContentType': 'image/png'}) |
|
print(f'保存图片完成时间{datetime.now()}') |
|
|
|
for box in out[0].boxes: |
|
|
|
person_img = get_one_person(box.xyxy, out[0].orig_img) |
|
|
|
|
|
save_img_as_png(person_img, file_path) |
|
|
|
|
|
img_name, person_name = get_one_name() |
|
|
|
|
|
from algs.alg0.utils.operate_s3 import save_S3 |
|
save_S3(img_name) |
|
|
|
|
|
from algs.alg0.utils.operate_s3 import take_url_S3 |
|
img_url = take_url_S3(img_name) |
|
|
|
|
|
score, conclusion = get_score_conclusion(img_url) |
|
|
|
|
|
data = [image_name, person_name, img_url, score, conclusion] |
|
append_to_csv(data, csv_file_path="./data/temp/temp.csv") |
|
print(f'保存单个人物图片完成时间{datetime.now()}') |
|
|
|
|
|
result = { |
|
"username": person_name, |
|
"avatar_url": img_url, |
|
"fashion_score_true": -1, |
|
"fashion_score_predict": score, |
|
"fashion_eval_reason": conclusion |
|
} |
|
register_user(result) |
|
print(f'单个人物注册完成时间{datetime.now()}') |
|
|
|
|
|
|
|
def person_register_label(imgs_path='./data/suanfamama-fashion-guangzhou-dataset/20240730', |
|
label_path='./data/suanfamama-fashion-guangzhou-dataset/20240730_label', |
|
is_csv=False, |
|
is_app=False): |
|
|
|
for i, filename in enumerate(os.listdir(label_path)): |
|
|
|
print(f'本标签{filename}开始时间{datetime.now()}') |
|
img_path, boxes = read_json_label(os.path.join(label_path, filename)) |
|
|
|
for box in boxes: |
|
|
|
img_path = img_path.split('\\')[-1] |
|
orig_img = cv2.imread(os.path.join(imgs_path, img_path), |
|
cv2.IMREAD_COLOR) |
|
|
|
|
|
xyxy = box['xyxy'] |
|
person_img = orig_img[int(xyxy[1]):int(xyxy[3]), int(xyxy[0]):int(xyxy[2]), ::-1] |
|
person_img = output_to_binary(person_img) |
|
|
|
|
|
img_name = get_one_name() |
|
|
|
|
|
save_tx(file_data=person_img, |
|
file_name=img_name) |
|
|
|
|
|
data = eval_fashion_person_tx_url(img_name) |
|
|
|
|
|
data["fashion_score_true"] = box['label'], |
|
|
|
if is_csv: |
|
append_to_csv(data) |
|
if is_app: |
|
app_register_user(data) |
|
print(f'人物{data["username"]}注册完成时间{datetime.now()}') |
|
|
|
|
|
|
|
def person_register_csv(csv_file_path='./data/temp/temp.csv'): |
|
data = read_csv_to_dicts(csv_file_path) |
|
for i, t in enumerate(data): |
|
app_register_user(t) |
|
print(f'单人注册完成时间{datetime.now()}') |
|
|
|
|
|
|
|
def update_user(): |
|
|
|
data_list = app_get_user() |
|
for person in data_list: |
|
|
|
person_name = person[0] |
|
img_name = person_name + '.png' |
|
data = eval_fashion_person_tx_url(img_name) |
|
|
|
|
|
result = { |
|
"gender": data[4] |
|
} |
|
app_register_user(result, url="http://localhost:8000/users/updataRegByCamera") |
|
print(f'人物{data["username"]}更新完成时间{datetime.now()}') |
|
|
|
|
|
|
|
def update_user_csv(csv_file_path='./data/temp/temp.csv'): |
|
|
|
temp_file_path = csv_file_path + '.tmp' |
|
|
|
|
|
result = [] |
|
data_list = read_csv_to_dicts(csv_file_path) |
|
for row in data_list: |
|
img_name = row["username"] + '.png' |
|
data = eval_fashion_person_tx_url(img_name) |
|
result.append(data) |
|
print(f'人物{data["username"]}更新完成时间{datetime.now()}') |
|
dict_to_csv(result, temp_file_path) |
|
|
|
|
|
os.replace(temp_file_path, csv_file_path) |
|
|
|
|
|
|
|
def export_user_to_csv(csv_file_path='./data/temp/temp.csv'): |
|
users = app_get_user() |
|
for user in users: |
|
json = {} |
|
if check_in_csv(user['username'], csv_file_path, 'username'): |
|
continue |
|
while not json: |
|
data = name_entity_recognition(user['fashion_eval_reason']) |
|
json = text_to_json(data) |
|
user.update(json) |
|
user['base_model'] = 'LINKAI-3.5' |
|
user['scene'] = '沙面' |
|
append_to_csv(user, csv_file_path) |
|
print(f"{user['username']}获取完成时间:{datetime.now()}") |
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(datetime.now()) |
|
export_user_to_csv(csv_file_path='./data/suanfamama-fashion-guangzhou-dataset/20240731.csv') |
|
print(datetime.now()) |
|
|