Module PdmModule.Handlers.pairdetectionHandler

Expand source code
import numpy as np
from numpy import ndarray
from PdmModule.utils.structure import PredictionPoint,Datapoint,Eventpoint
from PdmModule.Models import pairdetection
import pandas as pd


def auto_str(cls):
    def __str__(self):
        return '%s(%s)' % (
            type(self).__name__,
            ', '.join('%s=%s' % item for item in vars(self).items())
        )
    cls.__str__ = __str__
    return cls

@auto_str
class PairDectionHandler():
    # inner or selftunne
    '''
        Techniques Handlers (Handlers)

        In their initialization, they should accept their parameters, source, and reset codes. The source is a string that indicates the source of data that the Technique handler monitors. The reset codes are a list of tuples with all event codes and sources that may cause a reset in the technique (start over). The tuples in that list should be in the form of ev=(code, source) since they are tested like:

        if ev[0] == event.code and ev[1] == event.source:

        Interface:

        1. get_Datapoint(timestamp, vector, source, description) Feed an upcoming sample to the predictor, and a prediction is returned (when no available prediction, a PredictionPoint with None values should be returned).

        Returns a tuple: (PredictionPoint, Datapoint) - the prediction and the data in the form of Datapoint.

        2. get_event(Eventpoint) Feed an upcoming event to the predictor.

        Returns a Boolean, PredictionPoint, Datapoint. A boolean value is returned if the event causes a reset in the technique. Moreover, some techniques (which perform batch prediction) return the predictions and the list of Datapoints that the prediction concerns (if these are None, they are ignored).

        '''

    def __init__(self,source,thresholdtype="inner",thresholdfactor=2,ProfileSize=30,ActualProfileSize=1,resetcodes=[],currentReference=None,constThresholdfilter=float('-inf'),sequencesize=1,alarmsThreshold=0,normz=False):
        self.profilebuffer=[]
        self.profilebuffertimestamps=[]
        self.sequencesize=sequencesize
        self.alarmsThreshold=alarmsThreshold
        self.pointbuffer=[]
        self.currentReference=currentReference
        self.normz = normz
        self.constThresholdfilter=constThresholdfilter

        self.resetcodes = resetcodes
        self.thresholdtype = thresholdtype
        self.thresholdfactor = thresholdfactor
        self.source=source
        self.ProfileSize=ProfileSize
        if thresholdtype=="inner":
            self.ActualProfileSize = ProfileSize
        elif ActualProfileSize<1:
            self.ActualProfileSize=int(ActualProfileSize*self.ProfileSize)
        else:
            self.ActualProfileSize=ActualProfileSize

        self.model = pairdetection.PairDetection(thresholdtype, thresholdfactor,self.ActualProfileSize,constThresholdfilter=self.constThresholdfilter,alarmsThreshold=self.alarmsThreshold,normz=self.normz)


        if self.currentReference is None:
            self.calculated_profile = False
        else:
            temp_datapoint = Datapoint(self.currentReference, None, None, source)
            self.model.initilize(temp_datapoint)
            self.calculated_profile = True
    def manual_update_and_reset(self,thresholdtype=None,thresholdfactor=None):
        if thresholdtype!=None:
            self.thresholdtype = thresholdtype
        if thresholdfactor!=None:
            self.thresholdfactor = thresholdfactor
        self.model = pairdetection.PairDetection(thresholdtype, thresholdfactor,self.ActualProfileSize,constThresholdfilter=self.constThresholdfilter,actualProfileSize=self.alarmsThreshold,normz=self.normz)
    def get_current_reference(self):
        return self.currentReference

    def calculateReferenceData(self):
        if len(self.profilebuffer)>self.ProfileSize:


            data = [[seqdata.current for seqdata in seqpoint] for seqpoint in self.profilebuffer]
            tempnumpy=[]
            for sequense in data:
                tempnumpy.append(np.array([np.array(corrs) for corrs in sequense]))
            tempnumpy=np.array(tempnumpy)
            foundprofile = tempnumpy
            self.calculated_profile = True
            self.currentReference = foundprofile
    def get_Datapoint(self,timestamp : pd.Timestamp,data : ndarray, source) -> PredictionPoint:
        temp_datapoint = Datapoint(self.currentReference, data, timestamp, source)

        if source==self.source:
            self.pointbuffer.append(temp_datapoint)
            if len(self.pointbuffer) >= self.sequencesize:
                self.pointbuffer = self.pointbuffer[-self.sequencesize:]


            if len(self.pointbuffer) < self.sequencesize:
                prediction = PredictionPoint(None, None, None, self.thresholdtype, temp_datapoint.timestamp,
                                             temp_datapoint.source, description="no profile yet")

            elif self.calculated_profile==False:
                # datapoint pass reference domain profile
                self.profilebuffer.append([dd for dd in self.pointbuffer])
                self.profilebuffertimestamps.append(timestamp)
                self.calculateReferenceData()
                if self.calculated_profile:
                    temp_datapoint = Datapoint(self.currentReference, np.array([np.array(corrs) for corrs in self.pointbuffer]), timestamp, source)
                    self.model.initilize(temp_datapoint)
                prediction= PredictionPoint(None, None, None,self.thresholdtype, temp_datapoint.timestamp,temp_datapoint.source,description="no profile yet")
            else:
                temp_datapoint = Datapoint(self.currentReference,
                                           np.array([np.array(corrs.current) for corrs in self.pointbuffer]), timestamp,
                                           source)
                prediction=self.model.get_data(temp_datapoint)
                prediction.description="pair detection"
            return prediction,temp_datapoint
        prediction = PredictionPoint(None, None, None, self.thresholdtype, temp_datapoint.timestamp,
                                     temp_datapoint.source, description="wrong source")
        return prediction,temp_datapoint

    def get_event(self, event: Eventpoint):
        for ev in self.resetcodes:
            if ev[0] == event.code and ev[1] == event.source:
                self.reset()
                return True, None, None
        return False, None, None
    def reset(self):
        self.profilebuffer = []
        self.profilebuffertimestamps = []
        self.calculated_profile = False
        self.currentReference = None
        self.model.reset()
        return

