mlp / app.py
coycs's picture
Update app.py
bdbaaec
from typing import List
from pydantic import BaseModel
from fastapi import FastAPI, Response, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware # 跨域
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
import sklearn.preprocessing as preproc
from sklearn.preprocessing import StandardScaler
import lightgbm as lgb
from xgboost import XGBRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
import io
import json
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('AGG')
app = FastAPI()
# set cross-domain whitelist
origins = [
"http://127.0.0.1:5500",
"http://localhost:8081",
"http://mlca.coycs.com",
"https://mlca.coycs.com",
"http://celadon-lebkuchen-cc4bb0.netlify.app",
"https://celadon-lebkuchen-cc4bb0.netlify.app"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["POST", "GET"],
allow_headers=["*"],
)
# 工具函数
def json2df(json):
# 字符串转数值
def str2num(x):
if isinstance(x, str):
return eval(x)
else:
return x
df = pd.DataFrame(json)
# 空白符转None,且是"None"让eval能解析成功
df.replace(to_replace=r"^\s*$", value="None", regex=True, inplace=True)
# 科学计数法转数值
df = df.applymap(str2num)
return df
# def process_abnormal(df, detect, method): # 异常值处理
# if detect == 1: # IQR检测方式
# for coloum in df.columns:
# q1 = df[coloum].quantile(0.75)
# q3 = df[coloum].quantile(0.25)
# iqr = q1-q3
# if method == 1: # 删除异常值
# df.drop(
# df.loc[lambda x:x[coloum] > q1 + 1.5 * iqr].index, inplace=True)
# df.drop(
# df.loc[lambda x:x[coloum] < q3 - 1.5 * iqr].index, inplace=True)
# elif method == 2: # 均值替换
# df.loc[lambda x:x[coloum] > q1 + 1.5 *
# iqr, coloum]=df[coloum].mean()
# df.loc[lambda x:x[coloum] < q3 - 1.5 *
# iqr, coloum]=df[coloum].mean()
# elif method == 3: # 中位数替换
# df.loc[lambda x:x[coloum] > q1 + 1.5 *
# iqr, coloum]=df[coloum].median()
# df.loc[lambda x:x[coloum] < q3 - 1.5 *
# iqr, coloum]=df[coloum].median()
# elif method == 4: # 众数替换
# df.loc[lambda x:x[coloum] > q1 + 1.5 *
# iqr, coloum]=df[coloum].mode().iloc[0]
# df.loc[lambda x:x[coloum] < q3 - 1.5 *
# iqr, coloum]=df[coloum].mode().iloc[0]
# elif method == 5: # 边界替换
# df.loc[lambda x:x[coloum] > q1 +
# 1.5 * iqr, coloum]=q1 + 1.5 * iqr
# df.loc[lambda x:x[coloum] < q3 -
# 1.5 * iqr, coloum]=q3 - 1.5 * iqr
# elif detect == 2: # Z-score检测方式
# for coloum in df.columns:
# mean = df[coloum].mean()
# std = df[coloum].std()
# df.drop(
# df.loc[lambda x:x[coloum] > mean + 3 * std].index, inplace=True)
# df.drop(
# df.loc[lambda x:x[coloum] < mean - 3 * std].index, inplace=True)
# if method == 1: # 删除异常值
# df.drop(
# df.loc[lambda x:x[coloum] > mean + 3 * std].index, inplace=True)
# df.drop(
# df.loc[lambda x:x[coloum] < mean - 3 * std].index, inplace=True)
# elif method == 2: # 均值替换
# df.loc[lambda x:x[coloum] > mean +
# 3 * std, coloum]=df[coloum].mean()
# df.loc[lambda x:x[coloum] < mean -
# 3 * std, coloum]=df[coloum].mean()
# elif method == 3: # 中位数替换
# df.loc[lambda x:x[coloum] > mean + 3 *
# std, coloum]=df[coloum].median()
# df.loc[lambda x:x[coloum] < mean - 3 *
# std, coloum]=df[coloum].median()
# elif method == 4: # 众数替换
# df.loc[lambda x:x[coloum] > mean + 3 *
# std, coloum]=df[coloum].mode().iloc[0]
# df.loc[lambda x:x[coloum] < mean - 3 *
# std, coloum]=df[coloum].mode().iloc[0]
# elif method == 5: # 边界替换
# df.loc[lambda x:x[coloum] > mean +
# 3 * std, coloum]=mean + 3 * std
# df.loc[lambda x:x[coloum] < mean -
# 3 * std, coloum]=mean - 3 * std
# return df
def process_miss(df, method): # 缺失值处理
# 舍弃全为空的行
df = df.dropna(how='all')
# 舍弃全为空的列
df = df.dropna(axis=1, how='all')
if method == 1: # 均值
df = df.fillna(df.mean())
elif method == 2: # 中位数
df = df.fillna(df.median())
elif method == 3: # 众数
df = df.fillna(df.mode().iloc[0])
elif method == 4: # 线性
df = df.fillna(df.interpolate(
method='linear', limit_direction='forward', axis=0))
elif method == 5: # 前值
df = df.fillna(method="ffill")
elif method == 6: # 后值
df = df.fillna(method="bfill")
return df
def process_abnormal(df_inside, df_user, detect, method): # 异常值处理
df = pd.concat([df_inside, df_user], axis=0,
ignore_index=True) # 合并的dataframe
df_features = df.iloc[:, :12] # 取所有的特征列为dataframe
# print(df)
if detect == 1: # IQR检测方式
for coloum in df_features.columns:
q1 = df_features[coloum].quantile(0.75)
q3 = df_features[coloum].quantile(0.25)
iqr = q1-q3
if method == 1: # 删除异常值
df_features.drop(
df_features.loc[lambda x:x[coloum] > q1 + 1.5 * iqr].index, inplace=True)
df_features.drop(
df_features.loc[lambda x:x[coloum] < q3 - 1.5 * iqr].index, inplace=True)
elif method == 2: # 均值替换
df_features.loc[lambda x:x[coloum] > q1 + 1.5 *
iqr, coloum]=df_features[coloum].mean()
df_features.loc[lambda x:x[coloum] < q3 - 1.5 *
iqr, coloum]=df_features[coloum].mean()
elif method == 3: # 中位数替换
df_features.loc[lambda x:x[coloum] > q1 + 1.5 *
iqr, coloum]=df_features[coloum].median()
df_features.loc[lambda x:x[coloum] < q3 - 1.5 *
iqr, coloum]=df_features[coloum].median()
elif method == 4: # 众数替换
df_features.loc[lambda x:x[coloum] > q1 + 1.5 *
iqr, coloum]=df_features[coloum].mode().iloc[0]
df_features.loc[lambda x:x[coloum] < q3 - 1.5 *
iqr, coloum]=df_features[coloum].mode().iloc[0]
elif method == 5: # 边界替换
df_features.loc[lambda x:x[coloum] > q1 +
1.5 * iqr, coloum]=q1 + 1.5 * iqr
df_features.loc[lambda x:x[coloum] < q3 -
1.5 * iqr, coloum]=q3 - 1.5 * iqr
elif detect == 2: # Z-score检测方式
for coloum in df_features.columns:
mean = df_features[coloum].mean()
std = df_features[coloum].std()
df_features.drop(
df_features.loc[lambda x:x[coloum] > mean + 3 * std].index, inplace=True)
df_features.drop(
df_features.loc[lambda x:x[coloum] < mean - 3 * std].index, inplace=True)
if method == 1: # 删除异常值
df_features.drop(
df_features.loc[lambda x:x[coloum] > mean + 3 * std].index, inplace=True)
df_features.drop(
df_features.loc[lambda x:x[coloum] < mean - 3 * std].index, inplace=True)
elif method == 2: # 均值替换
df_features.loc[lambda x:x[coloum] > mean +
3 * std, coloum]=df_features[coloum].mean()
df_features.loc[lambda x:x[coloum] < mean -
3 * std, coloum]=df_features[coloum].mean()
elif method == 3: # 中位数替换
df_features.loc[lambda x:x[coloum] > mean + 3 *
std, coloum]=df_features[coloum].median()
df_features.loc[lambda x:x[coloum] < mean - 3 *
std, coloum]=df_features[coloum].median()
elif method == 4: # 众数替换
df_features.loc[lambda x:x[coloum] > mean + 3 *
std, coloum]=df_features[coloum].mode().iloc[0]
df_features.loc[lambda x:x[coloum] < mean - 3 *
std, coloum]=df_features[coloum].mode().iloc[0]
elif method == 5: # 边界替换
df_features.loc[lambda x:x[coloum] > mean +
3 * std, coloum]=mean + 3 * std
df_features.loc[lambda x:x[coloum] < mean -
3 * std, coloum]=mean - 3 * std
df.iloc[:, :12] = df_features
df_inside = df.iloc[:df_inside.shape[0], :]
df_user = df.iloc[df_inside.shape[0]:, :12]
return {"df_inside": df_inside, "df_user": df_user}
def process_standard(df_inside, df_user, method): # 标准化处理
df = pd.concat([df_inside, df_user], axis=0,
ignore_index=True) # 合并的dataframe
df_features = df.iloc[:, :12] # 取所有的特征列为dataframe
columns = df_features.columns # 列名
if method == 1: # Min-max
df_features = preproc.minmax_scale(df_features)
elif method == 2: # Z-Score
df_features = preproc.StandardScaler().fit_transform(df_features)
elif method == 3: # MaxAbs
df_features = preproc.maxabs_scale(df_features, axis=0)
elif method == 4: # RobustScaler
df_features = preproc.RobustScaler().fit_transform(df_features)
elif method == 5: # 正则化
df_features = preproc.normalize(df_features, axis=0)
df_features = pd.DataFrame(
data=df_features[0:, 0:], columns=columns) # 补充列名
df.iloc[:, :12] = df_features
df_inside = df.iloc[:df_inside.shape[0], :]
df_user = df.iloc[df_inside.shape[0]:, :12]
return {"df_inside": df_inside, "df_user": df_user}
def train_model(x, y, test_size, algorithm, paras): # 模型训练
# 划分数据集
x_train, x_test, y_train, y_test = train_test_split(
x, y, test_size=test_size, random_state=0)
# 机器学习
model = None
results = {}
if algorithm == 1: # 最小二乘法线性回归
model = LinearRegression(fit_intercept=paras["fit_intercept"])
if algorithm == 2: # 随机森林回归
model = RandomForestRegressor(n_estimators=paras["n_estimators"],
criterion=paras["criterion"], max_depth=paras["max_depth"], random_state=0)
if algorithm == 3: # BP神经网络回归
model = MLPRegressor(hidden_layer_sizes=(paras["hidden_layer_sizes_1"], paras["hidden_layer_sizes_2"]),
activation=paras["activation"], solver='lbfgs', random_state=paras["random_state"])
if algorithm == 4: # XGBoost回归
model = XGBRegressor(
max_depth=paras["max_depth"], learning_rate=paras["learning_rate"], n_estimators=paras["n_estimators"])
if algorithm == 5: # LightGBM回归
# model = lgb.LGBMRegressor(objective='regression',boosting_type="dart",num_leaves=30, max_depth=-1,n_estimators=20,learning_rate=1)
model = lgb.LGBMRegressor(objective='regression', max_depth=paras["max_depth"],
learning_rate=paras["learning_rate"], random_state=paras["random_state"], n_estimators=paras["n_estimators"])
# 返回数据
if model != None:
model.fit(x_train, y_train)
if algorithm == 1: # 最小二乘法线性回归
# 保留小数点后三位
# results["coef"] = model.coef_.tolist() # 模型斜率
results["coef"] = [float('{:.4f}'.format(i))
for i in model.coef_.tolist()] # 模型斜率
results["intercept"] = round(model.intercept_, 3) # 模型截距
y_pred = model.predict(x_test) # 预测值
# y_test = y_test.values
# 误差,用round保留三位小数且四舍五入
mae = round(mean_absolute_error(y_test, y_pred), 3)
rmse = round(np.sqrt(mean_squared_error(y_test, y_pred)), 3)
r2 = round(r2_score(y_test, y_pred), 3)
# y_test = [x[0] for x in np.array(y_test).tolist()]
# y_pred = [x[0] for x in y_pred.tolist()]
y_test = np.array(y_test).tolist()
y_pred = y_pred.tolist()
res = {"y_test": y_test, "y_pred": y_pred, "error": {
"MAE": mae, "RMSE": rmse, "R2": r2}, "results": results}
print(res)
return res
# return {"y_test": y_test, "y_pred": y_pred, "error": {"MAE": mae, "RMSE": rmse, "R2": r2}, "results": results}
else:
return "模型训练出错"
def predict_connectivity(x, x1, y, test_size, algorithm, paras):
# 划分数据集
x_train, x_test, y_train, y_test = train_test_split(
x, y, test_size=test_size, random_state=0)
# 机器学习
model = None
results = {}
if algorithm == 1: # 最小二乘法线性回归
model = LinearRegression(fit_intercept=paras["fit_intercept"])
if algorithm == 2: # 随机森林回归
model = RandomForestRegressor(n_estimators=paras["n_estimators"],
criterion=paras["criterion"], max_depth=paras["max_depth"], random_state=0)
if algorithm == 3: # BP神经网络回归
model = MLPRegressor(hidden_layer_sizes=(paras["hidden_layer_sizes_1"], paras["hidden_layer_sizes_2"]),
activation=paras["activation"], solver='lbfgs', random_state=paras["random_state"])
if algorithm == 4: # XGBoost回归
model = XGBRegressor(
max_depth=paras["max_depth"], learning_rate=paras["learning_rate"], n_estimators=paras["n_estimators"])
if algorithm == 5: # LightGBM回归
model = lgb.LGBMRegressor(objective='regression', max_depth=paras["max_depth"],
learning_rate=paras["learning_rate"], random_state=paras["random_state"], n_estimators=paras["n_estimators"])
# 返回数据
if model != None:
model.fit(x_train, y_train)
y_pred = model.predict(x1).tolist() # 预测值
return y_pred
else:
return "预测连通性出错"
# 登录验证
class Login(BaseModel): # 接口数据类型
username: str
password: str
@app.post("/login") # 接口
async def login(login: Login):
username = login.username
password = login.password
if username == "admin" and password == "123456":
return True
return False
# 处理用户数据
class Process_user(BaseModel): # 接口数据类型
mode: int
data: List
miss: List
abnormal: List
standard: List
@app.post("/process/user") # 接口
async def process_user(user: Process_user):
mode = user.mode # 选择的井间连通模式
df_inside = pd.read_csv(
"./mode_{}.csv".format(mode)).dropna(axis=0) # 连通模式对应的内置数据
df_user = json2df(user.data)
abnormal = user.abnormal[0]
miss = user.miss[0]
standard = user.standard[0]
# 异常值处理
if abnormal["state"]:
abnormaled = process_abnormal(
df_inside, df_user, abnormal["detect"], abnormal["method"])
df_inside = abnormaled["df_inside"]
df_user = abnormaled["df_user"]
# 缺失值处理
if miss["state"]:
df_user = process_miss(df_user, miss["method"])
# 标准化处理
if standard["state"]:
standarded = process_standard(df_inside, df_user, standard["method"])
df_inside = standarded["df_inside"]
df_user = standarded["df_user"]
# 用astype将数值转科学计数法
return {"inside": df_inside.astype('str').to_json(orient='records'), "user": df_user.astype('str').to_json(orient='records')}
# # 用astype将数值转科学计数法
# return df.astype('str').to_json(orient='records')
# 处理内置数据
class Process_inside(BaseModel): # 接口数据类型
data: List
abnormal: List
standard: List
@app.post("/process/inside") # 接口
async def process_inside(inside: Process_inside):
df = json2df(inside.data)
abnormal = inside.abnormal[0]
standard = inside.standard[0]
# 异常值处理
if abnormal["state"]:
df = process_abnormal(df, abnormal["detect"], abnormal["method"])
# 标准化处理:只对特征进行标准化,不包括标签(后三列)
if standard["state"]:
df = pd.concat([process_standard(df.iloc[:, :12],
standard["method"]), df.iloc[:, 12:]], axis=1)
# 用astype将数值转科学计数法
return df.astype('str').to_json(orient='records')
# 训练模型
class Train(BaseModel): # 接口数据类型
data: List
test_size: float
algorithm: int
paras: List
@app.post("/train") # 接口
async def train(train: Train):
# 解析数据
df = json2df(train.data)
test_size = train.test_size
algorithm = train.algorithm
paras = train.paras[0]
x = df.iloc[:, :12]
y1 = df.loc[:, "BSR"]
y2 = df.loc[:, "SBR"]
y3 = df.loc[:, "D"]
bsr = train_model(x, y1, test_size, algorithm, paras)
sbr = train_model(x, y2, test_size, algorithm, paras)
x_train, x_test, y_train, y_test = train_test_split(
x, y3, test_size=test_size, random_state=0)
d = {"y_test": np.array(y_test).tolist(), "y_pred": np.sum(
[bsr["y_pred"], sbr["y_pred"]], axis=0).tolist()}
return {"bsr": bsr, "sbr": sbr, "d": d}
# 预测连通性
class Predict(BaseModel): # 接口数据类型
data_train: List
data_predict: List
test_size: float
algorithm: int
paras: List
@app.post("/predict") # 接口
async def predict(predict: Predict):
# 解析数据
df_train = json2df(predict.data_train)
df_predict = json2df(predict.data_predict)
test_size = predict.test_size
algorithm = predict.algorithm
paras = predict.paras[0]
x = df_train.iloc[:, :12]
y1 = df_train.loc[:, "BSR"]
y2 = df_train.loc[:, "SBR"]
# 预测连通性
bsr = predict_connectivity(x, df_predict, y1, test_size, algorithm, paras)
sbr = predict_connectivity(x, df_predict, y2, test_size, algorithm, paras)
d = np.sum([bsr, sbr], axis=0).tolist()
# 合并为一个list后转dataframe再转json实现前端表格数据格式
data = []
data.append(bsr)
data.append(sbr)
data.append(d)
df_result = pd.concat([pd.DataFrame(predict.data_predict), pd.DataFrame(data=np.array(
data).T.tolist(), columns=["BSR", "SBR", "D"])], axis=1)
return df_result.to_json(orient='records')
# return pd.DataFrame(data=np.array(data).T.tolist(), columns=["BSR", "SBR", "D"]).to_json(orient='records')
# # 图片测试
# def create_img():
# plt.rcParams['figure.figsize'] = [7.50, 3.50]
# plt.rcParams['figure.autolayout'] = True
# plt.plot([1, 2])
# img_buf = io.BytesIO()
# plt.savefig(img_buf, format='png')
# plt.close()
# return img_buf
# @app.get('/png')
# async def get_img(background_tasks: BackgroundTasks):
# img_buf = create_img()
# # get the entire buffer content
# # because of the async, this will await the loading of all content
# bufContents: bytes = img_buf.getvalue()
# background_tasks.add_task(img_buf.close)
# headers = {'Content-Disposition': 'inline; filename="out.png"'}
# return Response(bufContents, headers=headers, media_type='image/png')