Skip to content

Commit

Permalink
sync with dev repository
Browse files Browse the repository at this point in the history
  • Loading branch information
igabriel85 committed Nov 21, 2016
1 parent 3bc81c2 commit e4b5878
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 95 deletions.
174 changes: 125 additions & 49 deletions adpengine/dmonadpengine.py

Large diffs are not rendered by default.

21 changes: 16 additions & 5 deletions adplogger.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import logging
from logging.handlers import TimedRotatingFileHandler
from logging.handlers import RotatingFileHandler
import os
import datetime

logger = logging.getLogger("Rotating Log")
logger.setLevel(logging.ERROR)
logger = logging.getLogger("ADP Log")
logger.setLevel(logging.WARN)

loggerESt = logging.getLogger('elasticsearch.trace')
loggerESt.setLevel(logging.WARN)
loggerES = logging.getLogger('elasticsearch')
loggerES.setLevel(logging.WARN)
loggerurl3 = logging.getLogger("urllib3")
loggerurl3.setLevel(logging.WARN)


# add a rotating handler
logFile = os.path.join(os.path.dirname(os.path.abspath('')), 'dmonadp.log')
handler = TimedRotatingFileHandler(logFile, when="d", interval=1, backupCount=5)
logFile = os.path.join('dmonadp.log')
handler = RotatingFileHandler(logFile, maxBytes=100000000, backupCount=5)
logger.addHandler(handler)
loggerESt.addHandler(handler)
loggerES.addHandler(handler)
loggerurl3.addHandler(handler)

15 changes: 9 additions & 6 deletions dataformatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class DataFormatter:

def __init__(self, dataDir):
self.dataDir = dataDir
self.fmHead = 0

def getJson(self):
return 'load Json'
Expand Down Expand Up @@ -100,7 +101,7 @@ def dropColumns(self, df, lColumns, cp=True):
return 0

def fillMissing(self, df):
df.fillna(0)
df.fillna(0, inplace=True)

def dropMissing(self, df):
df.dropna(axis=1, how='all', inplace=True)
Expand Down Expand Up @@ -204,17 +205,17 @@ def chainMergeDFS(self, dfs=None, dfsfs=None, fsop=None):
lFiles = [dfs, dfsfs, fsop]
return self.listMerge(lFiles)

def chainMergeCluster(self, clusterMetrics=None, queue=None, jvmRM=None, jvmmrapp=None):
def chainMergeCluster(self, clusterMetrics=None, queue=None, jvmRM=None):
'''
:return: -> merged cluster metrics
'''
if clusterMetrics is None and queue is None and jvmRM is None and jvmmrapp is None:
if clusterMetrics is None and queue is None and jvmRM is None:
clusterMetrics = os.path.join(self.dataDir, "ClusterMetrics.csv")
queue = os.path.join(self.dataDir, "ResourceManagerQueue.csv")
jvmRM = os.path.join(self.dataDir, "JVM_RM.csv")
jvmmrapp = os.path.join(self.dataDir, "JVM_MRAPP.csv")
# jvmmrapp = os.path.join(self.dataDir, "JVM_MRAPP.csv")

lFiles = [clusterMetrics, queue, jvmRM, jvmmrapp]
lFiles = [clusterMetrics, queue, jvmRM]

return self.listMerge(lFiles)

Expand Down Expand Up @@ -395,7 +396,9 @@ def mergeFinal(self, dfs=None, cluster=None, nodeMng=None, jvmnodeMng=None, data
merged_df = self.listMerge(lFile)
merged_df.sort_index(axis=1, inplace=True)
# merged_df.set_index('key', inplace=True)
self.dropMissing(merged_df)
#self.dropMissing(merged_df)
self.fillMissing(merged_df)
self.fmHead = list(merged_df.columns.values)
return merged_df

def dict2csv(self, response, query, filename, df=False):
Expand Down
23 changes: 9 additions & 14 deletions dmonadp.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@
ESEndpoint:85.120.206.27
ESPort:9200
DMonPort:5001
From:1477561800000
To:1477562100000
From:1479105362284
To:1479119769978
Query:yarn:cluster, nn, nm, dfs, dn, mr;system
Nodes:
QSize:0
QInterval:10s

[Mode]
Training:True
Training:false
Validate:False
Detect:True
Detect:false

[Filter]
Columns:
Rows:
DColumns:
#Columns:colname;colname2;colname3
#Rows:ld:145607979;gd:145607979
#DColumns:colname;colname2;colname3


[Detect]
Expand All @@ -40,10 +40,5 @@ Network: tx:gd:34344;rx:ld:323434
heap:512m
checkpoint:false
delay:2m
interval:30m






interval:15m
resetindex:false
19 changes: 15 additions & 4 deletions dmonadp.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def main(argv):
"heap": None,
"checkpoint": None,
"delay": None,
"interval": None
"interval": None,
"resetindex": None
}

# Only for testing
Expand Down Expand Up @@ -347,7 +348,7 @@ def main(argv):

try:
print "Settings for method %s: " % settings['method']
settings['MethodSettings'] = {}
settings['MethodSettings'] = {} #todo read settings from commandline ?
for name, value in readCnf['MethodSettings'].iteritems():
print "%s -> %s" % (name, value)
settings['MethodSettings'][name] = value
Expand Down Expand Up @@ -482,7 +483,17 @@ def main(argv):
logger.info('[%s] : [INFO] Interval is set to %s',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), settings['interval'])


