Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

periodic scan now update the scan log data if exists #26

Merged
merged 2 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 114 additions & 101 deletions src/classifiers/classfication_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,126 +44,139 @@ func Run() {

datasources, err := st.GetDataSources()
if err != nil {
fmt.Println("ListDatasources error ")
fmt.Println("Datasources not found ")
}
for _, datasource := range datasources {
fmt.Println(datasource)
ScanDataSource(datasource)

adaptor, err := adaptors.New(adaptors.AdaptorConfig{
Type: datasource.Dstype,
Username: datasource.User,
Password: datasource.Password,
Host: datasource.Host,
})
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
}
}

if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
}

//get all classes
classes, err := st.GetClasses()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
func ScanDataSource(datasource storage.DpDataSource) error {

}
log.Info().Msgf("no of classes:=%v ", len(classes))
fmt.Println("starting scan for ", datasource)
st, err := storage.GetStorageInstance()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
return err
}
adaptor, err := adaptors.New(adaptors.AdaptorConfig{
Type: datasource.Dstype,
Username: datasource.User,
Password: datasource.Password,
Host: datasource.Host,
})

if err != nil {
log.Error().Err(err).Msg("Internal Error")
return err
}

//get all classes
classes, err := st.GetClasses()
if err != nil {
log.Error().Err(err).Msg("Internal Error")

}
log.Info().Msgf("no of classes:=%v ", len(classes))

