Streaming anomaly detection
Useful algorithms:
- Conformalised density and distance-based anomaly detection in time-series data [1]
- Welford algorithm
- Anomaly detection in streams with extreme value theory
- Robust random cut forest based anomaly detection on streams [2]
- Time-series anomaly detection service at Microsoft [3]
- Half-Space Trees
Experimental data
Let’s assume the following sequence of observations, which we will use with a variety of algorithms. Each observation carries a label: 0 for normal observations and 1 for anomalies.
import pandas as pd
df = pd.read_csv("../../data/streamad/uniDS.csv")
The Welford algorithm
Welford’s method is an online (i.e. single-pass) algorithm for calculating the running variance and standard deviation. It is derived from the difference between the sums of squared differences for N and N-1 observations.
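As a sketch of the recurrences this leads to, writing x_k for the k-th observation, M_k for the running mean and S_k for the running sum of squared differences (the same roles as k, M and S in the implementation on this page), and assuming M_0 = 0 and S_0 = 0:

```latex
\begin{aligned}
M_k &= M_{k-1} + \frac{x_k - M_{k-1}}{k} \\
S_k &= S_{k-1} + (x_k - M_{k-1})(x_k - M_k) \\
s_k^2 &= \frac{S_k}{k-1}, \qquad k \ge 2
\end{aligned}
```

Here s_k is the sample standard deviation after k observations. Each update needs only the previous M and S, so no past observations have to be stored.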
A basic implementation of Welford’s method could be:
import math
class Welford(object):
    def __init__(self):
        self.k = 0
        self.M = 0
        self.S = 0
    
    def update(self,x):
        if x is None:
            return
        self.k += 1
        newM = self.M + (x - self.M)*1./self.k
        newS = self.S + (x - self.M)*(x - newM)
        self.M, self.S = newM, newS
            
    @property
    def mean(self):
        return self.M
    @property
    def meanfull(self):
        return self.mean, self.std/math.sqrt(self.k)
    @property
    def std(self):
        # The sample standard deviation needs at least two observations.
        if self.k < 2:
            return 0
        return math.sqrt(self.S/(self.k-1))
    def __repr__(self):
        return "<Welford: {} +- {}>".format(self.mean, self.std)
Applying Welford’s algorithm to our data we have:
welford = Welford()
means = []
for value in df.value.to_list():
    welford.update(value)
    means.append(welford.mean)
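The running statistics can also be turned into a simple anomaly flag. The following is a minimal sketch, not part of the original page: it assumes a z-score rule in which an observation is flagged when it lies more than `threshold` running standard deviations from the running mean. The names `RunningStats`, `zscore_flags`, `threshold` and `warmup`, as well as the toy values, are hypothetical and chosen only for illustration.

```python
import math


class RunningStats:
    """Welford-style running mean and sample standard deviation."""

    def __init__(self):
        self.k = 0
        self.M = 0.0
        self.S = 0.0

    def update(self, x):
        self.k += 1
        new_m = self.M + (x - self.M) / self.k
        self.S += (x - self.M) * (x - new_m)
        self.M = new_m

    @property
    def std(self):
        # Sample standard deviation; defined as 0 until two points are seen.
        return math.sqrt(self.S / (self.k - 1)) if self.k > 1 else 0.0


def zscore_flags(values, threshold=3.0, warmup=5):
    """Flag each value using the statistics of the values seen before it."""
    stats = RunningStats()
    flags = []
    for x in values:
        is_anomaly = (
            stats.k >= warmup            # wait for a few observations first
            and stats.std > 0            # avoid flagging on a degenerate spread
            and abs(x - stats.M) > threshold * stats.std
        )
        flags.append(1 if is_anomaly else 0)
        stats.update(x)                  # the point is added even when flagged
    return flags


# Toy data (hypothetical): one obvious spike at index 6.
values = [10.0, 10.2, 9.9, 10.1, 10.0, 9.8, 25.0, 10.1]
flags = zscore_flags(values)
```

Note that in this sketch a flagged point still updates the running statistics, so a large anomaly inflates the standard deviation for later observations; skipping the update for flagged points is one possible alternative.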
Anomaly detection in streams with extreme value theory
A more detailed page is available at Streaming anomaly detection with Extreme Value Theory.
SPOT
An example using the SPOT [4] detector from streamad. It is available as the streamad.model.SpotDetector class.
from streamad.util import StreamGenerator, UnivariateDS, plot
from streamad.util.dataset import CustomDS
from streamad.model import SpotDetector
ds = CustomDS("../../data/streamad/uniDS.csv")
stream = StreamGenerator(ds.data)
model = SpotDetector()
scores = []
for x in stream.iter_item():
    score = model.fit_score(x)
    if score:
        scores.append(score)
    else:
        scores.append(0)
data, label, date, features = ds.data, ds.label, ds.date, ds.features
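Since the data is labelled, the scores can be compared against the labels. The following is a minimal sketch in plain Python, assuming that higher scores mean "more anomalous" and that a fixed cut-off is applied; the helper `precision_recall`, the `threshold` value and the toy lists are hypothetical and do not come from streamad.

```python
def precision_recall(scores, labels, threshold):
    """Precision and recall when flagging every score above `threshold`."""
    predicted = [1 if s > threshold else 0 for s in scores]
    tp = sum(1 for p, y in zip(predicted, labels) if p == 1 and y == 1)
    fp = sum(1 for p, y in zip(predicted, labels) if p == 1 and y == 0)
    fn = sum(1 for p, y in zip(predicted, labels) if p == 0 and y == 1)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall


# Toy example (hypothetical values, not the output of SpotDetector).
toy_scores = [0.0, 0.1, 0.9, 0.0, 0.8, 0.2]
toy_labels = [0, 0, 1, 0, 0, 1]
precision, recall = precision_recall(toy_scores, toy_labels, threshold=0.5)
```

With the variables from the snippet above, the same helper could be called as `precision_recall(scores, label, threshold)`, provided `scores` and `label` have the same length and a suitable cut-off is chosen.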
Half-Space Trees
See Half-Space Trees.