|
import numpy as np |
|
import pandas as pd |
|
from sklearn.preprocessing import LabelEncoder |
|
from sklearn.model_selection import train_test_split |
|
from tensorflow.keras.models import Sequential, Model |
|
from tensorflow.keras.layers import Dense, Dropout, Input, LayerNormalization, MultiHeadAttention, GlobalAveragePooling1D, Embedding, Layer, LSTM, Bidirectional, Conv1D |
|
from tensorflow.keras.optimizers import Adam |
|
from tensorflow.keras.utils import to_categorical |
|
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau |
|
import tensorflow as tf |
|
import optuna |
|
import gradio as gr |
|
|
|
|
|
# Observed outcomes, treated as one ordered sequence by the windowing code
# below.  Each string is used verbatim as a class label by the LabelEncoder,
# so spelling matters: a misspelled entry silently becomes its own class.
# NOTE(review): the first 35 entries carry a numeric suffix and the rest do
# not, so e.g. "Double big 12" and "Double big" are encoded as different
# classes -- confirm this is intended.
data = [
    "Double big 12", "Single big 11", "Single big 13", "Double big 12", "Double small 10",
    "Double big 12", "Double big 12", "Single small 7", "Single small 5", "Single small 9",
    "Single big 13", "Double small 8", "Single small 5", "Double big 14", "Single big 11",
    "Double big 14", "Single big 17", "Triple 9", "Double small 6", "Single big 13",
    "Double big 14", "Double small 8", "Double small 8", "Single big 13", "Single small 9",
    "Double small 8", "Double small 8", "Single big 12", "Double small 8", "Double big 14",
    "Double small 10", "Single big 13", "Single big 11", "Double big 14", "Double big 14",
    # The third entry on the next row used to read "Double biga" (typo).
    "Double small", "Single big", "Double big", "Single small", "Single small",
    "Double small", "Single small", "Single small", "Double small", "Double small",
    "Double big", "Single big", "Triple", "Double big", "Single big", "Single big",
    "Double small", "Single small", "Double big", "Double small", "Double big",
    "Single small", "Single big", "Double small", "Double big", "Double big",
    "Double small", "Single big", "Double big", "Triple", "Single big", "Double small",
    "Single big", "Single small", "Double small", "Single big", "Single big",
    "Single big", "Double small", "Double small", "Single big", "Single small",
    "Single big", "Single small", "Single small", "Double small", "Single small",
    "Single big",
]
|
|
|
|
|
# Report how many raw outcomes are available.
num_data_points = len(data)
print(f'Total number of data points: {num_data_points}')

# Map every distinct outcome string to an integer class id.
encoder = LabelEncoder()
encoded_data = encoder.fit_transform(data)

# Slide a fixed-size window over the encoded series: each run of
# `sequence_length` ids is one sample, and the id right after it is its target.
sequence_length = 10
window_starts = range(len(encoded_data) - sequence_length)
X = np.array([encoded_data[start:start + sequence_length] for start in window_starts])
y = np.array([encoded_data[start + sequence_length] for start in window_starts])

# One-hot encode the targets, one column per class known to the encoder.
y = to_categorical(y, num_classes=len(encoder.classes_))

# Keep X explicitly two-dimensional: (number of windows, sequence_length).
n_windows, window_size = X.shape[0], X.shape[1]
X = X.reshape((n_windows, window_size))

