diff --git a/CHANGELOG.md b/CHANGELOG.md index a5ce054cdac..59ab94a25c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Features 1. [18279](https://github.com/influxdata/influxdb/pull/18279): Make all pkg applications stateful via stacks +1. [18322](https://github.com/influxdata/influxdb/pull/18322): Add ability to export a stack's existing (as they are in the platform) resource state as a pkg ## v2.0.0-beta.11 [2020-05-26] diff --git a/cmd/influx/pkg_test.go b/cmd/influx/pkg_test.go index 29d71aefe07..c9ca15430cb 100644 --- a/cmd/influx/pkg_test.go +++ b/cmd/influx/pkg_test.go @@ -729,6 +729,10 @@ func (f *fakePkgSVC) DeleteStack(ctx context.Context, identifiers struct{ OrgID, panic("not implemented") } +func (f *fakePkgSVC) ExportStack(ctx context.Context, orgID, stackID influxdb.ID) (*pkger.Pkg, error) { + panic("not implemented") +} + func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) { if f.createFn != nil { return f.createFn(ctx, setters...) diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index 84b1cddee12..a496495b0df 100644 --- a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -1426,6 +1426,246 @@ func TestLauncher_Pkger(t *testing.T) { }) }) }) + + t.Run("exporting the existing state of stack resources to a pkg", func(t *testing.T) { + testStackApplyFn := func(t *testing.T) (pkger.Summary, pkger.Stack, func()) { + t.Helper() + + stack, cleanup := newStackFn(t, pkger.Stack{}) + defer func() { + if t.Failed() { + cleanup() + } + }() + + var ( + initialBucketPkgName = "rucketeer-1" + initialCheckPkgName = "checkers" + initialDashPkgName = "dash-of-salt" + initialEndpointPkgName = "endzo" + initialLabelPkgName = "labelino" + initialRulePkgName = "oh-doyle-rules" + initialTaskPkgName = "tap" + initialTelegrafPkgName = "teletype" + initialVariablePkgName = "laces-out-dan" + ) + + labelObj := newLabelObject(initialLabelPkgName, "label 1", "init desc", "#222eee") + setAssociation := func(o pkger.Object) pkger.Object { + o.AddAssociations(pkger.ObjectAssociation{ + Kind: pkger.KindLabel, + PkgName: labelObj.Name(), + }) + return o + } + + initialPkg := newPkg( + setAssociation(newBucketObject(initialBucketPkgName, "display name", "init desc")), + setAssociation(newCheckDeadmanObject(t, initialCheckPkgName, "check_0", time.Minute)), + setAssociation(newDashObject(initialDashPkgName, "dash_0", "init desc")), + setAssociation(newEndpointHTTP(initialEndpointPkgName, "endpoint_0", "init desc")), + labelObj, + setAssociation(newRuleObject(t, initialRulePkgName, "rule_0", initialEndpointPkgName, "init desc")), + setAssociation(newTaskObject(initialTaskPkgName, "task_0", "init desc")), + setAssociation(newTelegrafObject(initialTelegrafPkgName, "tele_0", "init desc")), + setAssociation(newVariableObject(initialVariablePkgName, "var char", "init desc")), + ) + + impact, err := svc.Apply(ctx, l.Org.ID, l.User.ID, initialPkg, pkger.ApplyWithStackID(stack.ID)) + require.NoError(t, err) + + summary := impact.Summary + + hasAssociation := func(t *testing.T, actual []pkger.SummaryLabel) { + t.Helper() + require.Len(t, actual, 1, "unexpected number of label mappings") + assert.Equal(t, actual[0].PkgName, labelObj.Name()) + } + + require.Len(t, summary.Buckets, 1) + assert.NotZero(t, summary.Buckets[0].ID) + assert.Equal(t, "display name", summary.Buckets[0].Name) + assert.Equal(t, "init desc", summary.Buckets[0].Description) + hasAssociation(t, summary.Buckets[0].LabelAssociations) + + require.Len(t, summary.Checks, 1) + assert.NotZero(t, summary.Checks[0].Check.GetID()) + assert.Equal(t, "check_0", summary.Checks[0].Check.GetName()) + hasAssociation(t, summary.Checks[0].LabelAssociations) + + require.Len(t, summary.Dashboards, 1) + assert.NotZero(t, summary.Dashboards[0].ID) + assert.Equal(t, "dash_0", summary.Dashboards[0].Name) + hasAssociation(t, summary.Dashboards[0].LabelAssociations) + + require.Len(t, summary.NotificationEndpoints, 1) + assert.NotZero(t, summary.NotificationEndpoints[0].NotificationEndpoint.GetID()) + assert.Equal(t, "endpoint_0", summary.NotificationEndpoints[0].NotificationEndpoint.GetName()) + hasAssociation(t, summary.NotificationEndpoints[0].LabelAssociations) + + require.Len(t, summary.Labels, 1) + assert.NotZero(t, summary.Labels[0].ID) + assert.Equal(t, "label 1", summary.Labels[0].Name) + assert.Equal(t, "init desc", summary.Labels[0].Properties.Description) + assert.Equal(t, "#222eee", summary.Labels[0].Properties.Color) + + require.Len(t, summary.NotificationRules, 1) + assert.NotZero(t, summary.NotificationRules[0].ID) + assert.Equal(t, "rule_0", summary.NotificationRules[0].Name) + assert.Equal(t, initialEndpointPkgName, summary.NotificationRules[0].EndpointPkgName) + assert.Equal(t, "init desc", summary.NotificationRules[0].Description) + hasAssociation(t, summary.NotificationRules[0].LabelAssociations) + + require.Len(t, summary.Tasks, 1) + assert.NotZero(t, summary.Tasks[0].ID) + assert.Equal(t, "task_0", summary.Tasks[0].Name) + assert.Equal(t, "init desc", summary.Tasks[0].Description) + hasAssociation(t, summary.Tasks[0].LabelAssociations) + + require.Len(t, summary.TelegrafConfigs, 1) + assert.NotZero(t, summary.TelegrafConfigs[0].TelegrafConfig.ID) + assert.Equal(t, "tele_0", summary.TelegrafConfigs[0].TelegrafConfig.Name) + assert.Equal(t, "init desc", summary.TelegrafConfigs[0].TelegrafConfig.Description) + hasAssociation(t, summary.TelegrafConfigs[0].LabelAssociations) + + require.Len(t, summary.Variables, 1) + assert.NotZero(t, summary.Variables[0].ID) + assert.Equal(t, "var char", summary.Variables[0].Name) + assert.Equal(t, "init desc", summary.Variables[0].Description) + hasAssociation(t, summary.Variables[0].LabelAssociations) + + // verify changes reflected in platform + { + actualBkt := resourceCheck.mustGetBucket(t, byName("display name")) + assert.Equal(t, summary.Buckets[0].ID, pkger.SafeID(actualBkt.ID)) + + actualCheck := resourceCheck.mustGetCheck(t, byName("check_0")) + assert.Equal(t, summary.Checks[0].Check.GetID(), actualCheck.GetID()) + + actualDash := resourceCheck.mustGetDashboard(t, byName("dash_0")) + assert.Equal(t, summary.Dashboards[0].ID, pkger.SafeID(actualDash.ID)) + + actualEndpint := resourceCheck.mustGetEndpoint(t, byName("endpoint_0")) + assert.Equal(t, summary.NotificationEndpoints[0].NotificationEndpoint.GetID(), actualEndpint.GetID()) + + actualLabel := resourceCheck.mustGetLabel(t, byName("label 1")) + assert.Equal(t, summary.Labels[0].ID, pkger.SafeID(actualLabel.ID)) + + actualRule := resourceCheck.mustGetRule(t, byName("rule_0")) + assert.Equal(t, summary.NotificationRules[0].ID, pkger.SafeID(actualRule.GetID())) + + actualTask := resourceCheck.mustGetTask(t, byName("task_0")) + assert.Equal(t, summary.Tasks[0].ID, pkger.SafeID(actualTask.ID)) + + actualTele := resourceCheck.mustGetTelegrafConfig(t, byName("tele_0")) + assert.Equal(t, summary.TelegrafConfigs[0].TelegrafConfig.ID, actualTele.ID) + + actualVar := resourceCheck.mustGetVariable(t, byName("var char")) + assert.Equal(t, summary.Variables[0].ID, pkger.SafeID(actualVar.ID)) + } + + return summary, stack, cleanup + } + + t.Run("should be return pkg matching source pkg when all resources are unchanged", func(t *testing.T) { + initialSum, stack, cleanup := testStackApplyFn(t) + defer cleanup() + + exportedPkg, err := svc.ExportStack(ctx, l.Org.ID, stack.ID) + require.NoError(t, err) + + hasAssociation := func(t *testing.T, actual []pkger.SummaryLabel) { + t.Helper() + assert.Len(t, actual, 1, "unexpected number of label mappings") + if len(actual) != 1 { + return + } + assert.Equal(t, actual[0].PkgName, initialSum.Labels[0].PkgName) + } + + sum := exportedPkg.Summary() + + require.Len(t, sum.Buckets, 1, "missing required buckets") + assert.Equal(t, initialSum.Buckets[0].PkgName, sum.Buckets[0].PkgName) + assert.Equal(t, initialSum.Buckets[0].Name, sum.Buckets[0].Name) + hasAssociation(t, sum.Buckets[0].LabelAssociations) + + require.Len(t, sum.Checks, 1, "missing required checks") + assert.Equal(t, initialSum.Checks[0].PkgName, sum.Checks[0].PkgName) + assert.Equal(t, initialSum.Checks[0].Check.GetName(), sum.Checks[0].Check.GetName()) + hasAssociation(t, sum.Checks[0].LabelAssociations) + + require.Len(t, sum.Dashboards, 1, "missing required dashboards") + assert.Equal(t, initialSum.Dashboards[0].PkgName, sum.Dashboards[0].PkgName) + assert.Equal(t, initialSum.Dashboards[0].Name, sum.Dashboards[0].Name) + hasAssociation(t, sum.Dashboards[0].LabelAssociations) + + require.Len(t, sum.Labels, 1, "missing required labels") + assert.Equal(t, initialSum.Labels[0].PkgName, sum.Labels[0].PkgName) + assert.Equal(t, initialSum.Labels[0].Name, sum.Labels[0].Name) + + require.Len(t, sum.NotificationRules, 1, "missing required rules") + assert.Equal(t, initialSum.NotificationRules[0].PkgName, sum.NotificationRules[0].PkgName) + assert.Equal(t, initialSum.NotificationRules[0].Name, sum.NotificationRules[0].Name) + assert.Equal(t, initialSum.NotificationRules[0].EndpointPkgName, sum.NotificationRules[0].EndpointPkgName) + assert.Equal(t, initialSum.NotificationRules[0].EndpointType, sum.NotificationRules[0].EndpointType) + hasAssociation(t, sum.NotificationRules[0].LabelAssociations) + + require.Len(t, sum.Tasks, 1, "missing required tasks") + assert.Equal(t, initialSum.Tasks[0].PkgName, sum.Tasks[0].PkgName) + assert.Equal(t, initialSum.Tasks[0].Name, sum.Tasks[0].Name) + hasAssociation(t, sum.Tasks[0].LabelAssociations) + + require.Len(t, sum.TelegrafConfigs, 1, "missing required telegraf configs") + assert.Equal(t, initialSum.TelegrafConfigs[0].PkgName, sum.TelegrafConfigs[0].PkgName) + assert.Equal(t, initialSum.TelegrafConfigs[0].TelegrafConfig.Name, sum.TelegrafConfigs[0].TelegrafConfig.Name) + hasAssociation(t, sum.TelegrafConfigs[0].LabelAssociations) + + require.Len(t, sum.Variables, 1, "missing required variables") + assert.Equal(t, initialSum.Variables[0].PkgName, sum.Variables[0].PkgName) + assert.Equal(t, initialSum.Variables[0].Name, sum.Variables[0].Name) + hasAssociation(t, sum.Variables[0].LabelAssociations) + }) + + t.Run("should not export associations if association removed in platform", func(t *testing.T) { + stack, cleanup := newStackFn(t, pkger.Stack{}) + defer cleanup() + + labelObj := newLabelObject("test-label", "", "", "") + bktObj := newBucketObject("test-bucket", "", "") + bktObj.AddAssociations(pkger.ObjectAssociation{ + Kind: pkger.KindLabel, + PkgName: labelObj.Name(), + }) + pkg := newPkg(bktObj, labelObj) + + impact, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg, pkger.ApplyWithStackID(stack.ID)) + require.NoError(t, err) + + require.Len(t, impact.Summary.Labels, 1) + require.Len(t, impact.Summary.Buckets, 1) + require.Len(t, impact.Summary.Buckets[0].LabelAssociations, 1) + + // TODO(jsteenb2): cannot figure out the issue with the http.LabelService returning + // the bad HTTP method error :-(. Revisit this and replace with the + // the HTTP client when possible. Goal is to remove the kvservice + // dep here. + err = l.kvService.DeleteLabelMapping(ctx, &influxdb.LabelMapping{ + LabelID: influxdb.ID(impact.Summary.Labels[0].ID), + ResourceID: influxdb.ID(impact.Summary.Buckets[0].ID), + ResourceType: influxdb.BucketsResourceType, + }) + require.NoError(t, err) + + exportedPkg, err := svc.ExportStack(ctx, l.Org.ID, stack.ID) + require.NoError(t, err) + + exportedSum := exportedPkg.Summary() + require.Len(t, exportedSum.Labels, 1) + require.Len(t, exportedSum.Buckets, 1) + require.Empty(t, exportedSum.Buckets[0].LabelAssociations, "received unexpected label associations") + }) + }) }) t.Run("errors incurred during application of package rolls back to state before package", func(t *testing.T) { diff --git a/http/swagger.yml b/http/swagger.yml index e5dcee6e755..dbc58d09502 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -4728,6 +4728,9 @@ paths: application/json: schema: $ref: "#/components/schemas/Pkg" + application/x-yaml: + schema: + $ref: "#/components/schemas/Pkg" default: description: Unexpected error content: @@ -4942,6 +4945,41 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" + /packages/stacks/{stack_id}/export: + delete: + operationId: ExportStack + tags: + - InfluxPackages + summary: Export a stack's resources in the form of a package + parameters: + - in: path + name: stack_id + required: true + schema: + type: string + description: The stack id to be removed + - in: query + name: orgID + required: true + schema: + type: string + description: The organization id of the user + responses: + "200": + description: Stack and all its associated resources are deleted + content: + application/json: + schema: + $ref: "#/components/schemas/Pkg" + application/x-yaml: + schema: + $ref: "#/components/schemas/Pkg" + default: + description: Unexpected error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" /tasks: get: operationId: GetTasks diff --git a/kit/transport/http/api.go b/kit/transport/http/api.go index f004560739c..43bfd5c1661 100644 --- a/kit/transport/http/api.go +++ b/kit/transport/http/api.go @@ -184,12 +184,37 @@ func (a *API) Respond(w http.ResponseWriter, r *http.Request, status int, v inte return } + a.write(w, writer, status, b) +} + +// Write allows the user to write raw bytes to the response writer. This +// operation does not have a fail case, all failures here will be logged. +func (a *API) Write(w http.ResponseWriter, status int, b []byte) { + if status == http.StatusNoContent { + w.WriteHeader(status) + return + } + + var writer io.WriteCloser = noopCloser{Writer: w} + // we'll double close to make sure its always closed even + //on issues before the write + defer writer.Close() + + if a != nil && a.encodeGZIP { + w.Header().Set("Content-Encoding", "gzip") + writer = gzip.NewWriter(w) + } + + a.write(w, writer, status, b) +} + +func (a *API) write(w http.ResponseWriter, wc io.WriteCloser, status int, b []byte) { w.WriteHeader(status) - if _, err := writer.Write(b); err != nil { + if _, err := wc.Write(b); err != nil { a.logErr("failed to write to response writer", zap.Error(err)) } - if err := writer.Close(); err != nil { + if err := wc.Close(); err != nil { a.logErr("failed to close response writer", zap.Error(err)) } } diff --git a/pkger/http_remote_service.go b/pkger/http_remote_service.go index bedbdc28af5..b3bb8dd9717 100644 --- a/pkger/http_remote_service.go +++ b/pkger/http_remote_service.go @@ -62,6 +62,30 @@ func (s *HTTPRemoteService) DeleteStack(ctx context.Context, identifiers struct{ Do(ctx) } +func (s *HTTPRemoteService) ExportStack(ctx context.Context, orgID, stackID influxdb.ID) (*Pkg, error) { + pkg := new(Pkg) + err := s.Client. + Get(RoutePrefix, "stacks", stackID.String(), "export"). + QueryParams([2]string{"orgID", orgID.String()}). + Decode(func(resp *http.Response) error { + decodedPkg, err := Parse(EncodingJSON, FromReader(resp.Body)) + if err != nil { + return err + } + pkg = decodedPkg + return nil + }). + Do(ctx) + if err != nil { + return nil, err + } + + if err := pkg.Validate(ValidWithoutResources()); err != nil { + return nil, err + } + return pkg, nil +} + func (s *HTTPRemoteService) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { queryParams := [][2]string{{"orgID", orgID.String()}} for _, name := range f.Names { diff --git a/pkger/http_server.go b/pkger/http_server.go index 735b96dbb15..d1ebbcda960 100644 --- a/pkger/http_server.go +++ b/pkger/http_server.go @@ -56,6 +56,8 @@ func NewHTTPServer(log *zap.Logger, svc SVC) *HTTPServer { r.Post("/", svr.createStack) r.Get("/", svr.listStacks) r.Delete("/{stack_id}", svr.deleteStack) + r.With(middleware.AllowContentType("text/yml", "application/x-yaml", "application/json")). + Get("/{stack_id}/export", svr.exportStack) }) } @@ -243,6 +245,47 @@ func (s *HTTPServer) deleteStack(w http.ResponseWriter, r *http.Request) { s.api.Respond(w, r, http.StatusNoContent, nil) } +func (s *HTTPServer) exportStack(w http.ResponseWriter, r *http.Request) { + encoding := pkgEncoding(r.Header) + + orgID, err := getRequiredOrgIDFromQuery(r.URL.Query()) + if err != nil { + s.api.Err(w, r, err) + return + } + + stackID, err := influxdb.IDFromString(chi.URLParam(r, "stack_id")) + if err != nil { + s.api.Err(w, r, &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "the stack id provided in the path was invalid", + Err: err, + }) + return + } + + pkg, err := s.svc.ExportStack(r.Context(), orgID, *stackID) + if err != nil { + s.api.Err(w, r, err) + return + } + + b, err := pkg.Encode(encoding) + if err != nil { + s.api.Err(w, r, err) + return + } + + switch encoding { + case EncodingYAML: + w.Header().Set("Content-Type", "application/x-yaml") + default: + w.Header().Set("Content-Type", "application/json; charset=utf-8") + } + + s.api.Write(w, http.StatusOK, b) +} + func getRequiredOrgIDFromQuery(q url.Values) (influxdb.ID, error) { orgIDRaw := q.Get("orgID") if orgIDRaw == "" { diff --git a/pkger/http_server_test.go b/pkger/http_server_test.go index 84ecafbfabb..2c5572af4da 100644 --- a/pkger/http_server_test.go +++ b/pkger/http_server_test.go @@ -857,6 +857,10 @@ func (f *fakeSVC) DeleteStack(ctx context.Context, identifiers struct{ OrgID, Us panic("not implemented yet") } +func (f *fakeSVC) ExportStack(ctx context.Context, orgID, stackID influxdb.ID) (*pkger.Pkg, error) { + panic("not implemented") +} + func (f *fakeSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) { if f.listStacksFn == nil { panic("not implemented") diff --git a/pkger/service.go b/pkger/service.go index 41ef9b0675e..2d8f6f27b25 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -61,6 +61,7 @@ const ResourceTypeStack influxdb.ResourceType = "stack" type SVC interface { InitStack(ctx context.Context, userID influxdb.ID, stack Stack) (Stack, error) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error + ExportStack(ctx context.Context, orgID, stackID influxdb.ID) (*Pkg, error) ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error) CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error) @@ -333,6 +334,182 @@ func (s *Service) DeleteStack(ctx context.Context, identifiers struct{ OrgID, Us return s.store.DeleteStack(ctx, identifiers.StackID) } +func (s *Service) ExportStack(ctx context.Context, orgID, stackID influxdb.ID) (*Pkg, error) { + stack, err := s.store.ReadStackByID(ctx, stackID) + if err != nil { + return nil, err + } + + labelObjs, availablePkgLabels, err := s.exportStackLabels(ctx, stack.Resources) + if err != nil { + return nil, err + } + + pkg := Pkg{Objects: labelObjs} + for _, res := range stack.Resources { + var ( + obj Object + err error + ) + + switch res.Kind { + case KindBucket: + bkt, err := s.bucketSVC.FindBucketByID(ctx, res.ID) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + continue + } + if err != nil { + return nil, ierrors.Wrap(err, fmt.Sprintf("failed to find bucket[%s]", res.ID.String())) + } + obj = BucketToObject("", *bkt) + case KindCheck, KindCheckDeadman, KindCheckThreshold: + ch, err := s.checkSVC.FindCheckByID(ctx, res.ID) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + continue + } + if err != nil { + return nil, ierrors.Wrap(err, fmt.Sprintf("failed to find check[%s]", res.ID.String())) + } + obj = CheckToObject("", ch) + case KindDashboard: + dash, err := s.dashSVC.FindDashboardByID(ctx, res.ID) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + continue + } + if err != nil { + return nil, ierrors.Wrap(err, fmt.Sprintf("failed to find dashboard[%s]", res.ID.String())) + } + obj = DashboardToObject("", *dash) + case KindLabel: + // these labels have already been discovered. All valid associations with the + // labels to be exported will be added, those that are not, will not be added. + continue + case KindNotificationEndpoint, + KindNotificationEndpointHTTP, + KindNotificationEndpointPagerDuty, + KindNotificationEndpointSlack: + e, err := s.endpointSVC.FindNotificationEndpointByID(ctx, res.ID) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + continue + } + if err != nil { + return nil, ierrors.Wrap(err, fmt.Sprintf("failed to find endpoint[%s]", res.ID.String())) + } + obj = NotificationEndpointToObject("", e) + case KindNotificationRule: + e, err := s.ruleSVC.FindNotificationRuleByID(ctx, res.ID) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + continue + } + if err != nil { + return nil, ierrors.Wrap(err, fmt.Sprintf("failed to find rule[%s]", res.ID.String())) + } + var endpointName string + for _, ass := range res.Associations { + if ass.Kind.is(KindNotificationEndpoint) { + endpointName = ass.PkgName + break + } + } + // rule is invalid if the endpoint is deleted + if endpointName == "" { + continue + } + obj = NotificationRuleToObject("", endpointName, e) + case KindTask: + t, err := s.taskSVC.FindTaskByID(ctx, res.ID) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + continue + } + if err != nil { + return nil, ierrors.Wrap(err, fmt.Sprintf("failed to find task[%s]", res.ID.String())) + } + obj = TaskToObject("", *t) + case KindTelegraf: + t, err := s.teleSVC.FindTelegrafConfigByID(ctx, res.ID) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + continue + } + if err != nil { + return nil, ierrors.Wrap(err, fmt.Sprintf("failed to find telegraf config[%s]", res.ID.String())) + } + obj = TelegrafToObject("", *t) + case KindVariable: + v, err := s.varSVC.FindVariableByID(ctx, res.ID) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + continue + } + if err != nil { + return nil, ierrors.Wrap(err, fmt.Sprintf("failed to find variable[%s]", res.ID.String())) + } + obj = VariableToObject("", *v) + default: + continue + } + + labelAssociations, err := s.exportStackResourceLabelAssociations(ctx, res, availablePkgLabels) + if err != nil { + return nil, err + } + if len(labelAssociations) > 0 { + obj.AddAssociations(labelAssociations...) + } + obj.SetMetadataName(res.PkgName) + pkg.Objects = append(pkg.Objects, obj) + } + return &pkg, nil +} + +func (s *Service) exportStackLabels(ctx context.Context, resources []StackResource) ([]Object, map[string]string, error) { + var objects []Object + availablePkgLabels := make(map[string]string) // pkgName => label name + for _, res := range resources { + if !res.Kind.is(KindLabel) { + continue + } + label, err := s.labelSVC.FindLabelByID(ctx, res.ID) + if influxdb.ErrorCode(err) == influxdb.ENotFound { + continue + } + if err != nil { + return nil, nil, ierrors.Wrap(err, fmt.Sprintf("failed to find label[%s]", res.ID.String())) + } + obj := LabelToObject("", *label) + obj.SetMetadataName(res.PkgName) + objects = append(objects, obj) + availablePkgLabels[res.PkgName] = label.Name + } + return objects, availablePkgLabels, nil +} + +func (s *Service) exportStackResourceLabelAssociations(ctx context.Context, res StackResource, availablePkgLabels map[string]string) ([]ObjectAssociation, error) { + labels, err := s.labelSVC.FindResourceLabels(ctx, influxdb.LabelMappingFilter{ + ResourceID: res.ID, + ResourceType: res.Kind.ResourceType(), + }) + if err != nil { + wrapper := fmt.Sprintf("failed to find label mappings for %s[%s]", res.Kind, res.ID) + return nil, ierrors.Wrap(err, wrapper) + } + + existingLabels := make(map[string]bool) + for _, l := range labels { + existingLabels[l.Name] = true + } + + var associations []ObjectAssociation + for _, ass := range res.Associations { + // only labels that exist in the PKG AND PLATFORM will be exported as associations. + if ass.Kind.is(KindLabel) && existingLabels[availablePkgLabels[ass.PkgName]] { + associations = append(associations, ObjectAssociation{ + Kind: KindLabel, + PkgName: ass.PkgName, + }) + } + } + return associations, nil +} + // ListFilter are filter options for filtering stacks from being returned. type ListFilter struct { StackIDs []influxdb.ID diff --git a/pkger/service_auth.go b/pkger/service_auth.go index 053b02c5a9e..dd567054e0a 100644 --- a/pkger/service_auth.go +++ b/pkger/service_auth.go @@ -44,6 +44,14 @@ func (s *authMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, Use return s.next.DeleteStack(ctx, identifiers) } +func (s *authMW) ExportStack(ctx context.Context, orgID, stackID influxdb.ID) (*Pkg, error) { + err := s.authAgent.OrgPermissions(ctx, orgID, influxdb.ReadAction) + if err != nil { + return nil, err + } + return s.next.ExportStack(ctx, orgID, stackID) +} + func (s *authMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { err := s.authAgent.OrgPermissions(ctx, orgID, influxdb.ReadAction) if err != nil { diff --git a/pkger/service_logging.go b/pkger/service_logging.go index 273e14b32bb..71267a5ec9d 100644 --- a/pkger/service_logging.go +++ b/pkger/service_logging.go @@ -61,6 +61,23 @@ func (s *loggingMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, return s.next.DeleteStack(ctx, identifiers) } +func (s *loggingMW) ExportStack(ctx context.Context, orgID, stackID influxdb.ID) (pkg *Pkg, err error) { + defer func(start time.Time) { + if err == nil { + return + } + + s.logger.Error( + "failed to export stack", + zap.Error(err), + zap.Stringer("orgID", orgID), + zap.Stringer("stackID", stackID), + zap.Duration("took", time.Since(start)), + ) + }(time.Now()) + return s.next.ExportStack(ctx, orgID, stackID) +} + func (s *loggingMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) (stacks []Stack, err error) { defer func(start time.Time) { if err == nil { diff --git a/pkger/service_metrics.go b/pkger/service_metrics.go index 90e34ecb30f..ffe9cadc7ab 100644 --- a/pkger/service_metrics.go +++ b/pkger/service_metrics.go @@ -38,6 +38,12 @@ func (s *mwMetrics) DeleteStack(ctx context.Context, identifiers struct{ OrgID, return rec(s.next.DeleteStack(ctx, identifiers)) } +func (s *mwMetrics) ExportStack(ctx context.Context, orgID, stackID influxdb.ID) (*Pkg, error) { + rec := s.rec.Record("export_stack") + pkg, err := s.next.ExportStack(ctx, orgID, stackID) + return pkg, rec(err) +} + func (s *mwMetrics) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { rec := s.rec.Record("list_stacks") stacks, err := s.next.ListStacks(ctx, orgID, f) diff --git a/pkger/service_tracing.go b/pkger/service_tracing.go index 92c83e44f8e..2c764c39ba6 100644 --- a/pkger/service_tracing.go +++ b/pkger/service_tracing.go @@ -33,6 +33,14 @@ func (s *traceMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, Us return s.next.DeleteStack(ctx, identifiers) } +func (s *traceMW) ExportStack(ctx context.Context, orgID, stackID influxdb.ID) (*Pkg, error) { + span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "ExportStack") + span.LogFields(log.String("org_id", orgID.String())) + span.LogFields(log.String("stack_id", stackID.String())) + defer span.Finish() + return s.next.ExportStack(ctx, orgID, stackID) +} + func (s *traceMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) { span, ctx := tracing.StartSpanFromContextWithOperationName(ctx, "ListStacks") defer span.Finish()