diff --git a/src/classifiers/classfication_handler.go b/src/classifiers/classfication_handler.go index 2042753..74178fa 100644 --- a/src/classifiers/classfication_handler.go +++ b/src/classifiers/classfication_handler.go @@ -44,126 +44,139 @@ func Run() { datasources, err := st.GetDataSources() if err != nil { - fmt.Println("ListDatasources error ") + fmt.Println("Datasources not found ") } for _, datasource := range datasources { fmt.Println(datasource) + ScanDataSource(datasource) - adaptor, err := adaptors.New(adaptors.AdaptorConfig{ - Type: datasource.Dstype, - Username: datasource.User, - Password: datasource.Password, - Host: datasource.Host, - }) + log.Info().Msgf("scan completed for datasource: %v", datasource.Host) + } + } - if err != nil { - log.Error().Err(err).Msg("Internal Error") - continue - } +} - //get all classes - classes, err := st.GetClasses() - if err != nil { - log.Error().Err(err).Msg("Internal Error") +func ScanDataSource(datasource storage.DpDataSource) error { - } - log.Info().Msgf("no of classes:=%v ", len(classes)) + fmt.Println("starting scan for ", datasource) + st, err := storage.GetStorageInstance() + if err != nil { + log.Error().Err(err).Msg("Internal Error") + return err + } + adaptor, err := adaptors.New(adaptors.AdaptorConfig{ + Type: datasource.Dstype, + Username: datasource.User, + Password: datasource.Password, + Host: datasource.Host, + }) + + if err != nil { + log.Error().Err(err).Msg("Internal Error") + return err + } + + //get all classes + classes, err := st.GetClasses() + if err != nil { + log.Error().Err(err).Msg("Internal Error") + + } + log.Info().Msgf("no of classes:=%v ", len(classes)) - //get all tags - tags, err := st.GetTags() - if err != nil { - log.Error().Err(err).Msg("Internal Error") + //get all tags + tags, err := st.GetTags() + if err != nil { + log.Error().Err(err).Msg("Internal Error") + } + log.Info().Msgf("no of tags:=%v ", len(tags)) + columsCount := 0 + tableCount := 0 + dBCount := 0 + skipDBs := []string{"mysql", "performance_schema", "datadefender"} + info, err := adaptor.Scan() + for _, sc := range info.Schemas { + dBCount = dBCount + 1 + log.Info().Msgf("DB name:= %v", sc.Name) + skip := false + for _, skipDB := range skipDBs { + + if skipDB == sc.Name { + log.Info().Msgf("skip DB name:= ", sc.Name) + skip = true } - log.Info().Msgf("no of tags:=%v ", len(tags)) - columsCount := 0 - tableCount := 0 - dBCount := 0 - skipDBs := []string{"mysql", "performance_schema", "datadefender"} - info, err := adaptor.Scan() - for _, sc := range info.Schemas { - dBCount = dBCount + 1 - log.Info().Msgf("DB name:= %v", sc.Name) - skip := false - for _, skipDB := range skipDBs { - - if skipDB == sc.Name { - log.Info().Msgf("skip DB name:= ", sc.Name) - skip = true - } - } - if skip == true { - continue - } + } + if skip == true { + continue + } - dpDbTables := []storage.DpDbTable{} - for _, tb := range sc.Tables { - tableCount = tableCount + 1 - //dpDbColumn := storage.DpDbColumn - - dpDbColumns := []storage.DpDbColumn{} - - for _, cols := range tb.Cols { - columsCount = columsCount + 1 - colName, err := removeSpecialChars(cols.ColumnName) - if err != nil { - log.Error().Err(err).Msg("Internal Error") - continue - } - relatedtags, _ := st.GetAssociatedTags(colName) - relatedclasses, _ := st.GetAssociatedClasses(colName) - - //if err != nil { - // log.Println(err.Error()) - //} - tags := []string{} - classes := []string{} - if len(relatedclasses) > 0 { - for _, relatedclass := range relatedclasses { - log.Info().Msgf("Class:= %v", relatedclass.Class) - classes = append(classes, relatedclass.Class) - } - } - if len(relatedtags) > 0 { - for _, relatedtag := range relatedtags { - log.Info().Msgf("TagName:= %v", relatedtag.TagName) - //tags = tags + ";" + relatedtag.TagName - tags = append(tags, relatedtag.TagName) - } - } - col := storage.DpDbColumn{ - ColumnName: colName, - ColumnType: cols.ColumnType, - ColumnComment: cols.ColumnComment, - Tags: strings.Join(tags, ";"), - Classes: strings.Join(classes, ";"), - } - dpDbColumns = append(dpDbColumns, col) - } - dpDbTable := storage.DpDbTable{ + dpDbTables := []storage.DpDbTable{} + for _, tb := range sc.Tables { + tableCount = tableCount + 1 + //dpDbColumn := storage.DpDbColumn - Name: tb.Name, - Tags: "", - DpDbColumns: dpDbColumns, - } - dpDbTables = append(dpDbTables, dpDbTable) + dpDbColumns := []storage.DpDbColumn{} + for _, cols := range tb.Cols { + columsCount = columsCount + 1 + colName, err := removeSpecialChars(cols.ColumnName) + if err != nil { + log.Error().Err(err).Msg("Internal Error") + continue } - schema := storage.DpDbDatabase{ - DsKey: datasource.DsKey, - Name: sc.Name, - Type: datasource.Dstype, - DpDbTables: dpDbTables, + relatedtags, _ := st.GetAssociatedTags(colName) + relatedclasses, _ := st.GetAssociatedClasses(colName) + + //if err != nil { + // log.Println(err.Error()) + //} + tags := []string{} + classes := []string{} + if len(relatedclasses) > 0 { + for _, relatedclass := range relatedclasses { + log.Info().Msgf("Class:= %v", relatedclass.Class) + classes = append(classes, relatedclass.Class) + } } - err := st.SetSchemaData(schema) - if err != nil { - fmt.Println(err) + if len(relatedtags) > 0 { + for _, relatedtag := range relatedtags { + log.Info().Msgf("TagName:= %v", relatedtag.TagName) + //tags = tags + ";" + relatedtag.TagName + tags = append(tags, relatedtag.TagName) + } + } + col := storage.DpDbColumn{ + ColumnName: colName, + ColumnType: cols.ColumnType, + ColumnComment: cols.ColumnComment, + Tags: strings.Join(tags, ";"), + Classes: strings.Join(classes, ";"), } + dpDbColumns = append(dpDbColumns, col) + } + dpDbTable := storage.DpDbTable{ + Name: tb.Name, + Tags: "", + DpDbColumns: dpDbColumns, } - log.Info().Msgf("scan completed for datasource: %v", datasource.Host) + dpDbTables = append(dpDbTables, dpDbTable) + + } + schema := storage.DpDbDatabase{ + DsKey: datasource.DsKey, + Name: sc.Name, + Type: datasource.Dstype, + DpDbTables: dpDbTables, + } + err := st.SetSchemaData(schema) + if err != nil { + fmt.Println(err) } - } + } + log.Info().Msgf("scan completed for datasource: %v", datasource.Host) + return nil } //removeSpecialChars - Removes the special characters from the string. diff --git a/src/integrations/http_server.go b/src/integrations/http_server.go index 8a131ab..fdb3166 100644 --- a/src/integrations/http_server.go +++ b/src/integrations/http_server.go @@ -32,6 +32,7 @@ func loghandler(w http.ResponseWriter, r *http.Request) { } + var config, _ = ReadLogConfig("/etc/datasage/conf/datasage.yaml") func RunServer() { diff --git a/src/server/grpcServer.go b/src/server/grpcServer.go index f8d0501..c502ca8 100644 --- a/src/server/grpcServer.go +++ b/src/server/grpcServer.go @@ -9,6 +9,7 @@ import ( logger "github.com/datasage-io/datasage/src/logger" + "github.com/datasage-io/datasage/src/classifiers" classpb "github.com/datasage-io/datasage/src/proto/class" ds "github.com/datasage-io/datasage/src/proto/datasource" tagpb "github.com/datasage-io/datasage/src/proto/tag" @@ -43,7 +44,24 @@ type ClassServer struct { func (d *ClassServer) AddClass(ctx context.Context, in *classpb.CreateRequest) (*classpb.MessageResponse, error) { fmt.Println("AddClass : ", in) - return nil, nil + st, err := storage.GetStorageInstance() + if err != nil { + log.Error().Err(err).Msg("Internal Error") + } else { + rules := in.GetTag() + log.Debug().Msgf("rules %v", rules) + for _, rule := range rules { + log.Debug().Msgf("Rule className %v", rule) + + err := st.AddClass(in.GetDescription(), rule, in.GetName()) + if err != nil { + log.Error().Err(err).Msg("Internal Error") + } + } + return &classpb.MessageResponse{Message: "Class Added sucessfully"}, nil + } + return &classpb.MessageResponse{Message: "Error in adding Class "}, nil + } func (d *ClassServer) ListClass(ctx context.Context, in *classpb.ListRequest) (*classpb.ListResponse, error) { fmt.Println("ListClass : ", in) @@ -57,7 +75,6 @@ func (d *ClassServer) ListClass(ctx context.Context, in *classpb.ListRequest) (* classesOut := []*classpb.ClassResponse{} for _, class := range classes { log.Debug().Msgf("ListTag %v", class) - //classes := []string{tag.Rule} classOut := &classpb.ClassResponse{ Id: strconv.Itoa(class.Id), Name: class.Rule, @@ -157,11 +174,16 @@ func (d *DatasourceServer) AddDatasource(ctx context.Context, in *ds.AddRequest) DsKey: uuid.New().String(), } - err1 := st.AddDataSource(storageDpDataSourceObj) - if err1 != nil { + err = st.AddDataSource(storageDpDataSourceObj) + if err != nil { return &ds.MessageResponse{Message: "Error"}, nil + } else { + err := classifiers.ScanDataSource(storageDpDataSourceObj) + if err != nil { + return &ds.MessageResponse{Message: "Failed to Scan Datasource "}, nil + } } - return &ds.MessageResponse{Message: "Sucess"}, nil + return &ds.MessageResponse{Message: "Data Source added for Scaning"}, nil } func (d *DatasourceServer) ListDatasource(ctx context.Context, in *ds.ListRequest) (*ds.ListResponse, error) { fmt.Println("List Datasource Request ", in) diff --git a/src/storage/internal.go b/src/storage/internal.go index 00f0a97..42130b4 100644 --- a/src/storage/internal.go +++ b/src/storage/internal.go @@ -4,6 +4,7 @@ import ( "database/sql" "log" "os" + "strconv" "sync" _ "modernc.org/sqlite" @@ -122,14 +123,35 @@ func (into InternalStorage) SetSchemaData(dpDbDatabase DpDbDatabase) error { return err } dpDb, err := dbInsert.Exec(dpDbDatabase.Name, dpDbDatabase.Type, dpDbDatabase.DsKey) + var dpDbId int64 if err != nil { log.Println("error2", err.Error()) - return err + //ignore this error. may be the Schema already got scanned + //find the id for this + stmnt := "SELECT id FROM dp_databases WHERE name =" + "\"" + dpDbDatabase.Name + + "\" AND type =" + "\"" + dpDbDatabase.Type + "\" AND dskey =" + "\"" + dpDbDatabase.DsKey + "\"" + log.Println(stmnt) + //"SELECT FROM class dp_databases WHERE name = ? AND type = ? AND dskey = ?" + rows, err := into.SqliteConnc.Query(stmnt) + if err != nil { + return err + } + var id int64 + for rows.Next() { + err = rows.Scan(&id) + if err != nil { + log.Println("error2A", err.Error()) + return err + } + } + rows.Close() + dpDbId = id + } else { + dpDbId, _ = dpDb.LastInsertId() + dbInsert.Close() } - dpDbId, err := dpDb.LastInsertId() - - dbInsert.Close() - + log.Println("dpDbId", dpDbId) + var dbDbTableId int64 for _, table := range dpDbDatabase.DpDbTables { tableInsert, err := into.SqliteConnc.Prepare(` @@ -145,11 +167,30 @@ func (into InternalStorage) SetSchemaData(dpDbDatabase DpDbDatabase) error { dbDbTable, err := tableInsert.Exec(table.Name, dpDbId) if err != nil { log.Println("error3b", err.Error()) - return err - } + stmnt := "SELECT id FROM dp_db_tables WHERE name =" + "\"" + table.Name + + "\" AND dp_db_id = " + strconv.FormatInt(dpDbId, 10) + log.Println(stmnt) + rows, err := into.SqliteConnc.Query(stmnt) + if err != nil { + log.Println("error4444", err.Error()) + return err + } + var id int64 + for rows.Next() { + err = rows.Scan(&id) + if err != nil { + log.Println("error55555", err.Error()) + return err + } + } + dbDbTableId = id + rows.Close() + } else { - dbDbTableId, err := dbDbTable.LastInsertId() - tableInsert.Close() + dbDbTableId, _ = dbDbTable.LastInsertId() + tableInsert.Close() + } + log.Println("dbDbTableId", dbDbTableId) columnInsert, err := into.SqliteConnc.Prepare(` INSERT INTO dp_db_columns ("dp_db_id","dp_db_table_id","column_name","column_type","column_comment","Tags","Classes") @@ -161,12 +202,28 @@ func (into InternalStorage) SetSchemaData(dpDbDatabase DpDbDatabase) error { } for _, column := range table.DpDbColumns { - + log.Println("column:", column) //fmt.Println("INSERT INTO dp_db_columns:", dpDbId, dbDbTableId, column.ColumnName, column.ColumnType, column.ColumnComment) _, err = columnInsert.Exec(dpDbId, dbDbTableId, column.ColumnName, column.ColumnType, column.ColumnComment, column.Tags, column.Classes) if err != nil { log.Println("error5", err.Error()) // continue + //insert failed. We try update because column may exists already + stmnt := "UPDATE dp_db_columns SET column_type=\"" + column.ColumnType + "\"" + + " , column_comment=\"" + column.ColumnComment + "\"" + + " , Tags=\"" + column.Tags + "\"" + + " , Classes=\"" + column.Classes + "\"" + + " WHERE dp_db_id =" + strconv.FormatInt(dpDbId, 10) + " AND" + + " dp_db_table_id =" + strconv.FormatInt(dbDbTableId, 10) + " AND" + + " column_name = \"" + column.ColumnName + "\"" + + log.Println(stmnt) + rows, err := into.SqliteConnc.Query(stmnt) + if err != nil { + log.Println("error5B", err.Error()) + return err + } + rows.Close() } } @@ -198,7 +255,7 @@ func getInternalStorageInstance(dsn string) (InternalStorage, error) { } func NewInternalStorage(dsn string) (InternalStorage, error) { - log.Println("NewInternalStorage enter") + log.Println("NewInternalStorage enter", dsn) var isnew bool _, err := os.Stat(dsn) if os.IsNotExist(err) { @@ -250,6 +307,19 @@ func (insto InternalStorage) AddTag(name string, description string, rules []str return nil } +func (insto InternalStorage) AddClass(description string, rule string, class string) error { + + classInsert, err := insto.SqliteConnc.Prepare(` + INSERT INTO class ("description","rule","class") + VALUES (?,?,?); + `) + _, err = classInsert.Exec(description, rule, class) + + classInsert.Close() + return err + +} + func (insto InternalStorage) DeleteTag(id int64) error { return nil } diff --git a/src/storage/storage_handler.go b/src/storage/storage_handler.go index 977d819..b234d78 100644 --- a/src/storage/storage_handler.go +++ b/src/storage/storage_handler.go @@ -25,18 +25,18 @@ CREATE TABLE IF NOT EXISTS "tag" ( ); CREATE TABLE IF NOT EXISTS "dp_databases" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, - "name" TEXT, - "type" TEXT, + "name" TEXT NOT NULL, + "type" TEXT NOT NULL, "dskey" TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS "dp_db_tables" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, - "name" TEXT DEFAULT NULL, - "dp_db_id" INTEGER DEFAULT NULL + "name" TEXT NOT NULL, + "dp_db_id" INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS "dp_db_columns" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, - "dp_db_id" INTEGER DEFAULT NULL, + "dp_db_id" INTEGER NOT NULL, "dp_db_table_id" INTEGER NOT NULL, "column_name" TEXT NOT NULL, "column_type" TEXT NOT NULL, @@ -57,6 +57,9 @@ CREATE TABLE IF NOT EXISTS "dp_databases" ( "User" TEXT NOT NULL, "Password" TEXT NOT NULL ) ; + CREATE UNIQUE INDEX index_databases ON dp_databases(name,type,dskey); + CREATE UNIQUE INDEX index_tables ON dp_db_tables(dp_db_id,name); + CREATE UNIQUE INDEX index_columns ON dp_db_columns(dp_db_id,dp_db_table_id,column_name); ` type Class struct { @@ -78,6 +81,7 @@ type StorageConfig struct { } type Storage interface { GetClasses() ([]Class, error) + AddClass(string, string, string) error GetTags() ([]Tag, error) AddTag(string, string, []string) error