
Commit

Merge pull request #13 from datasage-io/AK-8430ArulJ
added periodic scan
ArulJeyananth authored Jul 18, 2022
2 parents 44b7408 + 98e8ae6 commit ae2e8f1
Showing 8 changed files with 928 additions and 247 deletions.
8 changes: 1 addition & 7 deletions go.mod
@@ -16,10 +16,6 @@ require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/rs/zerolog v1.27.0 // indirect
golang.org/x/mod v0.3.0 // indirect
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac // indirect
golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/grpc v1.47.0
google.golang.org/protobuf v1.28.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
@@ -36,8 +32,6 @@ require (

require (
github.com/confluentinc/confluent-kafka-go v1.8.2
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
golang.org/x/text v0.3.3 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
github.com/spf13/viper v1.12.0 // indirect
gopkg.in/yaml.v2 v2.4.0
)
718 changes: 718 additions & 0 deletions go.sum

Large diffs are not rendered by default.

292 changes: 122 additions & 170 deletions src/classifiers/classfication_handler.go
@@ -2,12 +2,17 @@ package classifiers

import (
"fmt"
"log"
"regexp"
"strings"
"time"

"github.com/datasage-io/datasage/src/adaptors"
"github.com/datasage-io/datasage/src/storage"

logger "github.com/datasage-io/datasage/src/logger"
"github.com/rs/zerolog"

"github.com/spf13/viper"
)

type DpDataSource struct {
@@ -23,195 +28,142 @@ type DpDataSource struct {
Password string `json:"Password"`
}

func Run(dpDataSource DpDataSource) {
log.SetFlags(log.LstdFlags | log.Lshortfile)
log.Println("Classification Handler Run")
//Fetch MetaData
adpt, err := adaptors.New(adaptors.AdaptorConfig{
Type: dpDataSource.Dstype,
Username: dpDataSource.User,
Password: dpDataSource.Password,
Host: dpDataSource.Host})

if err != nil {
log.Fatal(err.Error())
}
info, err := adpt.Scan()

if err != nil {
log.Println(err.Error())
return
}

if err != nil {
log.Println(err.Error())
}

st, err := storage.New(storage.StorageConfig{Type: "internal", Path: "datasageD.db"})
if err != nil {
log.Fatal(err.Error())
}
//get all classes
classes, err := st.GetClasses()
if err != nil {
log.Println(err.Error())
return
}
log.Println("no of classes:= ", len(classes))
var log *zerolog.Logger = logger.GetInstance()

//get all tags
tags, err := st.GetTags()
if err != nil {
log.Println(err.Error())
}
log.Println("no of tags:= ", len(tags))

storageDpDataSourceObj := storage.DpDataSource{
ID: -1,
Datadomain: dpDataSource.Datadomain,
Dsname: dpDataSource.Dsname,
Dsdecription: dpDataSource.Dsdecription,
Dstype: dpDataSource.Dstype,
DsKey: dpDataSource.DsKey,
Dsversion: dpDataSource.Dsversion,
Host: dpDataSource.Host,
Port: dpDataSource.Port,
User: dpDataSource.User,
Password: dpDataSource.Password,
}
func Run() {

err1 := st.SetDpDataSourceData(storageDpDataSourceObj)
if err1 != nil {
fmt.Println("SetDpDataSourceData error: ", err)
return
}
scanInterval := viper.GetInt("classifiers.dbschema-scan-interval")
ticker := time.NewTicker(time.Duration(scanInterval) * time.Minute)
for ; ; <-ticker.C {
fmt.Println("starting a scan")
st, err := storage.GetStorageInstance()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}

/*
phonetag, err := st.GetAssociatedTags("Phone Number")
datasources, err := st.GetDataSources()
if err != nil {
log.Println(err.Error())
fmt.Println("ListDatasources error ")
}
log.Println(phonetag)
*/
log.Println(" identifying class and tags start")
columsCount := 0
tableCount := 0
dBCount := 0
skipDBs := []string{"mysql", "performance_schema", "datadefender"}

for _, sc := range info.Schemas {
dBCount = dBCount + 1
log.Println("DB name:= ", sc.Name)
skip := false
for _, skipDB := range skipDBs {

if skipDB == sc.Name {
log.Println("skip DB name:= ", sc.Name)
skip = true
for _, datasource := range datasources {
fmt.Println(datasource)

adaptor, err := adaptors.New(adaptors.AdaptorConfig{
Type: datasource.Dstype,
Username: datasource.User,
Password: datasource.Password,
Host: datasource.Host,
})

if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
}
if skip == true {
continue
}

dpDbTables := []storage.DpDbTable{}
for _, tb := range sc.Tables {
tableCount = tableCount + 1
//dpDbColumn := storage.DpDbColumn
//get all classes
classes, err := st.GetClasses()
if err != nil {
log.Error().Err(err).Msg("Internal Error")

dpDbColumns := []storage.DpDbColumn{}
}
log.Info().Msgf("no of classes:=%v ", len(classes))

for _, cols := range tb.Cols {
columsCount = columsCount + 1
colName, err1 := removeSpecialChars(cols.ColumnName)
if err1 != nil {
log.Println(err.Error())
//get all tags
tags, err := st.GetTags()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
}
log.Info().Msgf("no of tags:=%v ", len(tags))
columsCount := 0
tableCount := 0
dBCount := 0
skipDBs := []string{"mysql", "performance_schema", "datadefender"}
info, err := adaptor.Scan()
for _, sc := range info.Schemas {
dBCount = dBCount + 1
log.Info().Msgf("DB name:= %v", sc.Name)
skip := false
for _, skipDB := range skipDBs {

if skipDB == sc.Name {
log.Info().Msgf("skip DB name:= ", sc.Name)
skip = true
}
}
if skip == true {
continue
}
relatedtags, _ := st.GetAssociatedTags(colName)
relatedclasses, _ := st.GetAssociatedClasses(colName)

//if err != nil {
// log.Println(err.Error())
//}
tags := []string{}
classes := []string{}
if len(relatedclasses) > 0 {
for _, relatedclass := range relatedclasses {
log.Println("Class:= ", relatedclass.Class)
//classes = classes + ";" + relatedclass.
classes = append(classes, relatedclass.Class)

dpDbTables := []storage.DpDbTable{}
for _, tb := range sc.Tables {
tableCount = tableCount + 1
//dpDbColumn := storage.DpDbColumn

dpDbColumns := []storage.DpDbColumn{}

for _, cols := range tb.Cols {
columsCount = columsCount + 1
colName, err := removeSpecialChars(cols.ColumnName)
if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
relatedtags, _ := st.GetAssociatedTags(colName)
relatedclasses, _ := st.GetAssociatedClasses(colName)

//if err != nil {
// log.Println(err.Error())
//}
tags := []string{}
classes := []string{}
if len(relatedclasses) > 0 {
for _, relatedclass := range relatedclasses {
log.Info().Msgf("Class:= %v", relatedclass.Class)
classes = append(classes, relatedclass.Class)
}
}
if len(relatedtags) > 0 {
for _, relatedtag := range relatedtags {
log.Info().Msgf("TagName:= %v", relatedtag.TagName)
//tags = tags + ";" + relatedtag.TagName
tags = append(tags, relatedtag.TagName)
}
}
col := storage.DpDbColumn{
ColumnName: colName,
ColumnType: cols.ColumnType,
ColumnComment: cols.ColumnComment,
Tags: strings.Join(tags, ";"),
Classes: strings.Join(classes, ";"),
}
dpDbColumns = append(dpDbColumns, col)
}
}
if len(relatedtags) > 0 {
for _, relatedtag := range relatedtags {
log.Println("TagName:", relatedtag.TagName)
//tags = tags + ";" + relatedtag.TagName
tags = append(tags, relatedtag.TagName)
dpDbTable := storage.DpDbTable{

Name: tb.Name,
Tags: "",
DpDbColumns: dpDbColumns,
}
dpDbTables = append(dpDbTables, dpDbTable)

}
col := storage.DpDbColumn{
ColumnName: colName,
ColumnType: cols.ColumnType,
ColumnComment: cols.ColumnComment,
Tags: strings.Join(tags, ";"),
Classes: strings.Join(classes, ";"),
schema := storage.DpDbDatabase{
DsKey: datasource.DsKey,
Name: sc.Name,
Type: datasource.Dstype,
DpDbTables: dpDbTables,
}
err := st.SetSchemaData(schema)
if err != nil {
fmt.Println(err)
}
dpDbColumns = append(dpDbColumns, col)
}
dpDbTable := storage.DpDbTable{

Name: tb.Name,
Tags: "",
DpDbColumns: dpDbColumns,
}
dpDbTables = append(dpDbTables, dpDbTable)

}
schema := storage.DpDbDatabase{
DbKey: "todo",
Name: sc.Name,
Type: "mysql",
Key: "987654321",
DpDbTables: dpDbTables,
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
}
err := st.SetSchemaData(schema)
if err != nil {
fmt.Println(err)
}
}

log.Println(" identifying class and tags completed")
log.Println(" columsCount", columsCount)

}

func ListDatasources() ([]storage.DpDataSource, error) {
log.Println("ListDatasources")
st, err := storage.New(storage.StorageConfig{Type: "internal", Path: "datasageD.db"})
if err != nil {
log.Fatal(err.Error())
}
dataSources, err := st.GetDpDataSources()
log.Println(dataSources)
return dataSources, err

}

func DeleteDatasource(ids []int64) bool {
overallStatus := true
log.Println("DeleteDatasources ")
st, err := storage.New(storage.StorageConfig{Type: "internal", Path: "datasageD.db"})
if err == nil {
for i := range ids {
status, error := st.DeleteDpDataSources(ids[i])
if error != nil || status == false {
log.Println(error)
overallStatus = false
}
}
}
return overallStatus
}

//removeSpecialChars - Removes the special characters from the string.
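The core of this change is the ticker-driven loop in Run: the scan interval is read from the classifiers.dbschema-scan-interval key through viper, and on every tick the handler fetches the registered data sources from storage, rescans their schemas, and re-runs classification. A minimal, self-contained sketch of that loop pattern follows; scanOnce is a hypothetical stand-in for the storage and adaptor calls in the real handler.

    package main

    import (
        "fmt"
        "time"

        "github.com/spf13/viper"
    )

    // scanOnce is a hypothetical placeholder for one full pass over all
    // registered data sources (GetDataSources + adaptor Scan + classification
    // in the real handler).
    func scanOnce() {
        fmt.Println("starting a scan")
    }

    func main() {
        // Default mirrors the value added to src/conf/datasage.yaml.
        viper.SetDefault("classifiers.dbschema-scan-interval", 10) // minutes

        interval := viper.GetInt("classifiers.dbschema-scan-interval")
        ticker := time.NewTicker(time.Duration(interval) * time.Minute)
        defer ticker.Stop()

        // Same idiom as the new Run: the first iteration runs immediately,
        // because the post statement (<-ticker.C) only executes after the body.
        for ; ; <-ticker.C {
            scanOnce()
        }
    }

One consequence of this idiom is that errors inside a single pass are logged and skipped with continue rather than returned, so a transient storage or adaptor failure does not stop future scans.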
1 change: 1 addition & 0 deletions src/conf/datasage.yaml
@@ -38,6 +38,7 @@ integrations:
- endpoint: https://datasage.com/service/log
method: post
classifiers:
dbschema-scan-interval: 10
ignore_schema: mysql,performance_schema,datadefender
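The new dbschema-scan-interval key nests under classifiers, so the handler reads it with the dotted path "classifiers.dbschema-scan-interval" and interprets the value in minutes. A small sketch of loading this file with viper is below; the config path is an assumption for illustration, since the real service wires viper up in its own bootstrap code.

    package main

    import (
        "fmt"
        "strings"
        "time"

        "github.com/spf13/viper"
    )

    func main() {
        // Assumed path, for illustration only.
        viper.SetConfigFile("src/conf/datasage.yaml")
        if err := viper.ReadInConfig(); err != nil {
            panic(err)
        }

        // Nested YAML keys are addressed with a dotted path.
        minutes := viper.GetInt("classifiers.dbschema-scan-interval")
        fmt.Println("scan interval:", time.Duration(minutes)*time.Minute)

        // The ignore list is a comma-separated string the caller can split.
        ignored := strings.Split(viper.GetString("classifiers.ignore_schema"), ",")
        fmt.Println("ignored schemas:", ignored)
    }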