Functions

def auto_str(cls)
Expand source code
def auto_str(cls):
    def __str__(self):
        return '%s(%s)' % (
            type(self).__name__,
            ', '.join('%s=%s' % item for item in vars(self).items())
        )
    cls.__str__ = __str__
    return cls

Classes

class PairDectionHandler (source, thresholdtype='inner', thresholdfactor=2, ProfileSize=30, ActualProfileSize=1, resetcodes=[], currentReference=None, constThresholdfilter=-inf, sequencesize=1, alarmsThreshold=0, normz=False)

Techniques Handlers (Handlers)

In their initialization, they should accept their parameters, source, and reset codes. The source is a string that indicates the source of data that the Technique handler monitors. The reset codes are a list of tuples with all event codes and sources that may cause a reset in the technique (start over). The tuples in that list should be in the form of ev=(code, source) since they are tested like:

if ev[0] == event.code and ev[1] == event.source:

Interface:

  1. get_Datapoint(timestamp, vector, source, description) Feed an upcoming sample to the predictor, and a prediction is returned (when no available prediction, a PredictionPoint with None values should be returned).

Returns a tuple: (PredictionPoint, Datapoint) - the prediction and the data in the form of Datapoint.

  1. get_event(Eventpoint) Feed an upcoming event to the predictor.

Returns a Boolean, PredictionPoint, Datapoint. A boolean value is returned if the event causes a reset in the technique. Moreover, some techniques (which perform batch prediction) return the predictions and the list of Datapoints that the prediction concerns (if these are None, they are ignored).

