Skip to content

Commit

Permalink
Merge pull request #26 from ArulJeyananth/main
Browse files Browse the repository at this point in the history
periodic scan now update the scan log data if exists
  • Loading branch information
pshussain authored Jul 20, 2022
2 parents b279043 + 1e0c7ed commit 4e3a56f
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 122 deletions.
215 changes: 114 additions & 101 deletions src/classifiers/classfication_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,126 +44,139 @@ func Run() {

datasources, err := st.GetDataSources()
if err != nil {
fmt.Println("ListDatasources error ")
fmt.Println("Datasources not found ")
}
for _, datasource := range datasources {
fmt.Println(datasource)
ScanDataSource(datasource)

adaptor, err := adaptors.New(adaptors.AdaptorConfig{
Type: datasource.Dstype,
Username: datasource.User,
Password: datasource.Password,
Host: datasource.Host,
})
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
}
}

if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
}

//get all classes
classes, err := st.GetClasses()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
func ScanDataSource(datasource storage.DpDataSource) error {

}
log.Info().Msgf("no of classes:=%v ", len(classes))
fmt.Println("starting scan for ", datasource)
st, err := storage.GetStorageInstance()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
return err
}
adaptor, err := adaptors.New(adaptors.AdaptorConfig{
Type: datasource.Dstype,
Username: datasource.User,
Password: datasource.Password,
Host: datasource.Host,
})

if err != nil {
log.Error().Err(err).Msg("Internal Error")
return err
}

//get all classes
classes, err := st.GetClasses()
if err != nil {
log.Error().Err(err).Msg("Internal Error")

}
log.Info().Msgf("no of classes:=%v ", len(classes))

