Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Deletion of artifacts #87

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ module github.com/flyteorg/datacatalog

go 1.18

// Resolve flyteidl and flytestdlib from the blackshark-ai forks, pinned to specific pseudo-versions, instead of the
// upstream flyteorg modules required below.
// NOTE(review): presumably temporary until the forked changes are available upstream - confirm and drop before merge.
replace (
github.com/flyteorg/flyteidl => github.com/blackshark-ai/flyteidl v0.24.22-0.20230119104851-e9fb728f4733
github.com/flyteorg/flytestdlib => github.com/blackshark-ai/flytestdlib v1.0.1-0.20230104151410-d6ec6dba8697
)

require (
github.com/Selvatico/go-mocket v1.0.7
github.com/flyteorg/flyteidl v1.2.3
github.com/flyteorg/flytestdlib v1.0.11
github.com/flyteorg/flyteidl v1.3.1
github.com/flyteorg/flytestdlib v1.0.12
github.com/golang/glog v1.0.0
github.com/golang/protobuf v1.5.2
github.com/jackc/pgconn v1.10.1
github.com/mitchellh/mapstructure v1.4.3
github.com/satori/go.uuid v1.2.0
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.1
github.com/stretchr/testify v1.7.2
google.golang.org/grpc v1.46.0
gorm.io/driver/postgres v1.2.3
gorm.io/driver/sqlite v1.1.1
Expand Down Expand Up @@ -45,7 +50,7 @@ require (
github.com/flyteorg/stow v0.3.6 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.4.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand All @@ -68,7 +73,7 @@ require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-sqlite3 v1.14.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
Expand All @@ -78,29 +83,30 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.11.0 // indirect
github.com/stretchr/objx v0.3.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/api v0.76.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/apimachinery v0.20.2 // indirect
k8s.io/client-go v0.0.0-20210217172142-7279fc64d847 // indirect
k8s.io/klog/v2 v2.5.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apimachinery v0.24.1 // indirect
k8s.io/client-go v0.24.1 // indirect
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
)
639 changes: 500 additions & 139 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewCollectedErrors(code codes.Code, errors []error) error {
errorCollection[idx] = err.Error()
}

return NewDataCatalogError(code, strings.Join((errorCollection), ", "))
return NewDataCatalogError(code, strings.Join(errorCollection, ", "))
}