//get all tags
tags, err := st.GetTags()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
//get all tags
tags, err := st.GetTags()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
}
log.Info().Msgf("no of tags:=%v ", len(tags))
columsCount := 0
tableCount := 0
dBCount := 0
skipDBs := []string{"mysql", "performance_schema", "datadefender"}
info, err := adaptor.Scan()
for _, sc := range info.Schemas {
dBCount = dBCount + 1
log.Info().Msgf("DB name:= %v", sc.Name)
skip := false
for _, skipDB := range skipDBs {

if skipDB == sc.Name {
log.Info().Msgf("skip DB name:= ", sc.Name)
skip = true
}
log.Info().Msgf("no of tags:=%v ", len(tags))
columsCount := 0
tableCount := 0
dBCount := 0
skipDBs := []string{"mysql", "performance_schema", "datadefender"}
info, err := adaptor.Scan()
for _, sc := range info.Schemas {
dBCount = dBCount + 1
log.Info().Msgf("DB name:= %v", sc.Name)
skip := false
for _, skipDB := range skipDBs {

if skipDB == sc.Name {
log.Info().Msgf("skip DB name:= ", sc.Name)
skip = true
}
}
if skip == true {
continue
}
}
if skip == true {
continue
}

dpDbTables := []storage.DpDbTable{}
for _, tb := range sc.Tables {
tableCount = tableCount + 1
//dpDbColumn := storage.DpDbColumn

dpDbColumns := []storage.DpDbColumn{}

for _, cols := range tb.Cols {
columsCount = columsCount + 1
colName, err := removeSpecialChars(cols.ColumnName)
if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
relatedtags, _ := st.GetAssociatedTags(colName)
relatedclasses, _ := st.GetAssociatedClasses(colName)

//if err != nil {
// log.Println(err.Error())
//}
tags := []string{}
classes := []string{}
if len(relatedclasses) > 0 {
for _, relatedclass := range relatedclasses {
log.Info().Msgf("Class:= %v", relatedclass.Class)
classes = append(classes, relatedclass.Class)
}
}
if len(relatedtags) > 0 {
for _, relatedtag := range relatedtags {
log.Info().Msgf("TagName:= %v", relatedtag.TagName)
//tags = tags + ";" + relatedtag.TagName
tags = append(tags, relatedtag.TagName)
}
}
col := storage.DpDbColumn{
ColumnName: colName,
ColumnType: cols.ColumnType,
ColumnComment: cols.ColumnComment,
Tags: strings.Join(tags, ";"),
Classes: strings.Join(classes, ";"),
}
dpDbColumns = append(dpDbColumns, col)
}
dpDbTable := storage.DpDbTable{
dpDbTables := []storage.DpDbTable{}
for _, tb := range sc.Tables {
tableCount = tableCount + 1
//dpDbColumn := storage.DpDbColumn

Name: tb.Name,
Tags: "",
DpDbColumns: dpDbColumns,
}
dpDbTables = append(dpDbTables, dpDbTable)
dpDbColumns := []storage.DpDbColumn{}

for _, cols := range tb.Cols {
columsCount = columsCount + 1
colName, err := removeSpecialChars(cols.ColumnName)
if err != nil {
log.Error().Err(err).Msg("Internal Error")
continue
}
schema := storage.DpDbDatabase{
DsKey: datasource.DsKey,
Name: sc.Name,
Type: datasource.Dstype,
DpDbTables: dpDbTables,
relatedtags, _ := st.GetAssociatedTags(colName)
relatedclasses, _ := st.GetAssociatedClasses(colName)

//if err != nil {
// log.Println(err.Error())
//}
tags := []string{}
classes := []string{}
if len(relatedclasses) > 0 {
for _, relatedclass := range relatedclasses {
log.Info().Msgf("Class:= %v", relatedclass.Class)
classes = append(classes, relatedclass.Class)
}
}
err := st.SetSchemaData(schema)
if err != nil {
fmt.Println(err)
if len(relatedtags) > 0 {
for _, relatedtag := range relatedtags {
log.Info().Msgf("TagName:= %v", relatedtag.TagName)
//tags = tags + ";" + relatedtag.TagName
tags = append(tags, relatedtag.TagName)
}
}
col := storage.DpDbColumn{
ColumnName: colName,
ColumnType: cols.ColumnType,
ColumnComment: cols.ColumnComment,
Tags: strings.Join(tags, ";"),
Classes: strings.Join(classes, ";"),
}
dpDbColumns = append(dpDbColumns, col)
}
dpDbTable := storage.DpDbTable{

Name: tb.Name,
Tags: "",
DpDbColumns: dpDbColumns,
}
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
dpDbTables = append(dpDbTables, dpDbTable)

}
schema := storage.DpDbDatabase{
DsKey: datasource.DsKey,
Name: sc.Name,
Type: datasource.Dstype,
DpDbTables: dpDbTables,
}
err := st.SetSchemaData(schema)
if err != nil {
fmt.Println(err)
}
}

}
log.Info().Msgf("scan completed for datasource: %v", datasource.Host)
return nil
}

//removeSpecialChars - Removes the special characters from the string.
Expand Down
1 change: 1 addition & 0 deletions src/integrations/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func loghandler(w http.ResponseWriter, r *http.Request) {

}


var config, _ = ReadLogConfig("/etc/datasage/conf/datasage.yaml")

func RunServer() {
Expand Down
32 changes: 27 additions & 5 deletions src/server/grpcServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

logger "github.com/datasage-io/datasage/src/logger"

"github.com/datasage-io/datasage/src/classifiers"
classpb "github.com/datasage-io/datasage/src/proto/class"
ds "github.com/datasage-io/datasage/src/proto/datasource"
tagpb "github.com/datasage-io/datasage/src/proto/tag"
Expand Down Expand Up @@ -43,7 +44,24 @@ type ClassServer struct {

func (d *ClassServer) AddClass(ctx context.Context, in *classpb.CreateRequest) (*classpb.MessageResponse, error) {
fmt.Println("AddClass : ", in)
return nil, nil
st, err := storage.GetStorageInstance()
if err != nil {
log.Error().Err(err).Msg("Internal Error")
} else {
rules := in.GetTag()
log.Debug().Msgf("rules %v", rules)
for _, rule := range rules {
log.Debug().Msgf("Rule className %v", rule)

err := st.AddClass(in.GetDescription(), rule, in.GetName())
if err != nil {
log.Error().Err(err).Msg("Internal Error")
}
}
return &classpb.MessageResponse{Message: "Class Added sucessfully"}, nil
}
return &classpb.MessageResponse{Message: "Error in adding Class "}, nil

}
func (d *ClassServer) ListClass(ctx context.Context, in *classpb.ListRequest) (*classpb.ListResponse, error) {
fmt.Println("ListClass : ", in)
Expand All @@ -57,7 +75,6 @@ func (d *ClassServer) ListClass(ctx context.Context, in *classpb.ListRequest) (*
classesOut := []*classpb.ClassResponse{}
for _, class := range classes {
log.Debug().Msgf("ListTag %v", class)
//classes := []string{tag.Rule}
classOut := &classpb.ClassResponse{
Id: strconv.Itoa(class.Id),
Name: class.Rule,
Expand Down Expand Up @@ -157,11 +174,16 @@ func (d *DatasourceServer) AddDatasource(ctx context.Context, in *ds.AddRequest)
DsKey: uuid.New().String(),
}

err1 := st.AddDataSource(storageDpDataSourceObj)
if err1 != nil {
err = st.AddDataSource(storageDpDataSourceObj)
if err != nil {
return &ds.MessageResponse{Message: "Error"}, nil
} else {
err := classifiers.ScanDataSource(storageDpDataSourceObj)
if err != nil {
return &ds.MessageResponse{Message: "Failed to Scan Datasource "}, nil
}
}
return &ds.MessageResponse{Message: "Sucess"}, nil
return &ds.MessageResponse{Message: "Data Source added for Scaning"}, nil
}
func (d *DatasourceServer) ListDatasource(ctx context.Context, in *ds.ListRequest) (*ds.ListResponse, error) {
fmt.Println("List Datasource Request ", in)
Expand Down
Loading