Skip to content

Commit

Permalink
periodic scan now update the scan log data if exists
Browse files Browse the repository at this point in the history
periodic scan now update the scan log data if exists
  • Loading branch information
ArulJeyananth committed Jul 20, 2022
1 parent 2a80f3a commit b36cd45
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 123 deletions.
215 changes: 114 additions & 101 deletions src/classifiers/classfication_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,126 +44,139 @@ func Run() {

datasources, err := st.GetDataSources()
if err != nil {
fmt.Println("ListDatasources error ")
fmt.Println("Datasources not found ")
}
for _, datasource := range datasources {
fmt.Println(datasource)
ScanDataSource(datasource)

adaptor, err := adaptors.New(adaptors.AdaptorConfig{
Type: datasource.Dstype,
Username: datasource.User,
Password: datasource.Password,
Host: datasource.Host,
})
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
}
}

if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
}

//get all classes
classes, err := st.GetClasses()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
func ScanDataSource(datasource storage.DpDataSource) error {

}
log.Info().Msgf("no of classes:=%v ", len(classes))
fmt.Println("starting scan for ", datasource)
st, err := storage.GetStorageInstance()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
return err
}
adaptor, err := adaptors.New(adaptors.AdaptorConfig{
Type: datasource.Dstype,
Username: datasource.User,
Password: datasource.Password,
Host: datasource.Host,
})

if err != nil {
log.Error().Err(err).Msg("Internal Error")
return err
}

//get all classes
classes, err := st.GetClasses()
if err != nil {
log.Error().Err(err).Msg("Internal Error")

}
log.Info().Msgf("no of classes:=%v ", len(classes))