//get all tags
tags, err := st.GetTags()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
//get all tags
tags, err := st.GetTags()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
}
log.Info().Msgf("no of tags:=%v ", len(tags))
columsCount := 0
tableCount := 0
dBCount := 0
skipDBs := []string{"mysql", "performance_schema", "datadefender"}
info, err := adaptor.Scan()
for _, sc := range info.Schemas {
dBCount = dBCount + 1
log.Info().Msgf("DB name:= %v", sc.Name)
skip := false
for _, skipDB := range skipDBs {

if skipDB == sc.Name {
log.Info().Msgf("skip DB name:= ", sc.Name)
skip = true
}
log.Info().Msgf("no of tags:=%v ", len(tags))
columsCount := 0
tableCount := 0
dBCount := 0
skipDBs := []string{"mysql", "performance_schema", "datadefender"}
info, err := adaptor.Scan()
for _, sc := range info.Schemas {
dBCount = dBCount + 1
log.Info().Msgf("DB name:= %v", sc.Name)
skip := false
for _, skipDB := range skipDBs {

if skipDB == sc.Name {
log.Info().Msgf("skip DB name:= ", sc.Name)
skip = true
}
}
if skip == true {
continue
}
}
if skip == true {
continue
}

dpDbTables := []storage.DpDbTable{}
for _, tb := range sc.Tables {
tableCount = tableCount + 1
//dpDbColumn := storage.DpDbColumn

dpDbColumns := []storage.DpDbColumn{}

for _, cols := range tb.Cols {
columsCount = columsCount + 1
colName, err := removeSpecialChars(cols.ColumnName)
if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
relatedtags, _ := st.GetAssociatedTags(colName)
relatedclasses, _ := st.GetAssociatedClasses(colName)

//if err != nil {
// log.Println(err.Error())
//}
tags := []string{}
classes := []string{}
if len(relatedclasses) > 0 {
for _, relatedclass := range relatedclasses {
log.Info().Msgf("Class:= %v", relatedclass.Class)
classes = append(classes, relatedclass.Class)
}
}
if len(relatedtags) > 0 {
for _, relatedtag := range relatedtags {
log.Info().Msgf("TagName:= %v", relatedtag.TagName)
//tags = tags + ";" + relatedtag.TagName
tags = append(tags, relatedtag.TagName)
}
}
col := storage.DpDbColumn{
ColumnName: colName,
ColumnType: cols.ColumnType,
ColumnComment: cols.ColumnComment,
Tags: strings.Join(tags, ";"),
Classes: strings.Join(classes, ";"),
}
dpDbColumns = append(dpDbColumns, col)
}
dpDbTable := storage.DpDbTable{
dpDbTables := []storage.DpDbTable{}
for _, tb := range sc.Tables {
tableCount = tableCount + 1
//dpDbColumn := storage.DpDbColumn

Name: tb.Name,
Tags: "",
DpDbColumns: dpDbColumns,
}
dpDbTables = append(dpDbTables, dpDbTable)
dpDbColumns := []storage.DpDbColumn{}

for _, cols := range tb.Cols {
columsCount = columsCount + 1
colName, err := removeSpecialChars(cols.ColumnName)
if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
schema := storage.DpDbDatabase{
DsKey: datasource.DsKey,
Name: sc.Name,
Type: datasource.Dstype,
DpDbTables: dpDbTables,
relatedtags, _ := st.GetAssociatedTags(colName)
relatedclasses, _ := st.GetAssociatedClasses(colName)

//if err != nil {
// log.Println(err.Error())
//}
tags := []string{}
classes := []string{}
if len(relatedclasses) > 0 {
for _, relatedclass := range relatedclasses {
log.Info().Msgf("Class:= %v", relatedclass.Class)
classes = append(classes, relatedclass.Class)
}
}
err := st.SetSchemaData(schema)
if err != nil {
fmt.Println(err)
if len(relatedtags) > 0 {
for _, relatedtag := range relatedtags {
log.Info().Msgf("TagName:= %v", relatedtag.TagName)
//tags = tags + ";" + relatedtag.TagName
tags = append(tags, relatedtag.TagName)
}
}
col := storage.DpDbColumn{
ColumnName: colName,
ColumnType: cols.ColumnType,
ColumnComment: cols.ColumnComment,
Tags: strings.Join(tags, ";"),
Classes: strings.Join(classes, ";"),
}
dpDbColumns = append(dpDbColumns, col)
}
dpDbTable := storage.DpDbTable{

Name: tb.Name,
Tags: "",
DpDbColumns: dpDbColumns,
}
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
dpDbTables = append(dpDbTables, dpDbTable)

}
schema := storage.DpDbDatabase{
DsKey: datasource.DsKey,
Name: sc.Name,
Type: datasource.Dstype,
DpDbTables: dpDbTables,
}
err := st.SetSchemaData(schema)
if err != nil {
fmt.Println(err)
}
}

}
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
return nil
}

//removeSpecialChars - Removes the special characters from the string.
Expand Down
1 change: 1 addition & 0 deletions src/integrations/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func loghandler(w http.ResponseWriter, r *http.Request) {

}


var config, _ = ReadLogConfig("/etc/datasage/conf/datasage.yaml")

func RunServer() {
Expand Down
32 changes: 27 additions & 5 deletions src/server/grpcServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

logger "github.com/datasage-io/datasage/src/logger"

"github.com/datasage-io/datasage/src/classifiers"
classpb "github.com/datasage-io/datasage/src/proto/class"
ds "github.com/datasage-io/datasage/src/proto/datasource"
tagpb "github.com/datasage-io/datasage/src/proto/tag"
Expand Down Expand Up @@ -43,7 +44,24 @@ type ClassServer struct {

func (d *ClassServer) AddClass(ctx context.Context, in *classpb.CreateRequest) (*classpb.MessageResponse, error) {
fmt.Println("AddClass : ", in)
return nil, nil
st, err := storage.GetStorageInstance()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
} else {
rules := in.GetTag()
log.Debug().Msgf("rules %v", rules)
for _, rule := range rules {
log.Debug().Msgf("Rule className %v", rule)

err := st.AddClass(in.GetDescription(), rule, in.GetName())
if err != nil {
log.Error().Err(err).Msg("Internal Error")
}
}
return &classpb.MessageResponse{Message: "Class Added sucessfully"}, nil
}
return &classpb.MessageResponse{Message: "Error in adding Class "}, nil

}
func (d *ClassServer) ListClass(ctx context.Context, in *classpb.ListRequest) (*classpb.ListResponse, error) {
fmt.Println("ListClass : ", in)
Expand All @@ -57,7 +75,6 @@ func (d *ClassServer) ListClass(ctx context.Context, in *classpb.ListRequest) (*
classesOut := []*classpb.ClassResponse{}
for _, class := range classes {
log.Debug().Msgf("ListTag %v", class)
//classes := []string{tag.Rule}
classOut := &classpb.ClassResponse{
Id: strconv.Itoa(class.Id),
Name: class.Rule,
Expand Down Expand Up @@ -157,11 +174,16 @@ func (d *DatasourceServer) AddDatasource(ctx context.Context, in *ds.AddRequest)
DsKey: uuid.New().String(),
}

err1 := st.AddDataSource(storageDpDataSourceObj)
if err1 != nil {
err = st.AddDataSource(storageDpDataSourceObj)
if err != nil {
return &ds.MessageResponse{Message: "Error"}, nil
} else {
err := classifiers.ScanDataSource(storageDpDataSourceObj)
if err != nil {
return &ds.MessageResponse{Message: "Failed to Scan Datasource "}, nil
}
}
return &ds.MessageResponse{Message: "Sucess"}, nil
return &ds.MessageResponse{Message: "Data Source added for Scaning"}, nil
}
func (d *DatasourceServer) ListDatasource(ctx context.Context, in *ds.ListRequest) (*ds.ListResponse, error) {
fmt.Println("List Datasource Request ", in)
Expand Down
Loading

0 comments on commit 4e3a56f

Please sign in to comment.