import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Activation, Dense, Dropout, LSTM
from tqdm import tqdm
# Hyperparameters for the deeper, commented-out LSTM model further below.
neurons = 512  # number of hidden units in each LSTM layer
activation_function = "tanh"  # activation function for the LSTM and Dense layers
loss = "mse"  # loss function for calculating the gradient, in this case mean squared error
optimizer = "adam"  # optimizer for applying gradient descent
dropout = 0.25  # dropout ratio used after each LSTM layer to avoid overfitting
batch_size = 128  # number of samples per gradient update
def preprocess(df):
    """Convert raw candle rows into a time-indexed OHLCV DataFrame."""
    df = df.copy()
    # `ts` arrives as an epoch timestamp in milliseconds; divide by 1000
    # so it can be parsed with unit="s".
    df["ts"] = df["ts"].astype(np.int64)
    df["ts"] = df["ts"] / 1000
    df["timestamp"] = pd.to_datetime(df["ts"], unit="s")
    df = df[["timestamp", "low", "high", "close", "open", "quoteVol"]]
    for col in ["low", "high", "close", "open", "quoteVol"]:
        df[col] = df[col].astype(float)
    df.set_index(df["timestamp"], inplace=True)
    df.drop(["timestamp"], axis=1, inplace=True)
    # Keep the calendar date as a separate column for grouping and plotting.
    df["Date"] = pd.to_datetime(df.index.values.tolist()).date
    return df
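# Usage sketch (hypothetical data): `ts` is taken to be epoch milliseconds,
# matching the /1000 conversion above.
# raw = pd.DataFrame({
#     "ts": [1700000000000, 1700000060000],
#     "low": ["1.0", "1.1"], "high": ["1.2", "1.3"],
#     "close": ["1.1", "1.2"], "open": ["1.0", "1.1"],
#     "quoteVol": ["100", "120"],
# })
# df = preprocess(raw)  # DatetimeIndex, float OHLCV columns, plus a Date column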
def normalize(closedf):
    """Scale the price series to [0, 1]; return the scaled array and the fitted scaler."""
    scaler = MinMaxScaler(feature_range=(0, 1))
    closedfsc = scaler.fit_transform(
        np.array(closedf.drop("Date", axis=1)).reshape(-1, 1)
    )
    return closedfsc, scaler
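# Usage sketch: feeding a single price column (e.g. close) plus Date, since
# the function drops Date and reshapes what remains to one column. The
# returned scaler is what later maps predictions back to price units:
# closedfsc, scaler = normalize(df[["close", "Date"]])
# prices = scaler.inverse_transform(closedfsc)  # round-trips to the original scale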
def split_train_test(closedfsc, training_size, test_size):
    """Split the scaled series chronologically; `test_size` is kept for the caller's bookkeeping."""
    train_data = closedfsc[0:training_size, :]
    test_data = closedfsc[training_size:len(closedfsc), :]
    return train_data, test_data
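# Usage sketch (the 65/35 split ratio is an illustrative choice): the split
# is chronological, so the test set is always the most recent data.
# training_size = int(len(closedfsc) * 0.65)
# test_size = len(closedfsc) - training_size
# train_data, test_data = split_train_test(closedfsc, training_size, test_size)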
def create_dataset(dataset, time_step=1):
    """Build supervised windows: X = `time_step` consecutive values, y = the value that follows."""
    dataX, dataY = [], []
    for i in range(len(dataset) - time_step - 1):
        # e.g. i=0 with time_step=100 -> X = values 0..99, y = value 100
        a = dataset[i : (i + time_step), 0]
        dataX.append(a)
        dataY.append(dataset[i + time_step, 0])
    return np.array(dataX), np.array(dataY)
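# Usage sketch: with time_step=3, a toy series of 10 points yields 6 windows;
# X[0] is [0., 1., 2.] and y[0] is 3.0.
# demo = np.arange(10, dtype=float).reshape(-1, 1)
# X, y = create_dataset(demo, time_step=3)  # X.shape == (6, 3)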
# def build_model(inputs):
#     model = Sequential()
#     model.add(
#         LSTM(
#             neurons,
#             return_sequences=True,
#             input_shape=(inputs.shape[1], inputs.shape[2]),
#             activation=activation_function,
#         )
#     )
#     model.add(Dropout(dropout))
#     model.add(LSTM(neurons, return_sequences=True, activation=activation_function))
#     model.add(Dropout(dropout))
#     model.add(LSTM(neurons, activation=activation_function))
#     model.add(Dropout(dropout))
#     model.add(Dense(units=1))
#     model.add(Activation(activation_function))
#     model.compile(loss=loss, optimizer=optimizer, metrics=["mae"])
#     return model
def build_model():
    """Single-layer LSTM regressor: 256 units feeding one linear output."""
    model = Sequential()
    # input_shape=(None, 1): sequences of any length, one feature per step
    model.add(LSTM(256, input_shape=(None, 1), activation="relu"))
    model.add(Dense(1))
    model.compile(loss="mean_squared_error", optimizer="adam")
    return model
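# Usage sketch: because input_shape is (None, 1), windows produced by
# create_dataset need a trailing feature axis before they are fed in.
# model = build_model()
# x = np.zeros((4, 15, 1))    # 4 samples, 15 time steps, 1 feature
# model.predict(x).shape      # -> (4, 1)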
def train_model(
    model, x_train, y_train, X_test, y_test, epochs, progress_callback=None
):
    """Train one epoch at a time so per-epoch losses can be logged and progress reported."""
    train_losses = []  # training loss per epoch
    val_losses = []  # validation loss per epoch
    for epoch in tqdm(range(epochs)):
        history = model.fit(
            x_train,
            y_train,
            epochs=1,
            verbose=0,
            validation_data=(X_test, y_test),
            batch_size=32,  # fixed here; the module-level batch_size is not used
        )
        train_losses.append(history.history["loss"][0])
        val_losses.append(history.history["val_loss"][0])
        if progress_callback:
            progress_callback(epoch, history)
    return model, train_losses, val_losses
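# End-to-end sketch (assuming `raw_df` holds candle rows with the columns
# preprocess expects; the 0.65 split, time_step=15, and epochs=10 are
# illustrative choices, not values fixed by this file):
# df = preprocess(raw_df)
# closedfsc, scaler = normalize(df[["close", "Date"]])
# training_size = int(len(closedfsc) * 0.65)
# train_data, test_data = split_train_test(
#     closedfsc, training_size, len(closedfsc) - training_size
# )
# time_step = 15
# x_train, y_train = create_dataset(train_data, time_step)
# X_test, y_test = create_dataset(test_data, time_step)
# x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)
# X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
# model = build_model()
# model, train_losses, val_losses = train_model(
#     model, x_train, y_train, X_test, y_test, epochs=10
# )
# preds = scaler.inverse_transform(model.predict(X_test))  # back to price units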