File size: 2,711 Bytes
aaf6ffd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import gradio as gr
from huggingface_hub import from_pretrained_keras
import pandas as pd
import numpy as np
import json
from matplotlib import pyplot as plt

f = open('scaler.json')
scaler = json.load(f)

TIME_STEPS = 288

# Generated training sequences for use in the model.
def create_sequences(values, time_steps=TIME_STEPS):
    output = []
    for i in range(len(values) - time_steps + 1):
        output.append(values[i : (i + time_steps)])
    return np.stack(output)


def normalize_data(data):
    df_test_value = (data - scaler["mean"]) / scaler["std"]
    return df_test_value

def plot_test_data(df_test_value):
    fig, ax = plt.subplots(figsize=(12, 6))
    df_test_value.plot(legend=False, ax=ax)
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.set_title("Input Test Data")
    return fig

def get_anomalies(df_test_value):
    # Create sequences from test values.
    x_test = create_sequences(df_test_value.values)
    model = from_pretrained_keras("keras-io/timeseries-anomaly-detection")

    # Get test MAE loss.
    x_test_pred = model.predict(x_test)
    test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1)
    test_mae_loss = test_mae_loss.reshape((-1))

    # Detect all the samples which are anomalies.
    anomalies = test_mae_loss > scaler["threshold"]
    return anomalies

def plot_anomalies(df_test_value, data, anomalies):
    # data i is an anomaly if samples [(i - timesteps + 1) to (i)] are anomalies
    anomalous_data_indices = []
    for data_idx in range(TIME_STEPS - 1, len(df_test_value) - TIME_STEPS + 1):
        if np.all(anomalies[data_idx - TIME_STEPS + 1 : data_idx]):
            anomalous_data_indices.append(data_idx)
    df_subset = data.iloc[anomalous_data_indices]
    fig, ax = plt.subplots(figsize=(12, 6))
    data.plot(legend=False, ax=ax)
    df_subset.plot(legend=False, ax=ax, color="r")
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.set_title("Anomalous Data Points")
    return fig
                                      
def master(file):
    # read file
    data = pd.read_csv(file, parse_dates=True, index_col="timestamp")
    df_test_value = normalize_data(data)
    # plot input test data
    plot1 = plot_test_data(df_test_value)
    # predict
    anomalies = get_anomalies(df_test_value)
    #plot anomalous data points
    plot2 = plot_anomalies(df_test_value, data, anomalies)
    return plot2

outputs = gr.outputs.Image()

iface = gr.Interface(
    fn=master,
    inputs=gr.inputs.File(label="CSV File"),
    outputs=outputs,
    examples=["art_daily_jumpsup.csv"],
    title="Timeseries Anomaly Detection Using an Autoencoder",
    description="Anomaly detection of timeseries data."
)

iface.launch()