print(f'Input shape: {X.shape}')
print(f'Output shape: {y.shape}')
|
|
|
class TransformerBlock(Layer):
    """Self-attention block with a feed-forward sub-layer and residual norms.

    The input attends to itself through multi-head attention; the result is
    added back to the input and layer-normalised.  A two-layer dense network
    (ReLU, then a projection back to ``embed_dim``) follows, again with a
    residual connection and layer normalisation.  Dropout is applied to the
    output of both sub-layers.

    Args:
        embed_dim: Size of the last axis of the input; also used as the
            attention ``key_dim`` and as the feed-forward output width.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward network.
        rate: Dropout rate applied after attention and after the
            feed-forward network.
        **kwargs: Standard ``Layer`` arguments (``name``, ``dtype``, ...).
            They must be accepted so Keras can rebuild the layer from its
            saved config.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments so get_config() can report them.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=False):
        """Apply attention and feed-forward sub-layers to ``inputs``.

        Args:
            inputs: Tensor whose last axis has size ``embed_dim``.
            training: Forwarded to both dropout layers.

        Returns:
            The output of the second layer normalisation.
        """
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments needed to recreate this layer.

        Without this override a model containing the block cannot be saved
        to, or reloaded from, a serialized model file, because Keras has no
        way to recover the required ``__init__`` arguments.
        """
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config
|
|
|
def build_model(trial):
    """Build and compile the sequence classifier described by ``trial``.

    Args:
        trial: Object exposing Optuna's ``suggest_int`` / ``suggest_float``
            interface; it supplies the embedding width, head count,
            feed-forward width, dropout rate, number of transformer blocks
            and learning rate.

    Returns:
        A compiled Keras ``Model`` that takes ``sequence_length`` integer
        class ids and outputs a softmax over ``encoder.classes_``.
    """
    # Hyperparameters for the network body.
    embed_dim = trial.suggest_int('embed_dim', 64, 256, step=32)
    num_heads = trial.suggest_int('num_heads', 2, 8, step=2)
    ff_dim = trial.suggest_int('ff_dim', 128, 512, step=64)
    rate = trial.suggest_float('dropout', 0.1, 0.5, step=0.1)
    num_transformer_blocks = trial.suggest_int('num_transformer_blocks', 1, 3)

    num_classes = len(encoder.classes_)

    # Integer ids -> dense embeddings.
    inputs = Input(shape=(sequence_length,))
    features = Embedding(input_dim=num_classes, output_dim=embed_dim)(inputs)

    # Stack of self-attention blocks.
    for _ in range(num_transformer_blocks):
        features = TransformerBlock(embed_dim, num_heads, ff_dim, rate)(features)

    # Convolution + bidirectional LSTM, pooled over the sequence axis.
    features = Conv1D(128, 3, activation='relu')(features)
    features = Bidirectional(LSTM(128, return_sequences=True))(features)
    features = GlobalAveragePooling1D()(features)

    # Classification head.
    features = Dropout(rate)(features)
    features = Dense(ff_dim, activation="relu")(features)
    features = Dropout(rate)(features)
    outputs = Dense(num_classes, activation="softmax")(features)

    model = Model(inputs=inputs, outputs=outputs)

    # The learning rate is sampled on a log scale.
    learning_rate = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model
|
|
|
|
|
# Two-stage hold-out split with a fixed seed: 20% of all windows become the
# test set, then 20% of the remainder becomes the validation set.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.2
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, random_state=42, test_size=0.2
)
|
|
|
def objective(trial):
    """Optuna objective: train one candidate model and score it.

    Args:
        trial: The Optuna trial that supplies hyperparameters to
            ``build_model``.

    Returns:
        The highest validation accuracy seen in any epoch of this run.
    """
    # The study builds many models in a loop.  Release the Keras global state
    # left by earlier trials so memory use does not keep growing.
    tf.keras.backend.clear_session()

    model = build_model(trial)

    # Stop once validation loss stalls and keep the best weights; lower the
    # learning rate on shorter plateaus.
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)

    history = model.fit(
        X_train, y_train,
        epochs=100,
        batch_size=64,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping, reduce_lr],
        verbose=0,
    )

    val_accuracy = max(history.history['val_accuracy'])
    return val_accuracy
|
|
|
# Hyperparameter search: 50 trials, maximising the value `objective` returns.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)

best_trial = study.best_trial
print(f'Best hyperparameters: {best_trial.params}')

# Rebuild a fresh model from the winning trial and train it for longer, with
# more patient callbacks than the ones used during the search.
best_model = build_model(best_trial)
early_stopping = EarlyStopping(
    monitor='val_loss', patience=20, restore_best_weights=True
)
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss', factor=0.2, patience=10, min_lr=1e-6
)

history = best_model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    batch_size=64,
    epochs=500,
    callbacks=[early_stopping, reduce_lr],
    verbose=2,
)

