Streaming anomaly detection

Useful algorithms:

Experimental data

Let’s assume the following sequence of observations, which we will use with a variety of algorithms. Each observation is labelled 0 if it is normal and 1 if it is an anomaly.

import pandas as pd

df = pd.read_csv("../../data/streamad/uniDS.csv")

The Welford algorithm

Welford’s method is an online (i.e. single-pass) algorithm for calculating the running variance and standard deviation. It is derived from the difference between the sums of squared differences from the mean for $N$ and $N-1$ observations.
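For reference, the update rules that follow from this difference can be written as below. The symbols are chosen here for illustration: $k$ is the number of observations seen so far, $M_k$ is their mean and $S_k$ is the sum of squared differences from $M_k$, with $M_0 = 0$ and $S_0 = 0$.

$$
\begin{aligned}
M_k &= M_{k-1} + \frac{x_k - M_{k-1}}{k} \\
S_k &= S_{k-1} + (x_k - M_{k-1})(x_k - M_k) \\
s_k^2 &= \frac{S_k}{k-1}, \qquad k \geq 2
\end{aligned}
$$

Here $s_k^2$ is the sample variance after $k$ observations, and its square root is the running standard deviation.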

A basic implementation of Welford’s method could be:

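import math


class Welford:
    """Single-pass (online) running mean and standard deviation."""

    def __init__(self):
        self.k = 0  # number of observations seen so far
        self.M = 0.0  # running mean
        self.S = 0.0  # running sum of squared differences from the mean

    def update(self, x):
        # Ignore missing observations.
        if x is None:
            return
        self.k += 1
        newM = self.M + (x - self.M) / self.k
        newS = self.S + (x - self.M) * (x - newM)
        self.M, self.S = newM, newS

    @property
    def mean(self):
        return self.M

    @property
    def meanfull(self):
        # Mean together with its standard error, std / sqrt(k).
        if self.k == 0:
            return self.mean, 0.0
        return self.mean, self.std / math.sqrt(self.k)

    @property
    def std(self):
        # The sample standard deviation needs at least two observations.
        if self.k < 2:
            return 0.0
        return math.sqrt(self.S / (self.k - 1))

    def __repr__(self):
        return "<Welford: {} +- {}>".format(self.mean, self.std)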

Applying Welford’s algorithm to our data, we have:

welford = Welford()

means = []
for value in df.value.to_list():
    welford.update(value)
    means.append(welford.mean)
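The running mean and standard deviation can also be used to score each new observation as it arrives. The following is a minimal sketch under stated assumptions, not part of the original analysis: the `running_zscores` helper, the `warmup` length, the threshold of 4 and the synthetic stream with an injected spike are all chosen here purely for illustration.

```python
import math
import random


def running_zscores(values, warmup=10):
    """Yield the absolute z-score of each value against the values seen before it."""
    k, mean, s = 0, 0.0, 0.0
    for x in values:
        # Score x against the statistics of the *previous* observations only.
        if k >= warmup and s > 0:
            std = math.sqrt(s / (k - 1))
            yield abs(x - mean) / std
        else:
            yield 0.0  # not enough history yet
        # Welford update with the new observation.
        k += 1
        delta = x - mean
        mean += delta / k
        s += delta * (x - mean)


# Synthetic stream (an assumption for this sketch): Gaussian noise plus one spike.
random.seed(0)
stream = [random.gauss(0.0, 1.0) for _ in range(200)]
stream[150] = 12.0

scores = list(running_zscores(stream))
flagged = [i for i, z in enumerate(scores) if z > 4.0]
```

Note that in this sketch every observation, including a flagged one, is folded into the running statistics, so a large anomaly inflates the standard deviation used for later scores.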

Anomaly detection in streams with extreme value theory

A more detailed page is available at Streaming anomaly detection with Extreme Value Theory.

SPOT

An example with streamad’s SPOT [4] detector. It is available as the SpotDetector class in the streamad.model package.

from streamad.util import StreamGenerator, UnivariateDS, plot
from streamad.util.dataset import CustomDS
from streamad.model import SpotDetector

ds = CustomDS("../../data/streamad/uniDS.csv")
stream = StreamGenerator(ds.data)
model = SpotDetector()

scores = []

for x in stream.iter_item():
    score = model.fit_score(x)
    # Treat a missing (or zero) score as 0 so that scores stays aligned with the data.
    if score:
        scores.append(score)
    else:
        scores.append(0)

data, label, date, features = ds.data, ds.label, ds.date, ds.features

Half-Space Trees

See Half-Space Trees.