feat(warehouse): added base support for s3 datalake deletion as part of regulation API. #2515

Merged on Nov 1, 2022 (20 commits)

Conversation

abhimanyubabbar
Contributor

@abhimanyubabbar abhimanyubabbar commented Sep 30, 2022

Description

As per the regulation API, we are adding support for removing user data from the S3 datalake.

Notion Ticket

https://www.notion.so/rudderstacks/S3-Datalake-Data-Deletion-Request-Dev-tasks-dfdba07974da4ffdb4b40af163bfdefc

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

@abhimanyubabbar abhimanyubabbar changed the title from "Added base support for s3 datalake deletion" to "feat(warehouse): Added base support for s3 datalake deletion as part of regulation API." Oct 1, 2022
@abhimanyubabbar abhimanyubabbar changed the title from "feat(warehouse): Added base support for s3 datalake deletion as part of regulation API." to "feat(warehouse): added base support for s3 datalake deletion as part of regulation API." Oct 1, 2022
@abhimanyubabbar abhimanyubabbar marked this pull request as ready for review October 17, 2022 11:14
@lvrach lvrach requested review from lvrach, saurav-malani and a team October 17, 2022 11:23
@codecov

codecov bot commented Oct 17, 2022

Codecov Report

Base: 43.77% // Head: 43.84% // Increases project coverage by +0.06% 🎉

Coverage data is based on head (d5fa7dd) compared to base (0ec74d1).
Patch coverage: 56.92% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2515      +/-   ##
==========================================
+ Coverage   43.77%   43.84%   +0.06%     
==========================================
  Files         186      187       +1     
  Lines       39890    40015     +125     
==========================================
+ Hits        17463    17545      +82     
- Misses      21332    21370      +38     
- Partials     1095     1100       +5     
| Impacted Files | Coverage Δ |
|---|---|
| regulation-worker/internal/service/looper.go | 0.00% <0.00%> (ø) |
| services/filemanager/filemanager.go | 51.28% <0.00%> (-1.82%) ⬇️ |
| warehouse/jobs/runner.go | 0.00% <0.00%> (ø) |
| regulation-worker/internal/delete/batch/batch.go | 48.28% <56.05%> (+0.50%) ⬆️ |
| ...n-worker/internal/delete/batch/filehandler/gzip.go | 62.68% <62.68%> (ø) |
| ...orker/internal/delete/batch/filehandler/parquet.go | 65.35% <65.35%> (ø) |
| regulation-worker/cmd/main.go | 58.92% <100.00%> (+0.74%) ⬆️ |
| regulation-worker/internal/client/client.go | 67.39% <100.00%> (-0.47%) ⬇️ |
| regulation-worker/internal/delete/delete.go | 100.00% <100.00%> (ø) |
| regulation-worker/internal/service/service.go | 70.83% <100.00%> (ø) |
... and 1 more


Member

@lvrach lvrach left a comment

Mostly minor comments, or issues not introduced in this PR, but I will need a second pass.

regulation-worker/internal/client/client.go (outdated, resolved)
regulation-worker/internal/delete/batch/batch.go (outdated, resolved)
regulation-worker/internal/delete/batch/batch.go (outdated, resolved)
return fmt.Errorf("error while opening file, %w", err)
}
defer cleanCompressedFilePtr.Close()
f, err = os.OpenFile(filepath.Join(statusTrackerTmpDir, StatusTrackerFileName), os.O_CREATE|os.O_RDWR, 0o644)
Member

Since we removed os.O_TRUNC, if the file already exists, we are going to error, correct?

It is safer this way, I just wanted to make sure this was the intention behind this change.

Contributor Author

I probably moved a bit more code around, so the boundaries of the change are fuzzy.
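
For reference, a minimal sketch of how the flags discussed in this thread behave in Go's standard library. It assumes a throwaway temp directory and a hypothetical helper name (`trackerOpenDemo`); it is not code from this PR. Without `os.O_TRUNC`, opening an existing file with `os.O_CREATE|os.O_RDWR` succeeds and keeps the content; an error on an existing file only occurs when `os.O_EXCL` is added.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// trackerOpenDemo (hypothetical helper) seeds a tracker file, reopens it
// without os.O_TRUNC, and then retries the open with os.O_EXCL added.
// It returns the content read back and whether the O_EXCL open was rejected.
func trackerOpenDemo() (string, bool, error) {
	dir, err := os.MkdirTemp("", "tracker-demo")
	if err != nil {
		return "", false, err
	}
	defer func() { _ = os.RemoveAll(dir) }()

	path := filepath.Join(dir, "rudderDeleteTracker.txt")
	if err := os.WriteFile(path, []byte("cleaned-file.json.gz\n"), 0o644); err != nil {
		return "", false, err
	}

	// Same flags as in the diff: the existing file opens fine and is not truncated.
	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o644)
	if err != nil {
		return "", false, err
	}
	data, err := io.ReadAll(f)
	_ = f.Close()
	if err != nil {
		return "", false, err
	}

	// Adding os.O_EXCL is what turns "file already exists" into an error.
	g, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_RDWR, 0o644)
	if err == nil {
		_ = g.Close()
	}
	return string(data), errors.Is(err, os.ErrExist), nil
}

func main() {
	content, exclRejected, err := trackerOpenDemo()
	if err != nil {
		fmt.Println("demo failed:", err)
		return
	}
	fmt.Printf("content kept without O_TRUNC: %q\n", content)
	fmt.Println("O_EXCL rejects the existing file:", exclRejected)
}
```

If failing on a pre-existing tracker file is the intended behaviour, `os.O_EXCL` would need to be part of the flag set.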

regulation-worker/internal/delete/batch/batch.go (outdated, resolved)
regulation-worker/internal/delete/batch/batch.go (outdated, resolved)
regulation-worker/internal/delete/batch/batch.go (outdated, resolved)
regulation-worker/internal/delete/batch/batch.go (outdated, resolved)
regulation-worker/internal/delete/batch/batch.go (outdated, resolved)
regulation-worker/internal/delete/batch/batch.go (outdated, resolved)
@@ -63,6 +63,10 @@ func init() {
// New returns FileManager backed by configured provider
func (*FileManagerFactoryT) New(settings *SettingsT) (FileManager, error) {
switch settings.Provider {
case "S3_DATALAKE":
Member

Why do we need a duplicate file manager for S3 Datalake when we can use the S3 one?

Contributor Author

The provider is passed in by the caller, and S3_DATALAKE is hardcoded there. Let me see what change I can make!
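
One possible direction, sketched under assumptions: translate the destination type into a file manager provider before calling the factory, so the factory does not need a separate `S3_DATALAKE` case. The helper name `fileManagerProvider` is hypothetical and not part of this PR.

```go
package main

import "fmt"

// fileManagerProvider (hypothetical) maps a destination type to the
// file manager provider that can serve it. S3_DATALAKE objects live in
// S3, so both destination types reuse the "S3" provider.
func fileManagerProvider(destType string) (string, error) {
	switch destType {
	case "S3", "S3_DATALAKE":
		return "S3", nil
	default:
		return "", fmt.Errorf("unsupported destination type: %q", destType)
	}
}

func main() {
	for _, d := range []string{"S3", "S3_DATALAKE", "GCS"} {
		p, err := fileManagerProvider(d)
		fmt.Println(d, "->", p, err)
	}
}
```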

StatusTrackerFileName = "rudderDeleteTracker.txt"
supportedDestinations = []string{"S3"}
supportedDestinations = []string{"S3", "S3_DATALAKE"}
)

const listMaxItem int64 = 1000
Contributor

I believe since we have defined FilesLimit in BatchManager struct, we don't need this anymore.

Contributor Author

I'll fix it.

method := "GET"
genEndPoint := "/dataplane/workspaces/{workspace_id}/regulations/workerJobs"
url := fmt.Sprint(j.URLPrefix, prepURL(genEndPoint, j.WorkspaceID))
url := fmt.Sprintf("%s/dataplane/workspaces/%s/regulations/workerJobs", j.URLPrefix, j.WorkspaceID)
Contributor

@saurav-malani saurav-malani Oct 26, 2022

Building the URL like this is indeed more readable. We can use the same approach to build the URL in the UpdateStatus func and get rid of the prepURL func.
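
A minimal sketch of the `fmt.Sprintf` approach as a small helper. The path is the one shown in the diff above; the helper name `workerJobsURL` and the use of `url.PathEscape` on the workspace ID are additions made here for illustration, not something the PR does.

```go
package main

import (
	"fmt"
	"net/url"
)

// workerJobsURL (hypothetical helper) builds the worker jobs endpoint
// with a single format string instead of a placeholder-replacing prepURL.
// Escaping the workspace ID is an extra precaution assumed here.
func workerJobsURL(urlPrefix, workspaceID string) string {
	return fmt.Sprintf(
		"%s/dataplane/workspaces/%s/regulations/workerJobs",
		urlPrefix, url.PathEscape(workspaceID),
	)
}

func main() {
	fmt.Println(workerJobsURL("https://api.example.com", "ws-1"))
}
```

A helper with the same shape could serve UpdateStatus, with its own path in the format string.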

@@ -0,0 +1,21 @@
package filehandler
Contributor

I feel the name of this file hurts its readability. I'm not sure what an apt name would be; how about fileHandler.go?

Also, we might want to rename the directory from filehandler to fileHandler.

Contributor Author

Identifiers are usually camel-cased in Go, but file names are snake-cased in popular repositories. The name local_file indicates that the file holds the interface for local file handling. Let me know if you want to discuss this offline.

defer tmpFilePtr.Close()
absPath, err := filepath.Abs(tmpFilePtr.Name())

defer f.Close()
Contributor

The reason I am ignoring the error here is that it is overkill to abort the job and send a failed status to the manager just because we failed to close the file. I believe we can afford to ignore it.

Suggested change
defer f.Close()
defer func() { _ = f.Close() }()

Comment on lines +171 to 174
if err := f.Close(); err != nil {
return nil, fmt.Errorf("closing file: %w", err)
}
Contributor

Since we are already deferring the close on line 121.

Suggested change
if err := f.Close(); err != nil {
return nil, fmt.Errorf("closing file: %w", err)
}

Contributor Author

This is a slightly different way to handle closing the file cleanly. Usually we want to handle the error that comes from closing the file, so we close it in more than one place like this.
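
The pattern described here can be sketched as follows, assuming a hypothetical `writeLines` helper rather than the actual code in batch.go. The deferred `Close` is a safety net for early returns and its error is ignored; the explicit `Close` on the success path is the one whose error is reported. A second `Close` on an `*os.File` returns an error (`os.ErrClosed`), which the deferred call discards.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeLines (hypothetical) writes lines to path and reports close errors.
func writeLines(path string, lines []string) error {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
	if err != nil {
		return fmt.Errorf("opening file: %w", err)
	}
	// Safety net for early returns; the error is deliberately ignored.
	defer func() { _ = f.Close() }()

	for _, l := range lines {
		if _, err := fmt.Fprintln(f, l); err != nil {
			return fmt.Errorf("writing file: %w", err)
		}
	}
	// Success path: close explicitly so a failed close is surfaced.
	if err := f.Close(); err != nil {
		return fmt.Errorf("closing file: %w", err)
	}
	return nil
}

// closeDemo writes two lines to a temp file and returns what was stored.
func closeDemo() (string, error) {
	dir, err := os.MkdirTemp("", "close-demo")
	if err != nil {
		return "", err
	}
	defer func() { _ = os.RemoveAll(dir) }()

	path := filepath.Join(dir, "out.txt")
	if err := writeLines(path, []string{"a", "b"}); err != nil {
		return "", err
	}
	data, err := os.ReadFile(path)
	return string(data), err
}

func main() {
	content, err := closeDemo()
	fmt.Printf("%q %v\n", content, err)
}
```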

// Note: download happens concurrently in 5 go routine by default.
func (b *Batch) download(_ context.Context, completeFileName string) (string, error) {
pkgLogger.Debugf("downloading file: %v", completeFileName)
func (_ *Batch) cleanedFiles(_ context.Context, path string, job *model.Job) ([]string, error) {
Contributor

Given that the list of cleaned files could be really long, wouldn't it make sense to return a pointer instead of []string?

func name suggestion: getCleanedFiles

Contributor Author

@abhimanyubabbar abhimanyubabbar Oct 28, 2022

Go doesn't provide automatic support for getters and setters. There's nothing wrong with providing getters and setters yourself, and it's often appropriate to do so, but it's neither idiomatic nor necessary to put Get into the getter's name. If you have a field called owner (lower case, unexported), the getter method should be called Owner (upper case, exported), not GetOwner. The use of upper-case names for export provides the hook to discriminate the field from the method. A setter function, if needed, will likely be called SetOwner.

Contributor Author

I read this in Effective Go a while back, where I came across the advice not to put Get in the name of a getter.

Contributor Author

Also, why would a pointer be beneficial? In Go, I think slices are passed by value, and the cost is pretty minimal, right?
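
A small sketch to back the point about cost, using hypothetical helper names. A `[]string` value is a slice header (pointer, length, capacity), so returning it copies three machine words with the standard Go toolchain, regardless of how many elements it refers to; the elements themselves are shared, not copied.

```go
package main

import (
	"fmt"
	"unsafe"
)

// firstN (hypothetical) returns a slice; only the header is copied to the caller.
func firstN(all []string, n int) []string { return all[:n] }

// sliceHeaderWords reports the size of a slice value in machine words.
func sliceHeaderWords() uintptr {
	var s []string
	return unsafe.Sizeof(s) / unsafe.Sizeof(uintptr(0))
}

// sharesBackingArray shows that the returned slice refers to the same elements.
func sharesBackingArray() bool {
	all := make([]string, 1000)
	got := firstN(all, 10)
	got[0] = "changed"
	return all[0] == "changed"
}

func main() {
	fmt.Println("slice header size in words:", sliceHeaderWords())
	fmt.Println("returned slice shares the backing array:", sharesBackingArray())
}
```

An actual array type such as `[1000]string` would be copied in full when passed or returned, which is where a pointer starts to matter.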

return model.JobStatusFailed
}

pkgLogger.Infof("successfully completed loop of ")
Contributor

Suggested change
pkgLogger.Infof("successfully completed loop of ")
pkgLogger.Infof("successfully completed loop")

Comment on lines 38 to 37
mu sync.Mutex
FM filemanager.FileManager
Contributor

Wouldn't it be better to put the file handlers inside the Batch struct, given that they are specific to an instance of a deletion job?

Suggested change
mu sync.Mutex
FM filemanager.FileManager
mu sync.Mutex
FM filemanager.FileManager
filehandlers map[string]filehandler.LocalFileHandler

Contributor Author

The thing is, a filehandler needs to be created for every file that is processed, because it stores intermediate results, which allows for a cleaner interface. We have added a factory that instantiates a filehandler every time we see a file of a supported type. So this might not work.
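
A rough sketch of the per-file factory idea, under assumptions: the interface, the handler types and `newHandlerFor` below are simplified stand-ins invented for illustration, not the `filehandler` package from this PR. Storing constructors rather than handler instances means every file gets a handler with fresh intermediate state.

```go
package main

import (
	"fmt"
	"strings"
)

// LocalFileHandler is a trimmed-down, hypothetical stand-in for the PR's interface.
type LocalFileHandler interface {
	Kind() string
}

// Each handler keeps per-file intermediate state, so it must not be shared.
type gzipHandler struct{ records []byte }
type parquetHandler struct{ rows []byte }

func (*gzipHandler) Kind() string    { return "gzip" }
func (*parquetHandler) Kind() string { return "parquet" }

// factories maps a file suffix to a constructor, not to a shared instance.
var factories = map[string]func() LocalFileHandler{
	".json.gz": func() LocalFileHandler { return &gzipHandler{} },
	".parquet": func() LocalFileHandler { return &parquetHandler{} },
}

// newHandlerFor returns a fresh handler for the given file name.
func newHandlerFor(fileName string) (LocalFileHandler, error) {
	for suffix, newHandler := range factories {
		if strings.HasSuffix(fileName, suffix) {
			return newHandler(), nil
		}
	}
	return nil, fmt.Errorf("no file handler for %q", fileName)
}

func main() {
	for _, name := range []string{"a.json.gz", "b.parquet", "c.txt"} {
		h, err := newHandlerFor(name)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(name, "->", h.Kind())
	}
}
```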

}
}
absPath, err := filepath.Abs(statusTrackerFilePtr.Name())
fName, err := batch.download(ctx, filepath.Join(prefix, StatusTrackerFileName))
Contributor

Suggestion to increase code readability.

Suggested change
fName, err := batch.download(ctx, filepath.Join(prefix, StatusTrackerFileName))
statusTrackerFile, err := batch.download(ctx, filepath.Join(prefix, StatusTrackerFileName))

Contributor Author

Usually, inside a function whose name clearly defines the intent, we can shorten variable names for common structures. I like to denote files with f, etc. What do you think?

switch destType {
case "S3":
return map[string]filehandler.LocalFileHandler{
".json.gz": filehandler.NewGZIPLocalFileHandler(filehandler.CamelCase),
Contributor

Given that we support handling parquet files anyway, and that any file type could be stored in S3, wouldn't it be better to add all the supported filehandlers for all destinations?

Contributor Author

Let me think about it. That can be done.

return nil
}

func (h *GZIPLocalFileHandler) RemoveIdentity(ctx context.Context, attributes []model.User) error {
Contributor

Given that we have changed the logic of RemoveIdentity, and since this is going to be one of the most time-consuming parts of the loop during deletion, I strongly feel we should benchmark the old and new deletion logic.

I have a feeling that if sed knows about all the users that need to be matched, it will do some kind of optimization to avoid literally iterating over each OR condition in the pattern file and looking for a match in each line.

Contributor Author

Based on the sed man page, -f just reads the commands from a file and appends them to the list of commands for sed. -e, which we have replaced it with, does the same thing with an inline script.

The main thing we have changed is that sed now handles the file from which the data needs to be removed entirely on its own, and we do it in memory. This is something that needs to be benchmarked and checked.
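
To make the comparison concrete, here is a sketch of the two invocation styles: one `-e` argument per delete command versus the same commands written to a script file for `-f`. The pattern shape (`/<id>/d`, which deletes every line containing the ID), the helper names and the restriction to simple IDs are assumptions for illustration; the PR's actual patterns are not shown in this thread.

```go
package main

import (
	"fmt"
	"strings"
)

// deleteCommand (hypothetical) builds a sed delete command for one user ID.
// It only accepts IDs made of letters, digits, '-' and '_', since anything
// else would need escaping before being used inside a sed pattern.
func deleteCommand(userID string) (string, error) {
	if userID == "" {
		return "", fmt.Errorf("empty user ID")
	}
	for _, r := range userID {
		ok := r == '-' || r == '_' ||
			(r >= '0' && r <= '9') || (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z')
		if !ok {
			return "", fmt.Errorf("user ID %q needs escaping for sed", userID)
		}
	}
	return "/" + userID + "/d", nil
}

// inlineArgs builds: sed -e cmd1 -e cmd2 ... file
func inlineArgs(userIDs []string, file string) ([]string, error) {
	args := make([]string, 0, 2*len(userIDs)+1)
	for _, id := range userIDs {
		cmd, err := deleteCommand(id)
		if err != nil {
			return nil, err
		}
		args = append(args, "-e", cmd)
	}
	return append(args, file), nil
}

// scriptFileBody builds the content of a script file usable as: sed -f script file
func scriptFileBody(userIDs []string) (string, error) {
	var b strings.Builder
	for _, id := range userIDs {
		cmd, err := deleteCommand(id)
		if err != nil {
			return "", err
		}
		b.WriteString(cmd + "\n")
	}
	return b.String(), nil
}

func main() {
	ids := []string{"u1", "u2"}
	args, _ := inlineArgs(ids, "events.json")
	script, _ := scriptFileBody(ids)
	fmt.Println("sed", strings.Join(args, " "))
	fmt.Print(script)
}
```

Both forms hand sed the same list of commands, so a benchmark would mainly measure the change in how the file itself is processed.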

@sundernagesh18

This feature is good to go. QA Approved.