# Final score on the held-out test split.
test_loss, test_accuracy = best_model.evaluate(x=X_test, y=y_test, verbose=0)
print(f'Test accuracy: {test_accuracy:.4f}')
|
|
|
def predict_next(model, data, sequence_length, encoder):
    """Predict the label that follows the most recent outcomes.

    Args:
        model: Object with a ``predict`` method that accepts a
            ``(1, sequence_length)`` array of class ids and returns scores.
        data: Sequence of outcome labels; only the last ``sequence_length``
            entries are used.
        sequence_length: Number of trailing outcomes fed to the model.
        encoder: Object with ``transform`` (labels -> ids) and
            ``inverse_transform`` (ids -> labels).

    Returns:
        The label whose score is highest in the model's prediction.
    """
    recent_labels = data[-sequence_length:]
    encoded_window = np.array(encoder.transform(recent_labels))
    batch = encoded_window.reshape((1, sequence_length))

    class_scores = model.predict(batch)
    best_index = np.argmax(class_scores)
    return encoder.inverse_transform([best_index])[0]
|
|
|
def update_data(data, new_outcome):
    """Append ``new_outcome`` to ``data`` in place and return the same list.

    If the list is longer than the module-level ``sequence_length`` after the
    append, the single oldest entry is dropped.  At most one entry is removed
    per call, so a list that starts out longer keeps its length.

    Args:
        data: Mutable list of outcome labels; modified in place.
        new_outcome: Label to add at the end.

    Returns:
        The same ``data`` list.
    """
    data.append(new_outcome)
    exceeds_window = len(data) > sequence_length
    if exceeds_window:
        del data[0]
    return data
|
|
|
def retrain_model(model, X, y, epochs=10):
    """Continue training ``model`` on ``X`` / ``y`` and return it.

    The data is split 80/20 (fixed seed) into training and validation parts;
    training stops early when validation loss stalls and the learning rate is
    reduced on plateaus.

    Args:
        model: Compiled Keras model; trained in place.
        X: Input samples.
        y: Targets aligned with ``X``.
        epochs: Maximum number of epochs to run.

    Returns:
        The same ``model`` object after fitting.
    """
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    callbacks = [
        EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=1e-6),
    ]

    model.fit(
        x=X_train,
        y=y_train,
        validation_data=(X_val, y_val),
        batch_size=64,
        epochs=epochs,
        callbacks=callbacks,
        verbose=0,
    )
    return model
|
|
|
|
|
def gradio_predict(outcome):
    """Gradio handler: record ``outcome`` and predict the one that follows.

    Args:
        outcome: Label typed by the user; it must be one of
            ``encoder.classes_``.

    Returns:
        A status or prediction message for the output textbox.
    """
    global data

    # Reject labels the encoder has never seen.
    is_known = outcome in encoder.classes_
    if not is_known:
        return "Invalid outcome. Please try again."

    # The submitted outcome becomes part of the history before predicting.
    data = update_data(data, outcome)

    if len(data) >= sequence_length:
        predicted_next = predict_next(best_model, data, sequence_length, encoder)
        return f'Predicted next outcome: {predicted_next}'

    return "Not enough data to make a prediction."
|
|
|
def gradio_update(actual_next):
    """Gradio handler: learn from the outcome that actually occurred.

    The ``sequence_length`` outcomes recorded before ``actual_next`` form one
    new training window with ``actual_next`` as its target.  That sample is
    appended to the global training arrays ``X`` and ``y``, and the model is
    fine-tuned on the enlarged set.

    Args:
        actual_next: Label typed by the user; it must be one of
            ``encoder.classes_``.

    Returns:
        A status message for the output textbox.
    """
    global data, X, y, best_model

    if actual_next not in encoder.classes_:
        return "Invalid outcome. Please try again."

    # Snapshot the history that preceded `actual_next` before it is appended;
    # this is the input window the new target belongs to.
    preceding = list(data[-sequence_length:])
    data = update_data(data, actual_next)

    if len(preceding) < sequence_length:
        return "Not enough data to update the model."

    # Shape the new sample like the existing rows: X is (samples,
    # sequence_length) and y is (samples, num_classes).  np.append with an
    # axis requires matching dimensionality, so both additions are 2-D.
    new_window = np.asarray(encoder.transform(preceding)).reshape((1, sequence_length))
    new_target = to_categorical(
        encoder.transform([actual_next]),
        num_classes=len(encoder.classes_),
    )

    # Write back to the globals so samples accumulate across updates.
    X = np.append(X, new_window, axis=0)
    y = np.append(y, new_target, axis=0)

    best_model = retrain_model(best_model, X, y, epochs=10)

    return "Model updated with new data."