//get all tags
tags, err := st.GetTags()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
//get all tags
tags, err := st.GetTags()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
}
log.Info().Msgf("no of tags:=%v ", len(tags))
columsCount := 0
tableCount := 0
dBCount := 0
skipDBs := []string{"mysql", "performance_schema", "datadefender"}
info, err := adaptor.Scan()
for _, sc := range info.Schemas {
dBCount = dBCount + 1
log.Info().Msgf("DB name:= %v", sc.Name)
skip := false
for _, skipDB := range skipDBs {

if skipDB == sc.Name {
log.Info().Msgf("skip DB name:= ", sc.Name)
skip = true
}
log.Info().Msgf("no of tags:=%v ", len(tags))
columsCount := 0
tableCount := 0
dBCount := 0
skipDBs := []string{"mysql", "performance_schema", "datadefender"}
info, err := adaptor.Scan()
for _, sc := range info.Schemas {
dBCount = dBCount + 1
log.Info().Msgf("DB name:= %v", sc.Name)
skip := false
for _, skipDB := range skipDBs {

if skipDB == sc.Name {
log.Info().Msgf("skip DB name:= ", sc.Name)
skip = true
}
}
if skip == true {
continue
}
}
if skip == true {
continue
}

dpDbTables := []storage.DpDbTable{}
for _, tb := range sc.Tables {
tableCount = tableCount + 1
//dpDbColumn := storage.DpDbColumn

dpDbColumns := []storage.DpDbColumn{}

for _, cols := range tb.Cols {
columsCount = columsCount + 1
colName, err := removeSpecialChars(cols.ColumnName)
if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
relatedtags, _ := st.GetAssociatedTags(colName)
relatedclasses, _ := st.GetAssociatedClasses(colName)

//if err != nil {
// log.Println(err.Error())
//}
tags := []string{}
classes := []string{}
if len(relatedclasses) > 0 {
for _, relatedclass := range relatedclasses {
log.Info().Msgf("Class:= %v", relatedclass.Class)
classes = append(classes, relatedclass.Class)
}
}
if len(relatedtags) > 0 {
for _, relatedtag := range relatedtags {
log.Info().Msgf("TagName:= %v", relatedtag.TagName)
//tags = tags + ";" + relatedtag.TagName
tags = append(tags, relatedtag.TagName)
}
}
col := storage.DpDbColumn{
ColumnName: colName,
ColumnType: cols.ColumnType,
ColumnComment: cols.ColumnComment,
Tags: strings.Join(tags, ";"),
Classes: strings.Join(classes, ";"),
}
dpDbColumns = append(dpDbColumns, col)
}
dpDbTable := storage.DpDbTable{
dpDbTables := []storage.DpDbTable{}
for _, tb := range sc.Tables {
tableCount = tableCount + 1
//dpDbColumn := storage.DpDbColumn

Name: tb.Name,
Tags: "",
DpDbColumns: dpDbColumns,
}
dpDbTables = append(dpDbTables, dpDbTable)
dpDbColumns := []storage.DpDbColumn{}

for _, cols := range tb.Cols {
columsCount = columsCount + 1
colName, err := removeSpecialChars(cols.ColumnName)
if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
schema := storage.DpDbDatabase{
DsKey: datasource.DsKey,
Name: sc.Name,
Type: datasource.Dstype,
DpDbTables: dpDbTables,
relatedtags, _ := st.GetAssociatedTags(colName)
relatedclasses, _ := st.GetAssociatedClasses(colName)

//if err != nil {
// log.Println(err.Error())
//}
tags := []string{}
classes := []string{}
if len(relatedclasses) > 0 {
for _, relatedclass := range relatedclasses {
log.Info().Msgf("Class:= %v", relatedclass.Class)
classes = append(classes, relatedclass.Class)
}
}
err := st.SetSchemaData(schema)
if err != nil {
fmt.Println(err)
if len(relatedtags) > 0 {
for _, relatedtag := range relatedtags {
log.Info().Msgf("TagName:= %v", relatedtag.TagName)
//tags = tags + ";" + relatedtag.TagName
tags = append(tags, relatedtag.TagName)
}
}
col := storage.DpDbColumn{
ColumnName: colName,
ColumnType: cols.ColumnType,
ColumnComment: cols.ColumnComment,
Tags: strings.Join(tags, ";"),
Classes: strings.Join(classes, ";"),
}
dpDbColumns = append(dpDbColumns, col)
}
dpDbTable := storage.DpDbTable{

Name: tb.Name,
Tags: "",
DpDbColumns: dpDbColumns,
}
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
dpDbTables = append(dpDbTables, dpDbTable)

}
schema := storage.DpDbDatabase{
DsKey: datasource.DsKey,
Name: sc.Name,
Type: datasource.Dstype,
DpDbTables: dpDbTables,
}
err := st.SetSchemaData(schema)
if err != nil {
fmt.Println(err)
}
}

}
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
return nil
}

//removeSpecialChars - Removes the special characters from the string.
Expand Down
2 changes: 1 addition & 1 deletion src/integrations/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func loghandler(w http.ResponseWriter, r *http.Request) {

}

var config, _ = ReadLogConfig("src/conf/datasage.yaml")
var config, _ = ReadLogConfig("./conf/datasage.yaml")

func RunServer() {

Expand Down
32 changes: 27 additions & 5 deletions src/server/grpcServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

logger "github.com/datasage-io/datasage/src/logger"

"github.com/datasage-io/datasage/src/classifiers"
classpb "github.com/datasage-io/datasage/src/proto/class"
ds "github.com/datasage-io/datasage/src/proto/datasource"
tagpb "github.com/datasage-io/datasage/src/proto/tag"
Expand Down Expand Up @@ -43,7 +44,24 @@ type ClassServer struct {

func (d *ClassServer) AddClass(ctx context.Context, in *classpb.CreateRequest) (*classpb.MessageResponse, error) {
fmt.Println("AddClass : ", in)
return nil, nil
st, err := storage.GetStorageInstance()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
} else {
rules := in.GetTag()
log.Debug().Msgf("rules %v", rules)
for _, rule := range rules {
log.Debug().Msgf("Rule className %v", rule)

err := st.AddClass(in.GetDescription(), rule, in.GetName())
if err != nil {
log.Error().Err(err).Msg("Internal Error")
}
}
return &classpb.MessageResponse{Message: "Class Added sucessfully"}, nil
}
return &classpb.MessageResponse{Message: "Error in adding Class "}, nil

}
func (d *ClassServer) ListClass(ctx context.Context, in *classpb.ListRequest) (*classpb.ListResponse, error) {
fmt.Println("ListClass : ", in)
Expand All @@ -57,7 +75,6 @@ func (d *ClassServer) ListClass(ctx context.Context, in *classpb.ListRequest) (*
classesOut := []*classpb.ClassResponse{}
for _, class := range classes {
log.Debug().Msgf("ListTag %v", class)
//classes := []string{tag.Rule}
classOut := &classpb.ClassResponse{
Id: strconv.Itoa(class.Id),
Name: class.Rule,
Expand Down Expand Up @@ -157,11 +174,16 @@ func (d *DatasourceServer) AddDatasource(ctx context.Context, in *ds.AddRequest)
DsKey: uuid.New().String(),
}

err1 := st.AddDataSource(storageDpDataSourceObj)
if err1 != nil {
err = st.AddDataSource(storageDpDataSourceObj)
if err != nil {
return &ds.MessageResponse{Message: "Error"}, nil
} else {
err := classifiers.ScanDataSource(storageDpDataSourceObj)
if err != nil {
return &ds.MessageResponse{Message: "Failed to Scan Datasource "}, nil
}
}
return &ds.MessageResponse{Message: "Sucess"}, nil
return &ds.MessageResponse{Message: "Data Source added for Scaning"}, nil
}
func (d *DatasourceServer) ListDatasource(ctx context.Context, in *ds.ListRequest) (*ds.ListResponse, error) {
fmt.Println("List Datasource Request ", in)
Expand Down
Loading

0 comments on commit b36cd45

Please sign in to comment.