# keras_bitget/utils/preprocess/preprocess_data.py
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout, Activation
from tqdm import tqdm

# Hyperparameters for the commented-out deeper LSTM variant below; the active
# build_model() and train_model() hard-code their own values.
neurons = 512  # number of hidden units in each LSTM layer
activation_function = "tanh"  # activation function for the LSTM and Dense layers
loss = "mse"  # loss function for computing the gradient, in this case Mean Squared Error
optimizer = "adam"  # optimizer for applying gradient descent
dropout = 0.25  # dropout ratio used after each LSTM layer to avoid overfitting
batch_size = 128


def preprocess(df):
    """Convert raw Bitget candle data into a timestamp-indexed OHLCV DataFrame."""
    df = df.copy()
    # "ts" arrives as a millisecond epoch; convert to seconds, then to datetimes.
    df["ts"] = df["ts"].astype(np.int64)
    df["ts"] = df["ts"] / 1000
    df["timestamp"] = pd.to_datetime(df["ts"], unit="s")
    df = df[["timestamp", "low", "high", "close", "open", "quoteVol"]]
    for col in ["low", "high", "close", "open", "quoteVol"]:
        df[col] = df[col].astype(float)
    df.set_index(df["timestamp"], inplace=True)
    df.drop(["timestamp"], axis=1, inplace=True)
    # Keep a plain calendar-date column alongside the datetime index.
    df["Date"] = pd.to_datetime(df.index.values.tolist()).date
    return df


def normalize(closedf):
    """Scale the close-price series to [0, 1]; return the scaled array and the fitted
    scaler (needed later to map predictions back to price units via inverse_transform)."""
    scaler = MinMaxScaler(feature_range=(0, 1))
    # Drop the Date column and reshape the remaining price column to (n_samples, 1).
    closedfsc = scaler.fit_transform(
        np.array(closedf.drop("Date", axis=1)).reshape(-1, 1)
    )
    return closedfsc, scaler


def split_train_test(closedfsc, training_size, test_size):
    """Split the scaled series at index training_size (test_size is accepted but unused)."""
    train_data, test_data = (
        closedfsc[0:training_size, :],
        closedfsc[training_size : len(closedfsc), :],
    )
    return train_data, test_data
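

# Illustrative only: training_size is usually derived from a train/test ratio; the
# 70/30 split below is an assumption, not something fixed by this module.
#
#     training_size = int(len(closedfsc) * 0.70)
#     test_size = len(closedfsc) - training_size
#     train_data, test_data = split_train_test(closedfsc, training_size, test_size)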


def create_dataset(dataset, time_step=1):
    """Build supervised samples: each row of dataX is a window of time_step values,
    and the matching dataY entry is the value that immediately follows the window."""
    dataX, dataY = [], []
    for i in range(len(dataset) - time_step - 1):
        a = dataset[i : (i + time_step), 0]  # e.g. i=0 -> values 0..time_step-1
        dataX.append(a)
        dataY.append(dataset[i + time_step, 0])  # target: the value at index i + time_step
    return np.array(dataX), np.array(dataY)
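

# Sketch of assumed usage: the LSTM in build_model() expects 3-D input of shape
# (samples, time_steps, features), so the 2-D windows returned by create_dataset
# are typically reshaped before training. The window length of 15 is an assumption.
#
#     time_step = 15
#     x_train, y_train = create_dataset(train_data, time_step)
#     X_test, y_test = create_dataset(test_data, time_step)
#     x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)
#     X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)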
# def build_model(inputs):
# model = Sequential()
# model.add(
# LSTM(
# neurons,
# return_sequences=True,
# input_shape=(inputs.shape[1], inputs.shape[2]),
# activation=activation_function,
# )
# )
# model.add(Dropout(dropout))
# model.add(LSTM(neurons, return_sequences=True, activation=activation_function))
# model.add(Dropout(dropout))
# model.add(LSTM(neurons, activation=activation_function))
# model.add(Dropout(dropout))
# model.add(Dense(units=1))
# model.add(Activation(activation_function))
# model.compile(loss=loss, optimizer=optimizer, metrics=["mae"])
# return model


def build_model():
    """Single-layer LSTM regressor: accepts sequences of any length with one feature
    per step and predicts a single value."""
    model = Sequential()
    model.add(LSTM(256, input_shape=(None, 1), activation="relu"))
    model.add(Dense(1))
    model.compile(loss="mean_squared_error", optimizer="adam")
    return model


def train_model(
    model, x_train, y_train, X_test, y_test, epochs, progress_callback=None
):
    """Train one epoch at a time so per-epoch losses can be collected and reported."""
    train_losses = []  # training loss per epoch
    val_losses = []  # validation loss per epoch
    for epoch in tqdm(range(epochs)):
        # Note: the batch size is hard-coded to 32 here; the module-level batch_size is unused.
        history = model.fit(
            x_train,
            y_train,
            epochs=1,
            verbose=0,
            validation_data=(X_test, y_test),
            batch_size=32,
        )
        train_loss = history.history["loss"][0]
        val_loss = history.history["val_loss"][0]
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        if progress_callback:
            progress_callback(epoch, history)
    return model, train_losses, val_losses
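

# Minimal end-to-end sketch of how these helpers fit together. The synthetic candle
# data, the 70/30 split, the 15-step window, and the 2 training epochs below are
# illustrative assumptions, not values fixed by this module.
if __name__ == "__main__":
    n = 500
    raw_df = pd.DataFrame(
        {
            # Millisecond epoch timestamps, matching what preprocess() expects.
            "ts": pd.date_range("2023-01-01", periods=n, freq="h").astype(np.int64) // 10**6,
            "low": np.random.rand(n) + 99,
            "high": np.random.rand(n) + 101,
            "open": np.random.rand(n) + 100,
            "close": np.random.rand(n) + 100,
            "quoteVol": np.random.rand(n) * 1000,
        }
    )

    df = preprocess(raw_df)
    closedf = df[["close", "Date"]]
    closedfsc, scaler = normalize(closedf)

    training_size = int(len(closedfsc) * 0.70)
    test_size = len(closedfsc) - training_size
    train_data, test_data = split_train_test(closedfsc, training_size, test_size)

    time_step = 15
    x_train, y_train = create_dataset(train_data, time_step)
    X_test, y_test = create_dataset(test_data, time_step)
    x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

    model = build_model()
    model, train_losses, val_losses = train_model(
        model, x_train, y_train, X_test, y_test, epochs=2
    )

    # Predictions come back in the scaled [0, 1] space; invert them to price units.
    preds = scaler.inverse_transform(model.predict(X_test))
    print(preds[:5])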