Expand source code
@auto_str
class PairDectionHandler():
    # inner or selftunne
    '''
        Techniques Handlers (Handlers)

        In their initialization, they should accept their parameters, source, and reset codes. The source is a string that indicates the source of data that the Technique handler monitors. The reset codes are a list of tuples with all event codes and sources that may cause a reset in the technique (start over). The tuples in that list should be in the form of ev=(code, source) since they are tested like:

        if ev[0] == event.code and ev[1] == event.source:

        Interface:

        1. get_Datapoint(timestamp, vector, source, description) Feed an upcoming sample to the predictor, and a prediction is returned (when no available prediction, a PredictionPoint with None values should be returned).

        Returns a tuple: (PredictionPoint, Datapoint) - the prediction and the data in the form of Datapoint.

        2. get_event(Eventpoint) Feed an upcoming event to the predictor.

        Returns a Boolean, PredictionPoint, Datapoint. A boolean value is returned if the event causes a reset in the technique. Moreover, some techniques (which perform batch prediction) return the predictions and the list of Datapoints that the prediction concerns (if these are None, they are ignored).

        '''

    def __init__(self,source,thresholdtype="inner",thresholdfactor=2,ProfileSize=30,ActualProfileSize=1,resetcodes=[],currentReference=None,constThresholdfilter=float('-inf'),sequencesize=1,alarmsThreshold=0,normz=False):
        self.profilebuffer=[]
        self.profilebuffertimestamps=[]
        self.sequencesize=sequencesize
        self.alarmsThreshold=alarmsThreshold
        self.pointbuffer=[]
        self.currentReference=currentReference
        self.normz = normz
        self.constThresholdfilter=constThresholdfilter

        self.resetcodes = resetcodes
        self.thresholdtype = thresholdtype
        self.thresholdfactor = thresholdfactor
        self.source=source
        self.ProfileSize=ProfileSize
        if thresholdtype=="inner":
            self.ActualProfileSize = ProfileSize
        elif ActualProfileSize<1:
            self.ActualProfileSize=int(ActualProfileSize*self.ProfileSize)
        else:
            self.ActualProfileSize=ActualProfileSize

        self.model = pairdetection.PairDetection(thresholdtype, thresholdfactor,self.ActualProfileSize,constThresholdfilter=self.constThresholdfilter,alarmsThreshold=self.alarmsThreshold,normz=self.normz)


        if self.currentReference is None:
            self.calculated_profile = False
        else:
            temp_datapoint = Datapoint(self.currentReference, None, None, source)
            self.model.initilize(temp_datapoint)
            self.calculated_profile = True
    def manual_update_and_reset(self,thresholdtype=None,thresholdfactor=None):
        if thresholdtype!=None:
            self.thresholdtype = thresholdtype
        if thresholdfactor!=None:
            self.thresholdfactor = thresholdfactor
        self.model = pairdetection.PairDetection(thresholdtype, thresholdfactor,self.ActualProfileSize,constThresholdfilter=self.constThresholdfilter,actualProfileSize=self.alarmsThreshold,normz=self.normz)
    def get_current_reference(self):
        return self.currentReference

    def calculateReferenceData(self):
        if len(self.profilebuffer)>self.ProfileSize:


            data = [[seqdata.current for seqdata in seqpoint] for seqpoint in self.profilebuffer]
            tempnumpy=[]
            for sequense in data:
                tempnumpy.append(np.array([np.array(corrs) for corrs in sequense]))
            tempnumpy=np.array(tempnumpy)
            foundprofile = tempnumpy
            self.calculated_profile = True
            self.currentReference = foundprofile
    def get_Datapoint(self,timestamp : pd.Timestamp,data : ndarray, source) -> PredictionPoint:
        temp_datapoint = Datapoint(self.currentReference, data, timestamp, source)

        if source==self.source:
            self.pointbuffer.append(temp_datapoint)
            if len(self.pointbuffer) >= self.sequencesize:
                self.pointbuffer = self.pointbuffer[-self.sequencesize:]


            if len(self.pointbuffer) < self.sequencesize:
                prediction = PredictionPoint(None, None, None, self.thresholdtype, temp_datapoint.timestamp,
                                             temp_datapoint.source, description="no profile yet")

            elif self.calculated_profile==False:
                # datapoint pass reference domain profile
                self.profilebuffer.append([dd for dd in self.pointbuffer])
                self.profilebuffertimestamps.append(timestamp)
                self.calculateReferenceData()
                if self.calculated_profile:
                    temp_datapoint = Datapoint(self.currentReference, np.array([np.array(corrs) for corrs in self.pointbuffer]), timestamp, source)
                    self.model.initilize(temp_datapoint)
                prediction= PredictionPoint(None, None, None,self.thresholdtype, temp_datapoint.timestamp,temp_datapoint.source,description="no profile yet")
            else:
                temp_datapoint = Datapoint(self.currentReference,
                                           np.array([np.array(corrs.current) for corrs in self.pointbuffer]), timestamp,
                                           source)
                prediction=self.model.get_data(temp_datapoint)
                prediction.description="pair detection"
            return prediction,temp_datapoint
        prediction = PredictionPoint(None, None, None, self.thresholdtype, temp_datapoint.timestamp,
                                     temp_datapoint.source, description="wrong source")
        return prediction,temp_datapoint

    def get_event(self, event: Eventpoint):
        for ev in self.resetcodes:
            if ev[0] == event.code and ev[1] == event.source:
                self.reset()
                return True, None, None
        return False, None, None
    def reset(self):
        self.profilebuffer = []
        self.profilebuffertimestamps = []
        self.calculated_profile = False
        self.currentReference = None
        self.model.reset()
        return