|
|
|
|
|
# Two-row UI: the first row predicts the next outcome, the second feeds the
# actual outcome back into the model.
with gr.Blocks() as demo:
    gr.Markdown("## Outcome Prediction with Enhanced Transformer")

    # Row 1: submit the current outcome, read the predicted next one.
    with gr.Row():
        outcome_input = gr.Textbox(label="Current Outcome")
        predict_button = gr.Button("Predict Next")
        predicted_output = gr.Textbox(label="Predicted Next Outcome")

    # Row 2: submit what actually happened, read the retraining status.
    with gr.Row():
        actual_input = gr.Textbox(label="Actual Next Outcome")
        update_button = gr.Button("Update Model")
        update_output = gr.Textbox(label="Update Status")

    # Event wiring must happen inside the Blocks context.
    predict_button.click(
        fn=gradio_predict,
        inputs=[outcome_input],
        outputs=[predicted_output],
    )
    update_button.click(
        fn=gradio_update,
        inputs=[actual_input],
        outputs=[update_output],
    )

demo.launch()
|
|
|
|
|
# Persist the trained model to an HDF5 file.
best_model.save(filepath="enhanced_transformer_model.h5")
print("Model saved as enhanced_transformer_model.h5")

# Load it back; the custom layer class has to be supplied by name so Keras
# can resolve it while rebuilding the model.
loaded_model = tf.keras.models.load_model(
    filepath="enhanced_transformer_model.h5",
    custom_objects=dict(TransformerBlock=TransformerBlock),
)
|
|
|
|
|
def test_loaded_model(test_outcome):
    """Gradio handler: record ``test_outcome`` and predict with the reloaded model.

    Args:
        test_outcome: Label typed by the user; it must be one of
            ``encoder.classes_``.

    Returns:
        A status or prediction message for the output textbox.
    """
    global data

    is_known = test_outcome in encoder.classes_
    if not is_known:
        return "Invalid outcome. Test prediction aborted."

    # The submitted outcome is added to the shared history before predicting.
    data = update_data(data, test_outcome)

    if len(data) < sequence_length:
        return "Not enough data to make a prediction."

    predicted_next = predict_next(loaded_model, data, sequence_length, encoder)
    return f'Predicted next outcome with loaded model: {predicted_next}'
|
|
|
|
|
# Three-row UI: predict with the live model, feed back the actual outcome,
# and query the model reloaded from disk.
with gr.Blocks() as demo:
    gr.Markdown("## Outcome Prediction with Enhanced Transformer")

    # Row 1: submit the current outcome, read the predicted next one.
    with gr.Row():
        outcome_input = gr.Textbox(label="Current Outcome")
        predict_button = gr.Button("Predict Next")
        predicted_output = gr.Textbox(label="Predicted Next Outcome")

    # Row 2: submit what actually happened, read the retraining status.
    with gr.Row():
        actual_input = gr.Textbox(label="Actual Next Outcome")
        update_button = gr.Button("Update Model")
        update_output = gr.Textbox(label="Update Status")

    # Row 3: same kind of query, answered by the reloaded model.
    with gr.Row():
        test_input = gr.Textbox(label="Test Outcome for Loaded Model")
        test_button = gr.Button("Test Loaded Model")
        test_output = gr.Textbox(label="Loaded Model Prediction")

    # Event wiring must happen inside the Blocks context.
    predict_button.click(
        fn=gradio_predict,
        inputs=[outcome_input],
        outputs=[predicted_output],
    )
    update_button.click(
        fn=gradio_update,
        inputs=[actual_input],
        outputs=[update_output],
    )
    test_button.click(
        fn=test_loaded_model,
        inputs=[test_input],
        outputs=[test_output],
    )

demo.launch()