func IsAlreadyExistsError(err error) bool {
Expand Down
111 changes: 108 additions & 3 deletions pkg/manager/impl/artifact_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ type artifactMetrics struct {
updateDataFailureCounter labeled.Counter
deleteDataSuccessCounter labeled.Counter
deleteDataFailureCounter labeled.Counter
deleteResponseTime labeled.StopWatch
deleteSuccessCounter labeled.Counter
deleteFailureCounter labeled.Counter
bulkDeleteResponseTime labeled.StopWatch
bulkDeleteSuccessCounter labeled.Counter
bulkDeleteFailureCounter labeled.Counter
}

type artifactManager struct {
Expand All @@ -54,7 +60,8 @@ type artifactManager struct {
systemMetrics artifactMetrics
}

// Create an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an offloaded location.
// CreateArtifact creates an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an
// offloaded location.
func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatalog.CreateArtifactRequest) (*datacatalog.CreateArtifactResponse, error) {
timer := m.systemMetrics.createResponseTime.Start(ctx)
defer timer.Stop()
Expand Down Expand Up @@ -130,7 +137,7 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatal
return &datacatalog.CreateArtifactResponse{}, nil
}

// Get the Artifact and its associated ArtifactData. The request can query by ArtifactID or TagName.
// GetArtifact retrieves the Artifact and its associated ArtifactData. The request can query by ArtifactID or TagName.
func (m *artifactManager) GetArtifact(ctx context.Context, request *datacatalog.GetArtifactRequest) (*datacatalog.GetArtifactResponse, error) {
timer := m.systemMetrics.getResponseTime.Start(ctx)
defer timer.Stop()
Expand Down Expand Up @@ -241,6 +248,8 @@ func (m *artifactManager) getArtifactDataList(ctx context.Context, artifactDataM
return artifactDataList, nil
}

// ListArtifacts returns a paginated list of artifacts matching the provided filter expression, including their
// associated artifact data.
func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalog.ListArtifactsRequest) (*datacatalog.ListArtifactsResponse, error) {
err := validators.ValidateListArtifactRequest(request)
if err != nil {
Expand Down Expand Up @@ -350,7 +359,7 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal

dataLocation, err := m.artifactStore.PutData(ctx, artifact, artifactData)
if err != nil {
logger.Errorf(ctx, "Failed to store artifact data during update, err: %v", err)
logger.Errorf(ctx, "Failed to store artifact data [%v] during update, err: %v", artifactData.Name, err)
m.systemMetrics.updateDataFailureCounter.Inc(ctx)
m.systemMetrics.updateFailureCounter.Inc(ctx)
return nil, err
Expand Down Expand Up @@ -405,6 +414,96 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal
}, nil
}

// deleteArtifact removes a single artifact, identified by queryHandle, from the given dataset. It looks the artifact
// up first, deletes each of its artifact data entries via the artifact store and finally removes the artifact from
// the repository.
//
// The first error encountered is returned unchanged, so callers can still classify it (e.g. using
// errors.IsDoesNotExistError). Artifact data that was already deleted before a failure is not restored.
func (m *artifactManager) deleteArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, queryHandle artifactQueryHandle) error {
	// use the nil-safe generated getters so an unset dataset ID cannot cause a nil dereference on this line
	ctx = contextutils.WithProjectDomain(ctx, datasetID.GetProject(), datasetID.GetDomain())

	// artifact must already exist, verify first
	artifactModel, err := m.findArtifact(ctx, datasetID, queryHandle)
	if err != nil {
		logger.Errorf(ctx, "Failed to get artifact while trying to delete [%v], err: %v", queryHandle, err)
		return err
	}

	// delete all artifact data from the blob storage
	for _, artifactData := range artifactModel.ArtifactData {
		if err := m.artifactStore.DeleteData(ctx, artifactData); err != nil {
			logger.Errorf(ctx, "Failed to delete artifact data [%v] while deleting artifact [%v], err: %v", artifactData.Name, artifactModel.ArtifactID, err)
			m.systemMetrics.deleteDataFailureCounter.Inc(ctx)
			return err
		}

		m.systemMetrics.deleteDataSuccessCounter.Inc(ctx)
	}

	// delete artifact from DB, this also removes the associated artifact data entries
	err = m.repo.ArtifactRepo().Delete(ctx, artifactModel)
	if err != nil {
		if errors.IsDoesNotExistError(err) {
			logger.Warnf(ctx, "Artifact [%v] does not exist, err %v", artifactModel.ArtifactID, err)
			m.systemMetrics.doesNotExistCounter.Inc(ctx)
		} else {
			// log the artifact ID only, consistent with the other messages, instead of dumping the whole model
			logger.Errorf(ctx, "Failed to delete artifact [%v], err: %v", artifactModel.ArtifactID, err)
		}
		return err
	}

	logger.Debugf(ctx, "Successfully deleted artifact [%v]", artifactModel.ArtifactID)
	return nil
}

// DeleteArtifact deletes the given artifact, removing all stored artifact data from the underlying blob storage.
//
// The request is validated before anything is deleted. Validation and deletion failures are both counted in the
// delete failure metric and returned to the caller unchanged; on success an empty response is returned.
func (m *artifactManager) DeleteArtifact(ctx context.Context, request *datacatalog.DeleteArtifactRequest) (*datacatalog.DeleteArtifactResponse, error) {
	// this runs before validation, so the dataset might still be unset - the generated getters are nil-safe and
	// yield empty strings instead of panicking, letting the validator below report the actual problem
	ctx = contextutils.WithProjectDomain(ctx, request.GetDataset().GetProject(), request.GetDataset().GetDomain())

	timer := m.systemMetrics.deleteResponseTime.Start(ctx)
	defer timer.Stop()

	if err := validators.ValidateDeleteArtifactRequest(request); err != nil {
		logger.Warningf(ctx, "Invalid delete artifact request %v, err: %v", request, err)
		m.systemMetrics.validationErrorCounter.Inc(ctx)
		m.systemMetrics.deleteFailureCounter.Inc(ctx)
		return nil, err
	}

	// the request itself serves as the query handle used to look up the artifact
	if err := m.deleteArtifact(ctx, request.GetDataset(), request); err != nil {
		m.systemMetrics.deleteFailureCounter.Inc(ctx)
		return nil, err
	}

	m.systemMetrics.deleteSuccessCounter.Inc(ctx)
	return &datacatalog.DeleteArtifactResponse{}, nil
}

// DeleteArtifacts is the bulk variant of DeleteArtifact: it deletes every artifact listed in the request, one after
// the other, removing all stored artifact data from the underlying blob storage.
//
// Artifacts reported as not existing are skipped. Any other failure aborts the operation immediately and is returned
// to the caller; artifacts processed before the failure stay deleted and the remaining ones are left untouched.
func (m *artifactManager) DeleteArtifacts(ctx context.Context, request *datacatalog.DeleteArtifactsRequest) (*datacatalog.DeleteArtifactResponse, error) {
	stopwatch := m.systemMetrics.bulkDeleteResponseTime.Start(ctx)
	defer stopwatch.Stop()

	if validationErr := validators.ValidateDeleteArtifactsRequest(request); validationErr != nil {
		logger.Warningf(ctx, "Invalid delete artifacts request %v, err: %v", request, validationErr)
		m.systemMetrics.validationErrorCounter.Inc(ctx)
		m.systemMetrics.bulkDeleteFailureCounter.Inc(ctx)
		return nil, validationErr
	}

	for _, item := range request.Artifacts {
		deleteErr := m.deleteArtifact(ctx, item.GetDataset(), item)

		switch {
		case deleteErr == nil:
			// artifact deleted, move on to the next one
		case errors.IsDoesNotExistError(deleteErr):
			// the bulk endpoint is idempotent: a missing artifact might have been removed by a previous call
			// already, thus it is not treated as a failure
		default:
			m.systemMetrics.bulkDeleteFailureCounter.Inc(ctx)
			return nil, deleteErr
		}
	}

	m.systemMetrics.bulkDeleteSuccessCounter.Inc(ctx)
	return &datacatalog.DeleteArtifactResponse{}, nil
}

func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope) interfaces.ArtifactManager {
artifactMetrics := artifactMetrics{
scope: artifactScope,
Expand All @@ -429,6 +528,12 @@ func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.Da
updateDataFailureCounter: labeled.NewCounter("update_data_failure_count", "The number of times update artifact data failed", artifactScope, labeled.EmitUnlabeledMetric),
deleteDataSuccessCounter: labeled.NewCounter("delete_data_success_count", "The number of times delete artifact data succeeded", artifactScope, labeled.EmitUnlabeledMetric),
deleteDataFailureCounter: labeled.NewCounter("delete_data_failure_count", "The number of times delete artifact data failed", artifactScope, labeled.EmitUnlabeledMetric),
deleteResponseTime: labeled.NewStopWatch("delete_duration", "The duration of the delete artifact calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric),
deleteSuccessCounter: labeled.NewCounter("delete_success_count", "The number of times delete artifact succeeded", artifactScope, labeled.EmitUnlabeledMetric),
deleteFailureCounter: labeled.NewCounter("delete_failure_count", "The number of times delete artifact failed", artifactScope, labeled.EmitUnlabeledMetric),
bulkDeleteResponseTime: labeled.NewStopWatch("bulk_delete_duration", "The duration of the bulk delete artifacts calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric),
bulkDeleteSuccessCounter: labeled.NewCounter("bulk_delete_success_count", "The number of times bulk delete artifacts succeeded", artifactScope, labeled.EmitUnlabeledMetric),
bulkDeleteFailureCounter: labeled.NewCounter("bulk_delete_failure_count", "The number of times bulk delete artifacts failed", artifactScope, labeled.EmitUnlabeledMetric),
}

return &artifactManager{
Expand Down
Loading