Methods

def calculateReferenceData(self)
Expand source code
def calculateReferenceData(self):
    if len(self.profilebuffer)>self.ProfileSize:


        data = [[seqdata.current for seqdata in seqpoint] for seqpoint in self.profilebuffer]
        tempnumpy=[]
        for sequense in data:
            tempnumpy.append(np.array([np.array(corrs) for corrs in sequense]))
        tempnumpy=np.array(tempnumpy)
        foundprofile = tempnumpy
        self.calculated_profile = True
        self.currentReference = foundprofile
def get_Datapoint(self, timestamp: pandas._libs.tslibs.timestamps.Timestamp, data: numpy.ndarray, source) ‑> PredictionPoint
Expand source code
def get_Datapoint(self,timestamp : pd.Timestamp,data : ndarray, source) -> PredictionPoint:
    temp_datapoint = Datapoint(self.currentReference, data, timestamp, source)

    if source==self.source:
        self.pointbuffer.append(temp_datapoint)
        if len(self.pointbuffer) >= self.sequencesize:
            self.pointbuffer = self.pointbuffer[-self.sequencesize:]


        if len(self.pointbuffer) < self.sequencesize:
            prediction = PredictionPoint(None, None, None, self.thresholdtype, temp_datapoint.timestamp,
                                         temp_datapoint.source, description="no profile yet")

        elif self.calculated_profile==False:
            # datapoint pass reference domain profile
            self.profilebuffer.append([dd for dd in self.pointbuffer])
            self.profilebuffertimestamps.append(timestamp)
            self.calculateReferenceData()
            if self.calculated_profile:
                temp_datapoint = Datapoint(self.currentReference, np.array([np.array(corrs) for corrs in self.pointbuffer]), timestamp, source)
                self.model.initilize(temp_datapoint)
            prediction= PredictionPoint(None, None, None,self.thresholdtype, temp_datapoint.timestamp,temp_datapoint.source,description="no profile yet")
        else:
            temp_datapoint = Datapoint(self.currentReference,
                                       np.array([np.array(corrs.current) for corrs in self.pointbuffer]), timestamp,
                                       source)
            prediction=self.model.get_data(temp_datapoint)
            prediction.description="pair detection"
        return prediction,temp_datapoint
    prediction = PredictionPoint(None, None, None, self.thresholdtype, temp_datapoint.timestamp,
                                 temp_datapoint.source, description="wrong source")
    return prediction,temp_datapoint
def get_current_reference(self)
Expand source code
def get_current_reference(self):
    return self.currentReference
def get_event(self, event: Eventpoint)
Expand source code
def get_event(self, event: Eventpoint):
    for ev in self.resetcodes:
        if ev[0] == event.code and ev[1] == event.source:
            self.reset()
            return True, None, None
    return False, None, None
def manual_update_and_reset(self, thresholdtype=None, thresholdfactor=None)
Expand source code
def manual_update_and_reset(self,thresholdtype=None,thresholdfactor=None):
    if thresholdtype!=None:
        self.thresholdtype = thresholdtype
    if thresholdfactor!=None:
        self.thresholdfactor = thresholdfactor
    self.model = pairdetection.PairDetection(thresholdtype, thresholdfactor,self.ActualProfileSize,constThresholdfilter=self.constThresholdfilter,actualProfileSize=self.alarmsThreshold,normz=self.normz)
def reset(self)
Expand source code
def reset(self):
    self.profilebuffer = []
    self.profilebuffertimestamps = []
    self.calculated_profile = False
    self.currentReference = None
    self.model.reset()
    return