From f39e5b90fb1177e5435f7836600bfc5831292fd6 Mon Sep 17 00:00:00 2001 From: Abhimanyu Babbar Date: Mon, 17 Oct 2022 16:36:18 +0530 Subject: [PATCH] Minor code changes for us to allow for deletion of s3 datalake deletion --- .../internal/delete/batch/batch.go | 7 +- .../internal/delete/batch/batch_test.go | 2 - .../internal/delete/batch/filehandler/gzip.go | 1 + .../delete/batch/filehandler/gzip_test.go | 111 ++++++++++++------ services/filemanager/filemanager.go | 4 + 5 files changed, 89 insertions(+), 36 deletions(-) diff --git a/regulation-worker/internal/delete/batch/batch.go b/regulation-worker/internal/delete/batch/batch.go index ae8dfe66fa8..71cecf8fffb 100644 --- a/regulation-worker/internal/delete/batch/batch.go +++ b/regulation-worker/internal/delete/batch/batch.go @@ -314,6 +314,9 @@ func getLocalFileHandlers(destType string) map[string]filehandler.LocalFileHandl gzipHandler = filehandler.NewGZIPLocalFileHandler(filehandler.CamelCase) } + // S3_DATALAKE is a warehouse destination, so in order + // to send events into a warehouse destination, we simply snake_cased + // so the gziphandler needs to be created with Snakecasing `user_id` in mind if destType == "S3_DATALAKE" { gzipHandler = filehandler.NewGZIPLocalFileHandler(filehandler.SnakeCase) } @@ -398,7 +401,7 @@ func (bm *BatchManager) Delete(ctx context.Context, job model.Job, destConfig ma g.Go(func() error { filehandler := getFileHandler(files[_i].Key, filehandlers) if filehandler == nil { - pkgLogger.Debugf("unable to locate filehandler for file: %s ", files[_i].Key) + pkgLogger.Warnf("unable to locate filehandler for file: %s ", files[_i].Key) return nil } @@ -462,6 +465,8 @@ func handleIdentityRemoval( attributes []model.User, sourceFile, targetFile string, ) error { + pkgLogger.Debugf("Handling identity removal for source: %s, destination: %s", sourceFile, targetFile) + if err := handler.Read(ctx, sourceFile); err != nil { return fmt.Errorf("unable to parse contents of local file: %s, err: %w", sourceFile, err) } diff --git a/regulation-worker/internal/delete/batch/batch_test.go b/regulation-worker/internal/delete/batch/batch_test.go index 2bda0bc2bf9..df21e3a04b1 100644 --- a/regulation-worker/internal/delete/batch/batch_test.go +++ b/regulation-worker/internal/delete/batch/batch_test.go @@ -113,8 +113,6 @@ func TestBatchDelete(t *testing.T) { t.Fatal(err) } sort.Strings(goldenFilesList) - fmt.Println(goldenFilesList) - fmt.Println(cleanedFilesList) require.Equal(t, len(goldenFilesList), len(cleanedFilesList), "actual number of files in destination bucket different than expected") for i := 0; i < len(goldenFilesList); i++ { goldenFilePtr, err := os.Open(goldenFilesList[i]) diff --git a/regulation-worker/internal/delete/batch/filehandler/gzip.go b/regulation-worker/internal/delete/batch/filehandler/gzip.go index a66b0501e43..0729c48aa6d 100644 --- a/regulation-worker/internal/delete/batch/filehandler/gzip.go +++ b/regulation-worker/internal/delete/batch/filehandler/gzip.go @@ -24,6 +24,7 @@ func NewGZIPLocalFileHandler(casing Case) *GZIPLocalFileHandler { } func (h *GZIPLocalFileHandler) Read(_ context.Context, path string) error { + f, err := os.OpenFile(path, os.O_RDWR, 0o644) if err != nil { return fmt.Errorf("error while opening compressed file, %w", err) diff --git a/regulation-worker/internal/delete/batch/filehandler/gzip_test.go b/regulation-worker/internal/delete/batch/filehandler/gzip_test.go index 178efcc3265..6e2e6004f78 100644 --- a/regulation-worker/internal/delete/batch/filehandler/gzip_test.go +++ b/regulation-worker/internal/delete/batch/filehandler/gzip_test.go @@ -1,6 +1,7 @@ package filehandler import ( + "bytes" "context" "testing" @@ -8,44 +9,88 @@ import ( "github.com/stretchr/testify/require" ) -func TestRemoveIdentityRecordsFromGZIPFile(t *testing.T) { - handler := NewGZIPLocalFileHandler(CamelCase) - handler.records = []byte("{\"userId\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n{\"userId\": \"my-another-user-id\"}\n") - err := handler.RemoveIdentity(context.TODO(), []model.User{{ID: "my-another-user-id"}}) - - require.Nil(t, err) - require.Equal(t, "{\"userId\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n", string(handler.records)) - - handler.records = []byte("{\"userId\": \"my-user-id\"}\n") - err = handler.RemoveIdentity(context.TODO(), []model.User{{ID: "my-user-id"}}) - require.Nil(t, err) - require.Equal(t, "", string(handler.records)) - - handler.records = []byte("{\"userId\": \"invalid-user-id\"}\n") - err = handler.RemoveIdentity(context.TODO(), []model.User{{ID: "valid-user-id"}}) - require.Nil(t, err) - require.Equal(t, "{\"userId\": \"invalid-user-id\"}\n", string(handler.records)) +func TestRemoveIdentityRecordsFromGZIPFileWithSingleUserId(t *testing.T) { + + inputs := []struct { + casing Case + userID string + inputByte []byte + expectedByte []byte + }{ + { + CamelCase, + "my-another-user-id", + []byte("{\"userId\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n{\"userId\": \"my-another-user-id\"}\n"), + []byte("{\"userId\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n"), + }, + { + CamelCase, + "my-user-id", + []byte("{\"userId\": \"my-user-id\"}\n"), + []byte(""), + }, + { + CamelCase, + "valid-user-id", + []byte("{\"userId\": \"invalid-user-id\"}\n"), + []byte("{\"userId\": \"invalid-user-id\"}\n"), + }, + { + SnakeCase, + "my-another-user-id", + []byte("{\"user_id\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n{\"user_id\": \"my-another-user-id\"}\n"), + []byte("{\"user_id\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n"), + }, + { + SnakeCase, + "my-user-id", + []byte("{\"user_id\": \"my-user-id\"}\n"), + []byte(""), + }, + { + SnakeCase, + "valid-user-id", + []byte("{\"user_id\": \"invalid-user-id\"}\n"), + []byte("{\"user_id\": \"invalid-user-id\"}\n"), + }, + } + + for _, ip := range inputs { + h := NewGZIPLocalFileHandler(ip.casing) + + h.records = ip.inputByte + err := h.RemoveIdentity(context.TODO(), []model.User{{ID: ip.userID}}) + require.Nil(t, err) + require.Equal(t, true, bytes.Equal(h.records, ip.expectedByte)) + } } // router -> batch_router ( 30 second batching ) -> warehouse ( s3datalake ) func TestIdentityDeletePattern(t *testing.T) { - h := NewGZIPLocalFileHandler(CamelCase) - pattern, err := h.getDeletePattern( - model.User{ - ID: "my-user-id", - }) - - require.Nil(t, err) - // gzip userId userId - // user_id - require.Equal(t, "'/\"userId\": *\"my-user-id\"/d'", pattern) - h = NewGZIPLocalFileHandler(SnakeCase) - pattern, err = h.getDeletePattern( - model.User{ID: "my-user-id"}) - - require.Nil(t, err) - require.Equal(t, "'/\"user_id\": *\"my-user-id\"/d'", pattern) + inputs := []struct { + casing Case + userId string + expectedPattern string + }{ + { + CamelCase, + "my-user-id", + "'/\"userId\": *\"my-user-id\"/d'", + }, + { + SnakeCase, + "my-user-id", + "'/\"user_id\": *\"my-user-id\"/d'", + }, + } + + for _, ip := range inputs { + h := NewGZIPLocalFileHandler(ip.casing) + actualPattern, err := h.getDeletePattern(model.User{ID: ip.userId}) + require.Nil(t, err) + require.Equal(t, ip.expectedPattern, actualPattern) + } } func TestIdentityRemovalProcessSucceeds(t *testing.T) { diff --git a/services/filemanager/filemanager.go b/services/filemanager/filemanager.go index 66796b46f36..e7133c98ab6 100644 --- a/services/filemanager/filemanager.go +++ b/services/filemanager/filemanager.go @@ -63,6 +63,10 @@ func init() { // New returns FileManager backed by configured provider func (*FileManagerFactoryT) New(settings *SettingsT) (FileManager, error) { switch settings.Provider { + case "S3_DATALAKE": + return &S3Manager{ + Config: GetS3Config(settings.Config), + }, nil case "S3": return &S3Manager{ Config: GetS3Config(settings.Config),