Skip to content

Commit

Permalink
Minor code changes to allow deletion of user data from the S3 datalake destination
Browse files Browse the repository at this point in the history
  • Loading branch information
abhimanyubabbar committed Oct 17, 2022
1 parent 96ad5b4 commit f39e5b9
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 36 deletions.
7 changes: 6 additions & 1 deletion regulation-worker/internal/delete/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ func getLocalFileHandlers(destType string) map[string]filehandler.LocalFileHandl
gzipHandler = filehandler.NewGZIPLocalFileHandler(filehandler.CamelCase)
}

// S3_DATALAKE is a warehouse destination, and events sent to a warehouse
// destination are snake_cased, so the gzip handler needs to be created
// with SnakeCase in order to match the `user_id` key.
if destType == "S3_DATALAKE" {
gzipHandler = filehandler.NewGZIPLocalFileHandler(filehandler.SnakeCase)
}
Expand Down Expand Up @@ -398,7 +401,7 @@ func (bm *BatchManager) Delete(ctx context.Context, job model.Job, destConfig ma
g.Go(func() error {
filehandler := getFileHandler(files[_i].Key, filehandlers)
if filehandler == nil {
pkgLogger.Debugf("unable to locate filehandler for file: %s ", files[_i].Key)
pkgLogger.Warnf("unable to locate filehandler for file: %s ", files[_i].Key)
return nil
}

Expand Down Expand Up @@ -462,6 +465,8 @@ func handleIdentityRemoval(
attributes []model.User,
sourceFile, targetFile string,
) error {
pkgLogger.Debugf("Handling identity removal for source: %s, destination: %s", sourceFile, targetFile)

if err := handler.Read(ctx, sourceFile); err != nil {
return fmt.Errorf("unable to parse contents of local file: %s, err: %w", sourceFile, err)
}
Expand Down
2 changes: 0 additions & 2 deletions regulation-worker/internal/delete/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ func TestBatchDelete(t *testing.T) {
t.Fatal(err)
}
sort.Strings(goldenFilesList)
fmt.Println(goldenFilesList)
fmt.Println(cleanedFilesList)
require.Equal(t, len(goldenFilesList), len(cleanedFilesList), "actual number of files in destination bucket different than expected")
for i := 0; i < len(goldenFilesList); i++ {
goldenFilePtr, err := os.Open(goldenFilesList[i])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func NewGZIPLocalFileHandler(casing Case) *GZIPLocalFileHandler {
}

func (h *GZIPLocalFileHandler) Read(_ context.Context, path string) error {

f, err := os.OpenFile(path, os.O_RDWR, 0o644)
if err != nil {
return fmt.Errorf("error while opening compressed file, %w", err)
Expand Down
111 changes: 78 additions & 33 deletions regulation-worker/internal/delete/batch/filehandler/gzip_test.go
Original file line number Diff line number Diff line change
@@ -1,51 +1,96 @@
package filehandler

import (
"bytes"
"context"
"testing"

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/stretchr/testify/require"
)

func TestRemoveIdentityRecordsFromGZIPFile(t *testing.T) {
handler := NewGZIPLocalFileHandler(CamelCase)
handler.records = []byte("{\"userId\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n{\"userId\": \"my-another-user-id\"}\n")
err := handler.RemoveIdentity(context.TODO(), []model.User{{ID: "my-another-user-id"}})

require.Nil(t, err)
require.Equal(t, "{\"userId\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n", string(handler.records))

handler.records = []byte("{\"userId\": \"my-user-id\"}\n")
err = handler.RemoveIdentity(context.TODO(), []model.User{{ID: "my-user-id"}})
require.Nil(t, err)
require.Equal(t, "", string(handler.records))

handler.records = []byte("{\"userId\": \"invalid-user-id\"}\n")
err = handler.RemoveIdentity(context.TODO(), []model.User{{ID: "valid-user-id"}})
require.Nil(t, err)
require.Equal(t, "{\"userId\": \"invalid-user-id\"}\n", string(handler.records))
// TestRemoveIdentityRecordsFromGZIPFileWithSingleUserId feeds newline-delimited
// JSON records to a GZIPLocalFileHandler and checks which records remain after
// RemoveIdentity is called for a single user ID. Every scenario is exercised
// for both key spellings: `userId` (CamelCase) and `user_id` (SnakeCase).
func TestRemoveIdentityRecordsFromGZIPFileWithSingleUserId(t *testing.T) {
	testCases := []struct {
		name     string
		casing   Case
		userID   string
		input    []byte
		expected []byte
	}{
		{
			name:     "camel case: only the matching record is removed",
			casing:   CamelCase,
			userID:   "my-another-user-id",
			input:    []byte("{\"userId\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n{\"userId\": \"my-another-user-id\"}\n"),
			expected: []byte("{\"userId\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n"),
		},
		{
			name:     "camel case: sole record is removed",
			casing:   CamelCase,
			userID:   "my-user-id",
			input:    []byte("{\"userId\": \"my-user-id\"}\n"),
			expected: []byte(""),
		},
		{
			name:     "camel case: non-matching record is kept",
			casing:   CamelCase,
			userID:   "valid-user-id",
			input:    []byte("{\"userId\": \"invalid-user-id\"}\n"),
			expected: []byte("{\"userId\": \"invalid-user-id\"}\n"),
		},
		{
			name:     "snake case: only the matching record is removed",
			casing:   SnakeCase,
			userID:   "my-another-user-id",
			input:    []byte("{\"user_id\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n{\"user_id\": \"my-another-user-id\"}\n"),
			expected: []byte("{\"user_id\": \"my-user-id\", \"context-app-name\": \"my-app-name\"}\n"),
		},
		{
			name:     "snake case: sole record is removed",
			casing:   SnakeCase,
			userID:   "my-user-id",
			input:    []byte("{\"user_id\": \"my-user-id\"}\n"),
			expected: []byte(""),
		},
		{
			name:     "snake case: non-matching record is kept",
			casing:   SnakeCase,
			userID:   "valid-user-id",
			input:    []byte("{\"user_id\": \"invalid-user-id\"}\n"),
			expected: []byte("{\"user_id\": \"invalid-user-id\"}\n"),
		},
	}

	for _, tc := range testCases {
		// Subtests run synchronously here (no t.Parallel), so capturing the
		// loop variable in the closure is safe on every Go version.
		t.Run(tc.name, func(t *testing.T) {
			h := NewGZIPLocalFileHandler(tc.casing)
			h.records = tc.input

			err := h.RemoveIdentity(context.TODO(), []model.User{{ID: tc.userID}})
			require.NoError(t, err)

			// bytes.Equal treats nil and empty slices as equal, which is the
			// comparison the original assertion relied on; the message makes
			// a mismatch readable instead of "expected true, got false".
			require.Truef(
				t,
				bytes.Equal(tc.expected, h.records),
				"records mismatch: expected %q, got %q", tc.expected, h.records,
			)
		})
	}
}

// router -> batch_router ( 30 second batching ) -> warehouse ( s3datalake )
func TestIdentityDeletePattern(t *testing.T) {
h := NewGZIPLocalFileHandler(CamelCase)
pattern, err := h.getDeletePattern(
model.User{
ID: "my-user-id",
})

require.Nil(t, err)
// gzip userId userId
// user_id
require.Equal(t, "'/\"userId\": *\"my-user-id\"/d'", pattern)

h = NewGZIPLocalFileHandler(SnakeCase)
pattern, err = h.getDeletePattern(
model.User{ID: "my-user-id"})

require.Nil(t, err)
require.Equal(t, "'/\"user_id\": *\"my-user-id\"/d'", pattern)
inputs := []struct {
casing Case
userId string
expectedPattern string
}{
{
CamelCase,
"my-user-id",
"'/\"userId\": *\"my-user-id\"/d'",
},
{
SnakeCase,
"my-user-id",
"'/\"user_id\": *\"my-user-id\"/d'",
},
}

for _, ip := range inputs {
h := NewGZIPLocalFileHandler(ip.casing)
actualPattern, err := h.getDeletePattern(model.User{ID: ip.userId})
require.Nil(t, err)
require.Equal(t, ip.expectedPattern, actualPattern)
}
}

func TestIdentityRemovalProcessSucceeds(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions services/filemanager/filemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func init() {
// New returns FileManager backed by configured provider
func (*FileManagerFactoryT) New(settings *SettingsT) (FileManager, error) {
switch settings.Provider {
case "S3_DATALAKE":
return &S3Manager{
Config: GetS3Config(settings.Config),
}, nil
case "S3":
return &S3Manager{
Config: GetS3Config(settings.Config),
Expand Down

0 comments on commit f39e5b9

Please sign in to comment.