Streaming anomaly detection
Useful algorithms:
- Conformalised density and distance-based anomaly detection in time-series data [1]
- Welford algorithm
- Anomaly detection in streams with extreme value theory
- Robust random cut forest based anomaly detection on streams [2]
- Time-series anomaly detection service at Microsoft [3]
- Half-Space Trees
Experimental data
Let’s assume the following sequence of observations, which we will use with a variety of algorithms. Each observation is labelled in the label column, with 0 for normal observations and 1 for anomalies.
import pandas as pd
df = pd.read_csv("../../data/streamad/uniDS.csv")
The Welford algorithm
Welford’s method is an online (i.e. single-pass) algorithm for calculating the running variance and standard deviation. It is derived from the difference between the sums of squared differences for \(N\) and \(N-1\) observations.
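As a sketch of the recurrence behind this description, write \(M_N\) for the running mean and \(S_N\) for the running sum of squared deviations after \(N\) observations. These symbols are introduced here for illustration and are chosen to match the M and S attributes used in the implementation. Starting from \(M_0 = 0\) and \(S_0 = 0\), each new observation \(x_N\) updates the state as:

```latex
\begin{aligned}
M_N &= M_{N-1} + \frac{x_N - M_{N-1}}{N} \\
S_N &= S_{N-1} + (x_N - M_{N-1})(x_N - M_N) \\
\sigma_N &= \sqrt{\frac{S_N}{N-1}}, \qquad N \geq 2
\end{aligned}
```

The second line is the difference \(S_N - S_{N-1}\) mentioned above, so only the previous mean and sum need to be stored rather than the full history.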
A basic implementation of Welford’s method could be:
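import math


class Welford(object):
    """Single-pass (online) estimator of mean and standard deviation."""

    def __init__(self):
        self.k = 0  # number of observations seen so far
        self.M = 0  # running mean
        self.S = 0  # running sum of squared deviations from the mean

    def update(self, x):
        if x is None:
            return
        self.k += 1
        newM = self.M + (x - self.M) * 1. / self.k
        newS = self.S + (x - self.M) * (x - newM)
        self.M, self.S = newM, newS

    @property
    def mean(self):
        return self.M

    @property
    def meanfull(self):
        # Mean together with its standard error
        return self.mean, self.std / math.sqrt(self.k)

    @property
    def std(self):
        # The sample standard deviation needs at least two observations
        if self.k < 2:
            return 0
        return math.sqrt(self.S / (self.k - 1))

    def __repr__(self):
        return "<Welford: {} +- {}>".format(self.mean, self.std)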
Applying Welford’s algorithm to our data, we have:
welford = Welford()

means = []
for value in df.value.to_list():
    welford.update(value)
    means.append(welford.mean)
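The running statistics can also drive a simple detector. The sketch below is one possible approach, not something the data or libraries above prescribe: it flags an observation when it lies more than a fixed number of running standard deviations from the running mean. The names RunningStats and zscore_flags, the threshold of 3.0, the warm-up of 10 observations and the toy values are all assumptions made for illustration.

```python
import math


class RunningStats:
    """Compact Welford accumulator (hypothetical helper for this sketch)."""

    def __init__(self):
        self.k = 0        # number of observations seen
        self.mean = 0.0   # running mean
        self.s = 0.0      # running sum of squared deviations

    def update(self, x):
        self.k += 1
        new_mean = self.mean + (x - self.mean) / self.k
        self.s += (x - self.mean) * (x - new_mean)
        self.mean = new_mean

    @property
    def std(self):
        return math.sqrt(self.s / (self.k - 1)) if self.k > 1 else 0.0


def zscore_flags(values, threshold=3.0, warmup=10):
    """Return 1 for values further than threshold * std from the running mean.

    Each value is scored before it is added to the statistics, so an
    outlier does not inflate the estimate it is compared against.
    threshold and warmup are illustrative defaults, not tuned values.
    """
    stats = RunningStats()
    flags = []
    for x in values:
        if stats.k >= warmup and stats.std > 0:
            z = abs(x - stats.mean) / stats.std
            flags.append(int(z > threshold))
        else:
            flags.append(0)  # not enough history yet
        stats.update(x)
    return flags


# Toy values for illustration only; not taken from uniDS.csv.
values = [10.0, 10.2, 9.9, 10.1, 10.0, 9.8, 10.3, 10.1, 9.9, 10.0, 25.0, 10.1]
flags = zscore_flags(values)
```

Note that in this sketch a flagged value is still added to the running statistics, so a large outlier widens the band for later observations. Skipping the update for flagged points is a possible variation.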
Anomaly detection in streams with extreme value theory
A more detailed page is available at Streaming anomaly detection with Extreme Value Theory.
SPOT
An example with streamad’s SPOT detector [4]. This is available as streamad.model.SpotDetector.
from streamad.util import StreamGenerator, UnivariateDS, plot
from streamad.util.dataset import CustomDS
from streamad.model import SpotDetector

ds = CustomDS("../../data/streamad/uniDS.csv")
stream = StreamGenerator(ds.data)
model = SpotDetector()

scores = []

for x in stream.iter_item():
    score = model.fit_score(x)
    if score:
        scores.append(score)
    else:
        scores.append(0)

data, label, date, features = ds.data, ds.label, ds.date, ds.features
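Once scores and labels are both available, one way to compare them is to binarise the scores and compute precision and recall. The helper below is a minimal sketch and is not part of streamad. It assumes that the scores are aligned one-to-one with the labels and that a higher score means a more anomalous observation. The function name precision_recall, the threshold of 0.5 and the toy values are hypothetical.

```python
def precision_recall(scores, labels, threshold):
    """Binarise scores at threshold and compare them with 0/1 labels."""
    preds = [int(s > threshold) for s in scores]
    tp = sum(1 for p, y in zip(preds, labels) if p == 1 and y == 1)
    fp = sum(1 for p, y in zip(preds, labels) if p == 1 and y == 0)
    fn = sum(1 for p, y in zip(preds, labels) if p == 0 and y == 1)
    # Guard against empty denominators when nothing is predicted or labelled
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


# Toy values for illustration only; these are not SpotDetector outputs.
toy_scores = [0.0, 0.1, 0.9, 0.0, 0.8, 0.2]
toy_labels = [0, 0, 1, 0, 0, 1]
precision, recall = precision_recall(toy_scores, toy_labels, threshold=0.5)
```

With real data, the same call would take the scores list and the label array in place of the toy lists, with a threshold chosen to suit the detector's score range.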
Half-Space Trees
See Half-Space Trees.