if settings["resetindex"] is None:
try:
print "Reset index set to %s" % readCnf['Misc']['resetindex']
settings["resetindex"] = readCnf['Misc']['resetindex']
except:
print "Reset index not set, skipping."
settings["resetindex"] = False
else:
print "Reset index set to %s" % settings["resetindex"]
logger.info('[%s] : [INFO] Reset index set to %s"',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), settings['resetindex'])

#if settings["esendpoint"] == None:

Expand Down Expand Up @@ -530,7 +541,7 @@ def main(argv):
# engine.trainMethod()
# engine.detectAnomalies(30)
# engine.printTest()

print "\n"
print "#" * 100


Expand Down
2 changes: 1 addition & 1 deletion dmonconnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def closeIndex(self, indexName):

def deleteIndex(self, indexName):
try:
res = self.esInstance.delete(index=indexName, ignore=[400, 404])
res = self.esInstance.indices.delete(index=indexName, ignore=[400, 404])
logger.info('[%s] : [INFO] Deleted index %s',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'), indexName)
except Exception as inst:
Expand Down
29 changes: 20 additions & 9 deletions dmonweka/dweka.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ def loadClusterModel(self, method, mname):
def runclustermodel(self, model, method, dataf, temp=True):
anomalies = []
try:
jvm.start()
jvm.start(max_heap_size=self.wHeap)
data = self.loadData(dataf, temp)
cluster = self.loadClusterModel(model, method)
clusterMembership = []
print cluster.number_of_clusters
for inst in data:
cl = cluster.cluster_instance(inst)
try:
cl = cluster.cluster_instance(inst)
except Exception as inst:
logger.error('[%s] : [ERROR] Mismatch model and data attributes',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'))

dist = cluster.distribution_for_instance(inst)
print ("cluster=" + str(cl) + ", distribution=" + str(dist))
clusterMembership.append(cl)
Expand All @@ -54,10 +60,15 @@ def runclustermodel(self, model, method, dataf, temp=True):
# print data.get_instance(3)

pa = self.calcThreashold(dict(Counter(clusterMembership)), 21)
for a in pa:
# print data.get_instance(a).get_value(0) #todo always set key as first atribute
anomalies.append(data.get_instance(a).get_value(0))
print "Detected using %s anomalies at timestamp(s) %s" % (model, str(anomalies))
if pa == 0:
logger.warning('[%s] : [WARN] Most instances are computed as anomalies, possible error encountered!',
datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S'),)
print "Most instances are computed as anomalies, possible error encountered!"
else:
for a in pa:
# print data.get_instance(a).get_value(0) #todo always set key as first atribute
anomalies.append(data.get_instance(a).get_value(0))
print "Detected using %s anomalies at timestamp(s) %s" % (model, str(anomalies))
except Exception, e:
print(traceback.format_exc())
finally:
Expand All @@ -77,7 +88,7 @@ def simpleKMeansTrain(self, dataf, options, mname, temp=True):
:return:
'''
try:
jvm.start()
jvm.start(max_heap_size=self.wHeap)
data = self.loadData(dataf, temp=True)
clusterer = Clusterer(classname="weka.clusterers.SimpleKMeans", options=options)
clusterer.build_clusterer(data)
Expand Down Expand Up @@ -106,7 +117,7 @@ def dbscanTrain(self, dataf, options, mname, temp=True):
'''

try:
jvm.start()
jvm.start(max_heap_size=self.wHeap)
data = self.loadData(dataf, temp)
clusterDBSCAN = Clusterer(classname="weka.clusterers.DBSCAN", options=options)
clusterDBSCAN.build_clusterer(data)
Expand All @@ -132,7 +143,7 @@ def emTrain(self, dataf, options, mname, temp=True):
:return:
'''
try:
jvm.start()
jvm.start(max_heap_size=self.wHeap)
data = self.loadData(dataf, temp)
clusterEM = Clusterer(classname="weka.clusterers.EM",
options=options)
Expand Down
2 changes: 1 addition & 1 deletion pyQueryConstructor.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def jvmRedProcessString(self, host):

def jvmMapProcessingString(self, host):
qstring = "type:\"maptask-metrics\" AND serviceType:\"jvm\" AND hostname:\"%s\"" % host
return qstring
return qstring #TODO

def jvmRedProcessbyNameString(self, host, process):
qstring = "type:\"reducetask-metrics\" AND serviceType:\"jvm\" AND hostname:\"%s\" AND ProcessName:\"%s\"" %(host, process)
Expand Down
2 changes: 1 addition & 1 deletion pyadp.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@
pass

# Get non host based metrics queries and file strings
dfs, dfs_file = qConstructor.dfsFString()
dfs, dfs_file = qConstructor.dfsString()
dfsFs, dfsFs_file = qConstructor.dfsFString()
jvmNameNodeString, jvmNameNode_file = qConstructor.jvmNameNodeString()
queue, queue_file = qConstructor.queueResourceString()
Expand Down
57 changes: 57 additions & 0 deletions test/adpPointTest.py

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions util.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,15 @@ def parseMethodSettings(st):
return mSettings


def wait4Model(count):
if count < 10:
def wait4Model(count=0):
test = False
if test or count < 10:
time.sleep(1)
count += 1
print count
return wait4Model(count)
print "Done"
return 1

# wait4Model(0)
# wait4Model()

# test = {'s': '10', 'n': '10'}
# print parseMethodSettings(test)
Expand Down

0 comments on commit e4b5878

Please sign in to comment.