
Commit

added spark anomaly detection support for standalone and container based deployments
igabriel85 committed Oct 2, 2017
1 parent 389597f commit 842160a
Showing 1 changed file with 91 additions and 15 deletions.
106 changes: 91 additions & 15 deletions adpengine/dmonadpengine.py
@@ -9,6 +9,7 @@
import tempfile
from dmonscikit import dmonscilearncluster as sdmon
from dmonscikit import dmonscilearnclassification as cdmon
import subprocess


class AdpEngine:
@@ -48,7 +49,7 @@ def __init__(self, settingsDict, dataDir, modelsDir, queryDir):
self.anomalyIndex = "anomalies"
self.regnodeList = []
self.allowedMethodsClustering = ['skm', 'em', 'dbscan', 'sdbscan', 'isoforest']
self.allowefMethodsClassification = ['randomforest', 'decisiontree', 'sneural', 'adaboost', 'naivebayes'] # TODO
self.allowefMethodsClassification = ['randomforest', 'decisiontree', 'sneural', 'adaboost', 'naivebayes', 'rbad'] # TODO
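# 'rbad' is handled differently from the scikit-learn based classifiers: it is invoked as an external executable in trainMethod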
self.heap = settingsDict['heap']
self.dmonConnector = Connector(self.esendpoint, dmonPort=self.dmonPort, index=self.index)
self.qConstructor = QueryConstructor(self.queryDir)
@@ -208,6 +209,7 @@ def getData(self, detect=False):
print "System Metrics merge complete"
else:
print "Only for all system metrics available" #todo for metrics types
sys.exit()
if 'yarn' in queryd:
print "Starting query for yarn metrics"
if queryd['yarn'] == 0: # todo test if it works
@@ -455,10 +457,10 @@ def getData(self, detect=False):
print "Finished query and merge for yarn metrics"

elif 'spark' in queryd:
print "Spark metrics" #todo
self.sparkReturn = 0
print "Starting query for Spark metrics" #todo
self.sparkReturn = self.getSpark(detect=detect)
elif 'storm' in queryd:
print "Starting query for storm metrics"
print "Starting query for Storm metrics"
stormTopology = self.dmonConnector.getStormTopology()
try:
bolts = stormTopology['bolts']
@@ -567,7 +569,7 @@ def trainMethod(self):
elif 'mongodb' in queryd:
udata = self.dformat.toDF(os.path.join(self.dataDir, 'Merged_Mongo.csv'))
elif 'spark' in queryd:
return "not yet implemented" # todo
udata = self.dformat.toDF(os.path.join(self.dataDir, 'Spark.csv'))
elif 'userquery' in queryd:
udata = self.dformat.toDF(os.path.join(self.dataDir, 'query_response.csv'))
elif 'cep' in queryd:
@@ -584,7 +586,7 @@ def trainMethod(self):
elif 'mongodb' in queryd:
udata = mongoReturn
elif 'spark' in queryd:
return "not yet implemented" # todo
udata = sparkReturn
elif 'userquery' in queryd:
udata = userqueryReturn
elif 'cep' in queryd:
@@ -706,15 +708,45 @@ def trainMethod(self):
logger.info('[%s] : [INFO] Initializing RandomForest model creation ....',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
rfmodel = classdmon.randomForest(settings=self.methodSettings, data=udata, dropna=True)

elif self.method == 'decisiontree':
print "dt"
logger.info('[%s] : [INFO] Initializing Decision Tree model creation ....',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
dtmodel = classdmon.decisionTree(settings=self.methodSettings, data=udata, dropna=True)
elif self.method == 'sneural':
print 'sneural'
logger.info('[%s] : [INFO] Initializing Neural Network model creation ....',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
nnmodel = classdmon.neuralNet(settings=self.methodSettings, data=udata, dropna=True)
elif self.method == 'adaboost':
print 'adaboost'
logger.info('[%s] : [INFO] Initializing Ada Boost model creation ....',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
admodel = classdmon.adaBoost(settings=self.methodSettings, data=udata, dropna=True)
elif self.method == 'naivebayes':
print 'naivebayes'
print 'NaiveBayes not available in this version!'
logger.warning('[%s] : [WARN] NaiveBayes not available in this version!',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
sys.exit(0)
elif self.method == 'rbad':
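# RBAD runs as an external binary; RBAD_HOME points to its install directory and defaults to the current working directory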
rbad_home = os.environ['RBAD_HOME'] = os.getenv('RBAD_HOME', os.getcwd())
rbad_exec = os.path.join(rbad_home, 'RBAD')

if not os.path.isfile(rbad_exec):
logger.error('[%s] : [ERROR] RBAD executable not found at %s',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), rbad_exec)
sys.exit(1)
rbadPID = 0
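# Launch RBAD as a separate child process and record its PID; if it cannot be started the engine exits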
try:
rbadPID = subprocess.Popen(rbad_exec, stdout=subprocess.PIPE,
close_fds=True).pid
except Exception as inst:
logger.error("[%s] : [ERROR] Cannot start RBAD with %s and %s",
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'),
type(inst), inst.args)
sys.exit(1)

print "RBAD finished"
logger.info('[%s] : [INFO] RBAD finished!',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
sys.exit(0)
self.train = False
else:
logger.error('[%s] : [ERROR] Unknown method %s of type %s ',
@@ -870,6 +902,14 @@ def detectAnomalies(self):
dataf = os.path.join(self.dataDir, 'CEP.csv')
data = self.dformat.toDF(dataf)
data = self.filterData(data)
elif 'spark' in queryd:
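# Spark metrics: use the checkpointed in-memory DataFrame when available, otherwise reload from Spark.csv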
if checkpoint:
data = self.filterData(sparkReturn)
else:
dataf = os.path.join(self.dataDir, 'Spark.csv')
data = self.dformat.toDF(dataf)
data = self.filterData(data)
if self.method in self.allowedMethodsClustering:
print "Detecting with selected method %s of type %s" % (self.method, self.type)
if os.path.isfile(os.path.join(self.modelsDir, self.modelName(self.method, self.load))):
@@ -945,6 +985,14 @@ def detectAnomalies(self):
data = self.dformat.toDF(dataf)
data.set_index('key', inplace=True)
data = self.filterData(data)
elif 'spark' in queryd:
if checkpoint:
data = sparkReturn
else:
dataf = os.path.join(self.dataDir, 'Spark.csv')
data = self.dformat.toDF(dataf)
data.set_index('key', inplace=True)
data = self.filterData(data)
if self.method in self.allowefMethodsClassification:
print "Detecting with selected method %s of type %s" % (self.method, self.type)
if os.path.isfile(os.path.join(self.modelsDir, self.modelName(self.method, self.load))):
@@ -1010,19 +1058,19 @@ def run(self, engine):
def runProcess(self, engine):
proc = []
try:
# pPoint = AdpPointProcess(engine, 'Point Proc')
pPoint = AdpPointProcess(engine, 'Point Proc')
pTrain = AdpTrainProcess(engine, 'Train Proc')
pDetect = AdpDetectProcess(engine, 'Detect Proc')

# processPoint = pPoint.run()
# proc.append(processPoint)
processPoint = pPoint.run()
proc.append(processPoint)
processTrain = pTrain.run()
proc.append(processTrain)
processDetect = pDetect.run()
proc.append(processDetect)


# processPoint.start()
processPoint.start()
processTrain.start()
processDetect.start()

@@ -1638,6 +1686,34 @@ def getCEP(self, detect=False):

return returnCEP

def getSpark(self, detect=False):
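# Query DMON for aggregated Spark metrics; in detect mode the window is relative to now, otherwise the configured tfrom/to interval is used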
if detect:
tfrom = "now-%s" % self.interval
to = "now"
else:
tfrom = self.tfrom
to = self.to
print "Querying Spark metrics ..."
logger.info('[%s] : [INFO] Querying Spark metrics ...',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
checkpoint = str2Bool(self.checkpoint)

sparkString, spark_file = self.qConstructor.sparkString()
qSpark = self.qConstructor.sparkQuery(sparkString, tfrom, to, self.qsize, self.interval)
gSpark = self.dmonConnector.aggQuery(qSpark)
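# Without checkpointing the aggregated result is written to Spark.csv; with checkpointing it is returned as an in-memory DataFrame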

if not checkpoint:
self.dformat.dict2csv(gSpark, qSpark, spark_file)
returnSP = 0
else:
df_SP = self.dformat.dict2csv(gSpark, qSpark, spark_file, df=checkpoint)
# df_SP.set_index('key', inplace=True)
returnSP = df_SP
print "Querying Spark metrics complete"
logger.info('[%s] : [INFO] Querying Spark metrics complete',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))
return returnSP

def printTest(self):
print "Endpoint -> %s" %self.esendpoint
print "Method settings -> %s" %self.methodSettings
