From 21d42465d597e50526f366df85b5b3c99af8f08b Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 6 Jan 2020 15:03:44 -0800 Subject: [PATCH] Add audit logging (#50) --- Gopkg.lock | 1 + boilerplate/lyft/golang_test_targets/Makefile | 1 - pkg/audit/common.go | 11 ++ pkg/audit/log.go | 96 ++++++++++++++++ pkg/audit/log_test.go | 35 ++++++ pkg/audit/message.go | 55 +++++++++ pkg/audit/util.go | 93 +++++++++++++++ pkg/audit/util_test.go | 106 ++++++++++++++++++ pkg/auth/handlers.go | 24 +++- pkg/auth/handlers_test.go | 3 +- pkg/common/constants.go | 5 + pkg/config/config.go | 9 +- pkg/manager/impl/execution_manager.go | 7 +- .../impl/project_attributes_manager.go | 3 + .../impl/project_domain_attributes_manager.go | 4 + .../impl/workflow_attributes_manager.go | 4 + pkg/rpc/adminservice/attributes.go | 93 +++++++++++++++ pkg/rpc/adminservice/execution.go | 54 +++++++++ pkg/rpc/adminservice/launch_plan.go | 58 ++++++++++ pkg/rpc/adminservice/named_entity.go | 28 +++++ pkg/rpc/adminservice/node_execution.go | 38 +++++++ pkg/rpc/adminservice/project.go | 19 ++++ pkg/rpc/adminservice/task.go | 35 +++++- pkg/rpc/adminservice/task_execution.go | 37 +++++- .../adminservice/tests/project_domain_test.go | 7 +- pkg/rpc/adminservice/tests/project_test.go | 6 +- pkg/rpc/adminservice/workflow.go | 35 +++++- 27 files changed, 847 insertions(+), 20 deletions(-) create mode 100644 pkg/audit/common.go create mode 100644 pkg/audit/log.go create mode 100644 pkg/audit/log_test.go create mode 100644 pkg/audit/message.go create mode 100644 pkg/audit/util.go create mode 100644 pkg/audit/util_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 9e4398163c..235f2a8009 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1243,6 +1243,7 @@ "google.golang.org/grpc/credentials", "google.golang.org/grpc/grpclog", "google.golang.org/grpc/metadata", + "google.golang.org/grpc/peer", "google.golang.org/grpc/reflection", "google.golang.org/grpc/status", "gopkg.in/gormigrate.v1", diff --git a/boilerplate/lyft/golang_test_targets/Makefile b/boilerplate/lyft/golang_test_targets/Makefile index 948af0c633..00caee83c5 100644 --- a/boilerplate/lyft/golang_test_targets/Makefile +++ b/boilerplate/lyft/golang_test_targets/Makefile @@ -10,7 +10,6 @@ lint: #lints the package for common code smells # However, that call seem to have some effects (e.g. https://github.com/golang/go/issues/29452) which, for some # reason, allows the subsequent calls to succeed. # TODO: Evaluate whether this is still a problem after moving admin dependency system to go modules. - GO111MODULE=off golangci-lint run --exclude deprecated -v || true GO111MODULE=off golangci-lint run --deadline=5m --exclude deprecated -v # If code is failing goimports linter, this will fix. diff --git a/pkg/audit/common.go b/pkg/audit/common.go new file mode 100644 index 0000000000..05e5ffb9df --- /dev/null +++ b/pkg/audit/common.go @@ -0,0 +1,11 @@ +package audit + +import ( + "time" +) + +type AuthenticatedClientMeta struct { + ClientIds []string + TokenIssuedAt time.Time + ClientIP string +} diff --git a/pkg/audit/log.go b/pkg/audit/log.go new file mode 100644 index 0000000000..88986b44a5 --- /dev/null +++ b/pkg/audit/log.go @@ -0,0 +1,96 @@ +package audit + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/lyft/flyteadmin/pkg/common" + "github.com/lyft/flyteadmin/pkg/errors" + "github.com/lyft/flytestdlib/logger" + "google.golang.org/grpc/codes" +) + +type LogBuilder interface { + WithAuthenticatedCtx(ctx context.Context) LogBuilder + WithRequest(method string, parameters map[string]string, mode AccessMode, requestedAt time.Time) LogBuilder + WithResponse(sentAt time.Time, err error) LogBuilder + Log(ctx context.Context) +} + +type logBuilder struct { + auditLog Message + readOnly bool +} + +func (b *logBuilder) WithAuthenticatedCtx(ctx context.Context) LogBuilder { + clientMeta := ctx.Value(common.AuditFieldsContextKey) + switch clientMeta.(type) { + case AuthenticatedClientMeta: + b.auditLog.Principal = Principal{ + Subject: ctx.Value(common.PrincipalContextKey).(string), + TokenIssuedAt: clientMeta.(AuthenticatedClientMeta).TokenIssuedAt, + } + if len(clientMeta.(AuthenticatedClientMeta).ClientIds) > 0 { + b.auditLog.Principal.ClientID = clientMeta.(AuthenticatedClientMeta).ClientIds[0] + } + b.auditLog.Client = Client{ + ClientIP: clientMeta.(AuthenticatedClientMeta).ClientIP, + } + default: + logger.Warningf(ctx, "Failed to parse authenticated client metadata when creating audit log") + } + return b +} + +// TODO: Also look into passing down HTTP verb +func (b *logBuilder) WithRequest(method string, parameters map[string]string, mode AccessMode, + requestedAt time.Time) LogBuilder { + b.auditLog.Request = Request{ + Method: method, + Parameters: parameters, + Mode: mode, + ReceivedAt: requestedAt, + } + return b +} + +func (b *logBuilder) WithResponse(sentAt time.Time, err error) LogBuilder { + responseCode := codes.OK.String() + if err != nil { + switch err := err.(type) { + case errors.FlyteAdminError: + responseCode = err.(errors.FlyteAdminError).Code().String() + default: + responseCode = codes.Internal.String() + } + } + b.auditLog.Response = Response{ + ResponseCode: responseCode, + SentAt: sentAt, + } + return b +} + +func (b *logBuilder) formatLogString(ctx context.Context) string { + auditLog, err := json.Marshal(&b.auditLog) + if err != nil { + logger.Warningf(ctx, "Failed to marshal audit log to protobuf with err: %v", err) + } + return fmt.Sprintf("Recording request: [%s]", auditLog) +} + +func (b *logBuilder) Log(ctx context.Context) { + if b.readOnly { + logger.Warningf(ctx, "Attempting to record audit log for request: [%+v] more than once. Aborting.", b.auditLog.Request) + } + defer func() { + b.readOnly = true + }() + logger.Info(ctx, b.formatLogString(ctx)) +} + +func NewLogBuilder() LogBuilder { + return &logBuilder{} +} diff --git a/pkg/audit/log_test.go b/pkg/audit/log_test.go new file mode 100644 index 0000000000..533fd47566 --- /dev/null +++ b/pkg/audit/log_test.go @@ -0,0 +1,35 @@ +package audit + +import ( + "context" + "testing" + "time" + + "github.com/lyft/flyteadmin/pkg/common" + "github.com/lyft/flyteadmin/pkg/errors" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/codes" +) + +func TestLogBuilderLog(t *testing.T) { + ctx := context.Background() + ctx = context.WithValue(ctx, common.PrincipalContextKey, "prince") + tokenIssuedAt := time.Date(2020, time.January, 5, 10, 15, 0, 0, time.UTC) + requestedAt := time.Date(2020, time.January, 5, 10, 30, 0, 0, time.UTC) + sentAt := time.Date(2020, time.January, 5, 10, 31, 0, 0, time.UTC) + err := errors.NewFlyteAdminError(codes.AlreadyExists, "womp womp") + ctx = context.WithValue(ctx, common.AuditFieldsContextKey, AuthenticatedClientMeta{ + ClientIds: []string{"12345"}, + TokenIssuedAt: tokenIssuedAt, + ClientIP: "192.0.2.1:25", + }) + builder := NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "my_method", map[string]string{ + "my": "params", + }, ReadWrite, requestedAt).WithResponse(sentAt, err) + assert.EqualValues(t, "Recording request: [{\"Principal\":{\"Subject\":\"prince\",\"ClientID\":\"12345\","+ + "\"TokenIssuedAt\":\"2020-01-05T10:15:00Z\"},\"Client\":{\"ClientIP\":\"192.0.2.1:25\"},\"Request\":"+ + "{\"Method\":\"my_method\",\"Parameters\":{\"my\":\"params\"},\"Mode\":1,\"ReceivedAt\":"+ + "\"2020-01-05T10:30:00Z\"},\"Response\":{\"ResponseCode\":\"AlreadyExists\",\"SentAt\":"+ + "\"2020-01-05T10:31:00Z\"}}]", builder.(*logBuilder).formatLogString(context.TODO())) +} diff --git a/pkg/audit/message.go b/pkg/audit/message.go new file mode 100644 index 0000000000..733264acca --- /dev/null +++ b/pkg/audit/message.go @@ -0,0 +1,55 @@ +package audit + +import "time" + +type Principal struct { + // Identifies authenticated end-user + Subject string + + // The client that initiated the auth flow. + ClientID string + + TokenIssuedAt time.Time +} + +type Client struct { + ClientIP string +} + +type AccessMode int + +const ( + ReadOnly AccessMode = iota + ReadWrite +) + +// Details about a specific request issued by a user. +type Request struct { + // Service method endpoint e.g. GetWorkflowExecution + Method string + + // Includes parameters submitted in the request. + Parameters map[string]string + + Mode AccessMode + + ReceivedAt time.Time +} + +// Summary of service response details. +type Response struct { + // e.g. gRPC status code + ResponseCode string + + SentAt time.Time +} + +type Message struct { + Principal Principal + + Client Client + + Request Request + + Response Response +} diff --git a/pkg/audit/util.go b/pkg/audit/util.go new file mode 100644 index 0000000000..959898227f --- /dev/null +++ b/pkg/audit/util.go @@ -0,0 +1,93 @@ +package audit + +import ( + "fmt" + + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +) + +type requestParameters = map[string]string + +const ( + Project = "project" + Domain = "domain" + Name = "name" + Version = "version" + NodeID = "node_id" + RetryAttempt = "retry_attempt" + ResourceType = "resource" + + TaskProject = "task_project" + TaskDomain = "task_domain" + TaskName = "task_name" + TaskVersion = "task_version" +) + +func ParametersFromIdentifier(identifier *core.Identifier) requestParameters { + if identifier == nil { + return requestParameters{} + } + return requestParameters{ + Project: identifier.Project, + Domain: identifier.Domain, + Name: identifier.Name, + Version: identifier.Version, + } +} + +func ParametersFromNamedEntityIdentifier(identifier *admin.NamedEntityIdentifier) requestParameters { + if identifier == nil { + return requestParameters{} + } + return requestParameters{ + Project: identifier.Project, + Domain: identifier.Domain, + Name: identifier.Name, + } +} + +func ParametersFromNamedEntityIdentifierAndResource(identifier *admin.NamedEntityIdentifier, resourceType core.ResourceType) requestParameters { + if identifier == nil { + return requestParameters{} + } + parameters := ParametersFromNamedEntityIdentifier(identifier) + parameters[ResourceType] = resourceType.String() + return parameters +} + +func ParametersFromExecutionIdentifier(identifier *core.WorkflowExecutionIdentifier) requestParameters { + if identifier == nil { + return requestParameters{} + } + return requestParameters{ + Project: identifier.Project, + Domain: identifier.Domain, + Name: identifier.Name, + } +} + +func ParametersFromNodeExecutionIdentifier(identifier *core.NodeExecutionIdentifier) requestParameters { + if identifier == nil || identifier.ExecutionId == nil { + return requestParameters{} + } + return requestParameters{ + Project: identifier.ExecutionId.Project, + Domain: identifier.ExecutionId.Domain, + Name: identifier.ExecutionId.Name, + NodeID: identifier.NodeId, + } +} + +func ParametersFromTaskExecutionIdentifier(identifier *core.TaskExecutionIdentifier) requestParameters { + if identifier == nil || identifier.NodeExecutionId == nil || identifier.TaskId == nil { + return requestParameters{} + } + params := ParametersFromNodeExecutionIdentifier(identifier.NodeExecutionId) + params[RetryAttempt] = fmt.Sprint(identifier.RetryAttempt) + params[TaskProject] = identifier.TaskId.Project + params[TaskDomain] = identifier.TaskId.Domain + params[TaskName] = identifier.TaskId.Name + params[TaskVersion] = identifier.TaskId.Version + return params +} diff --git a/pkg/audit/util_test.go b/pkg/audit/util_test.go new file mode 100644 index 0000000000..ed1256a274 --- /dev/null +++ b/pkg/audit/util_test.go @@ -0,0 +1,106 @@ +package audit + +import ( + "testing" + + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/stretchr/testify/assert" +) + +func TestParametersFromIdentifier(t *testing.T) { + assert.EqualValues(t, map[string]string{ + "project": "proj", + "domain": "development", + "name": "foo", + "version": "123", + }, ParametersFromIdentifier(&core.Identifier{ + Project: "proj", + Domain: "development", + Name: "foo", + Version: "123", + })) +} + +func TestParametersFromNamedEntityIdentifier(t *testing.T) { + assert.EqualValues(t, map[string]string{ + "project": "proj", + "domain": "development", + "name": "foo", + }, ParametersFromNamedEntityIdentifier(&admin.NamedEntityIdentifier{ + Project: "proj", + Domain: "development", + Name: "foo", + })) +} + +func TestParametersFromNamedEntityIdentifierAndResource(t *testing.T) { + assert.EqualValues(t, map[string]string{ + "project": "proj", + "domain": "development", + "name": "foo", + "resource": "LAUNCH_PLAN", + }, ParametersFromNamedEntityIdentifierAndResource(&admin.NamedEntityIdentifier{ + Project: "proj", + Domain: "development", + Name: "foo", + }, core.ResourceType_LAUNCH_PLAN)) +} + +func TestParametersFromExecutionIdentifier(t *testing.T) { + assert.EqualValues(t, map[string]string{ + "project": "proj", + "domain": "development", + "name": "foo", + }, ParametersFromExecutionIdentifier(&core.WorkflowExecutionIdentifier{ + Project: "proj", + Domain: "development", + Name: "foo", + })) +} + +func TestParametersFromNodeExecutionIdentifier(t *testing.T) { + assert.EqualValues(t, map[string]string{ + "project": "proj", + "domain": "development", + "name": "foo", + "node_id": "nodey", + }, ParametersFromNodeExecutionIdentifier(&core.NodeExecutionIdentifier{ + NodeId: "nodey", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "proj", + Domain: "development", + Name: "foo", + }, + })) +} + +func TestParametersFromTaskExecutionIdentifier(t *testing.T) { + assert.EqualValues(t, map[string]string{ + "project": "proj", + "domain": "development", + "name": "foo", + "node_id": "nodey", + "retry_attempt": "1", + "task_project": "proj2", + "task_domain": "production", + "task_name": "bar", + "task_version": "version", + }, ParametersFromTaskExecutionIdentifier(&core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + Project: "proj2", + Domain: "production", + Name: "bar", + Version: "version", + }, + NodeExecutionId: &core.NodeExecutionIdentifier{ + NodeId: "nodey", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "proj", + Domain: "development", + Name: "foo", + }, + }, + RetryAttempt: 1, + })) +} diff --git a/pkg/auth/handlers.go b/pkg/auth/handlers.go index b021c6b662..3ea1092ff5 100644 --- a/pkg/auth/handlers.go +++ b/pkg/auth/handlers.go @@ -5,6 +5,11 @@ import ( "encoding/json" "fmt" "net/http" + "time" + + "github.com/lyft/flyteadmin/pkg/audit" + "github.com/lyft/flyteadmin/pkg/common" + "google.golang.org/grpc/peer" "github.com/grpc-ecosystem/go-grpc-middleware/util/metautils" @@ -122,7 +127,7 @@ func GetCallbackHandler(ctx context.Context, authContext interfaces.Authenticati func AuthenticationLoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // Invoke 'handler' to use your gRPC server implementation and get // the response. - logger.Debugf(ctx, "gRPC server info in logging interceptor email %s method %s\n", ctx.Value(PrincipalContextKey), info.FullMethod) + logger.Debugf(ctx, "gRPC server info in logging interceptor email %s method %s\n", ctx.Value(common.PrincipalContextKey), info.FullMethod) return handler(ctx, req) } @@ -175,9 +180,9 @@ func GetAuthenticationInterceptor(authContext interfaces.AuthenticationContext) return ctx, status.Errorf(codes.Unauthenticated, "no email or empty email found") } } - if token != nil { newCtx := WithUserEmail(context.WithValue(ctx, bearerTokenContextKey, token), token.Subject) + newCtx = WithAuditFields(newCtx, token.Audience, token.IssuedAt) return newCtx, nil } return ctx, nil @@ -185,7 +190,20 @@ func GetAuthenticationInterceptor(authContext interfaces.AuthenticationContext) } func WithUserEmail(ctx context.Context, email string) context.Context { - return context.WithValue(ctx, PrincipalContextKey, email) + return context.WithValue(ctx, common.PrincipalContextKey, email) +} + +func WithAuditFields(ctx context.Context, clientIds []string, tokenIssuedAt time.Time) context.Context { + var clientIP string + peer, ok := peer.FromContext(ctx) + if ok { + clientIP = peer.Addr.String() + } + return context.WithValue(ctx, common.AuditFieldsContextKey, audit.AuthenticatedClientMeta{ + ClientIds: clientIds, + TokenIssuedAt: tokenIssuedAt, + ClientIP: clientIP, + }) } // This is effectively middleware for the grpc gateway, it allows us to modify the translation between HTTP request diff --git a/pkg/auth/handlers_test.go b/pkg/auth/handlers_test.go index abba3c4985..8681f8a12c 100644 --- a/pkg/auth/handlers_test.go +++ b/pkg/auth/handlers_test.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/lyft/flyteadmin/pkg/auth/config" + "github.com/lyft/flyteadmin/pkg/common" "github.com/lyft/flyteadmin/pkg/auth/interfaces/mocks" "github.com/stretchr/testify/assert" @@ -18,7 +19,7 @@ import ( func TestWithUserEmail(t *testing.T) { ctx := WithUserEmail(context.Background(), "abc") - assert.Equal(t, "abc", ctx.Value(PrincipalContextKey)) + assert.Equal(t, "abc", ctx.Value(common.PrincipalContextKey)) } func TestGetLoginHandler(t *testing.T) { diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 30ae816900..2fe632590e 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -4,3 +4,8 @@ import "github.com/lyft/flytestdlib/contextutils" var RuntimeTypeKey = contextutils.Key("runtime_type") var RuntimeVersionKey = contextutils.Key("runtime_version") + +const ( + AuditFieldsContextKey contextutils.Key = "audit_fields" + PrincipalContextKey contextutils.Key = "principal" +) diff --git a/pkg/config/config.go b/pkg/config/config.go index 02b0f49b7d..8809c80ee7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -22,10 +22,11 @@ type ServerConfig struct { } type ServerSecurityOptions struct { - Secure bool `json:"secure"` - Ssl SslOptions `json:"ssl"` - UseAuth bool `json:"useAuth"` - Oauth config2.OAuthOptions `json:"oauth"` + Secure bool `json:"secure"` + Ssl SslOptions `json:"ssl"` + UseAuth bool `json:"useAuth"` + Oauth config2.OAuthOptions `json:"oauth"` + AuditAccess bool `json:"auditAccess"` // These options are here to allow deployments where the Flyte UI (Console) is served from a different domain/port. // Note that CORS only applies to Admin's API endpoints. The health check endpoint for instance is unaffected. diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 9a3e73d318..5d8ea4fca6 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -6,13 +6,10 @@ import ( "strconv" "time" - "github.com/lyft/flytestdlib/contextutils" - - "github.com/lyft/flyteadmin/pkg/auth" - "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" dataInterfaces "github.com/lyft/flyteadmin/pkg/data/interfaces" + "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flytestdlib/promutils" "github.com/prometheus/client_golang/prometheus" @@ -92,7 +89,7 @@ func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifi // Returns the unique string which identifies the authenticated end user (if any). func getUser(ctx context.Context) string { - principalContextUser := ctx.Value(auth.PrincipalContextKey) + principalContextUser := ctx.Value(common.PrincipalContextKey) if principalContextUser != nil { return fmt.Sprintf(principalContextKeyFormat, principalContextUser) } diff --git a/pkg/manager/impl/project_attributes_manager.go b/pkg/manager/impl/project_attributes_manager.go index f4408c3559..e035af9e6a 100644 --- a/pkg/manager/impl/project_attributes_manager.go +++ b/pkg/manager/impl/project_attributes_manager.go @@ -3,6 +3,8 @@ package impl import ( "context" + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flyteadmin/pkg/manager/impl/validation" "github.com/lyft/flyteadmin/pkg/repositories/transformers" @@ -62,6 +64,7 @@ func (m *ProjectAttributesManager) DeleteProjectAttributes(ctx context.Context, if err := m.db.ProjectAttributesRepo().Delete(ctx, request.Project, request.ResourceType.String()); err != nil { return nil, err } + logger.Infof(ctx, "Deleted project attributes for: %s (%s)", request.Project, request.ResourceType.String()) return &admin.ProjectAttributesDeleteResponse{}, nil } diff --git a/pkg/manager/impl/project_domain_attributes_manager.go b/pkg/manager/impl/project_domain_attributes_manager.go index f35cb1ff25..45fe906b74 100644 --- a/pkg/manager/impl/project_domain_attributes_manager.go +++ b/pkg/manager/impl/project_domain_attributes_manager.go @@ -3,6 +3,8 @@ package impl import ( "context" + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flyteadmin/pkg/manager/impl/validation" @@ -68,6 +70,8 @@ func (m *ProjectDomainAttributesManager) DeleteProjectDomainAttributes(ctx conte ctx, request.Project, request.Domain, request.ResourceType.String()); err != nil { return nil, err } + logger.Infof(ctx, "Deleted project-domain attributes for: %s-%s (%s)", request.Project, + request.Domain, request.ResourceType.String()) return &admin.ProjectDomainAttributesDeleteResponse{}, nil } diff --git a/pkg/manager/impl/workflow_attributes_manager.go b/pkg/manager/impl/workflow_attributes_manager.go index ac8e5fd3e1..f5a285a7c9 100644 --- a/pkg/manager/impl/workflow_attributes_manager.go +++ b/pkg/manager/impl/workflow_attributes_manager.go @@ -3,6 +3,8 @@ package impl import ( "context" + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flyteadmin/pkg/manager/impl/validation" "github.com/lyft/flyteadmin/pkg/repositories/transformers" @@ -65,6 +67,8 @@ func (m *WorkflowAttributesManager) DeleteWorkflowAttributes(ctx context.Context ctx, request.Project, request.Domain, request.Workflow, request.ResourceType.String()); err != nil { return nil, err } + logger.Infof(ctx, "Deleted workflow attributes for: %s-%s-%s (%s)", request.Project, + request.Domain, request.Workflow, request.ResourceType.String()) return &admin.WorkflowAttributesDeleteResponse{}, nil } diff --git a/pkg/rpc/adminservice/attributes.go b/pkg/rpc/adminservice/attributes.go index 0548792a22..c374144158 100644 --- a/pkg/rpc/adminservice/attributes.go +++ b/pkg/rpc/adminservice/attributes.go @@ -2,6 +2,9 @@ package adminservice import ( "context" + "time" + + "github.com/lyft/flyteadmin/pkg/audit" "github.com/lyft/flyteadmin/pkg/rpc/adminservice/util" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" @@ -12,6 +15,7 @@ import ( func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesUpdateRequest) ( *admin.WorkflowAttributesUpdateResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -20,6 +24,16 @@ func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *ad m.Metrics.workflowAttributesEndpointMetrics.update.Time(func() { response, err = m.WorkflowAttributesManager.UpdateWorkflowAttributes(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "UpdateWorkflowAttributes", + map[string]string{ + audit.Project: request.Attributes.Project, + audit.Domain: request.Attributes.Domain, + audit.Name: request.Attributes.Workflow, + }, + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.update) } @@ -30,6 +44,7 @@ func (m *AdminService) UpdateWorkflowAttributes(ctx context.Context, request *ad func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesGetRequest) ( *admin.WorkflowAttributesGetResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -38,6 +53,16 @@ func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin m.Metrics.workflowAttributesEndpointMetrics.get.Time(func() { response, err = m.WorkflowAttributesManager.GetWorkflowAttributes(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetWorkflowAttributes", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + audit.Name: request.Workflow, + }, + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.get) } @@ -48,6 +73,7 @@ func (m *AdminService) GetWorkflowAttributes(ctx context.Context, request *admin func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *admin.WorkflowAttributesDeleteRequest) ( *admin.WorkflowAttributesDeleteResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -56,6 +82,16 @@ func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *ad m.Metrics.workflowAttributesEndpointMetrics.delete.Time(func() { response, err = m.WorkflowAttributesManager.DeleteWorkflowAttributes(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "DeleteWorkflowAttributes", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + audit.Name: request.Workflow, + }, + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.delete) } @@ -66,6 +102,7 @@ func (m *AdminService) DeleteWorkflowAttributes(ctx context.Context, request *ad func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesUpdateRequest) ( *admin.ProjectDomainAttributesUpdateResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -74,6 +111,15 @@ func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, reques m.Metrics.projectDomainAttributesEndpointMetrics.update.Time(func() { response, err = m.ProjectDomainAttributesManager.UpdateProjectDomainAttributes(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "UpdateProjectDomainAttributes", + map[string]string{ + audit.Project: request.Attributes.Project, + audit.Domain: request.Attributes.Domain, + }, + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.projectDomainAttributesEndpointMetrics.update) } @@ -84,6 +130,7 @@ func (m *AdminService) UpdateProjectDomainAttributes(ctx context.Context, reques func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesGetRequest) ( *admin.ProjectDomainAttributesGetResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -92,6 +139,15 @@ func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request * m.Metrics.workflowAttributesEndpointMetrics.get.Time(func() { response, err = m.ProjectDomainAttributesManager.GetProjectDomainAttributes(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetProjectDomainAttributes", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + }, + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.get) } @@ -102,6 +158,7 @@ func (m *AdminService) GetProjectDomainAttributes(ctx context.Context, request * func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, request *admin.ProjectDomainAttributesDeleteRequest) ( *admin.ProjectDomainAttributesDeleteResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -110,6 +167,15 @@ func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, reques m.Metrics.workflowAttributesEndpointMetrics.delete.Time(func() { response, err = m.ProjectDomainAttributesManager.DeleteProjectDomainAttributes(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "DeleteProjectDomainAttributes", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + }, + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.delete) } @@ -120,6 +186,7 @@ func (m *AdminService) DeleteProjectDomainAttributes(ctx context.Context, reques func (m *AdminService) UpdateProjectAttributes(ctx context.Context, request *admin.ProjectAttributesUpdateRequest) ( *admin.ProjectAttributesUpdateResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -131,6 +198,14 @@ func (m *AdminService) UpdateProjectAttributes(ctx context.Context, request *adm if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.projectAttributesEndpointMetrics.update) } + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "UpdateProjectAttributes", + map[string]string{ + audit.Project: request.Attributes.Project, + }, + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) return response, nil } @@ -138,6 +213,7 @@ func (m *AdminService) UpdateProjectAttributes(ctx context.Context, request *adm func (m *AdminService) GetProjectAttributes(ctx context.Context, request *admin.ProjectAttributesGetRequest) ( *admin.ProjectAttributesGetResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -146,6 +222,14 @@ func (m *AdminService) GetProjectAttributes(ctx context.Context, request *admin. m.Metrics.workflowAttributesEndpointMetrics.get.Time(func() { response, err = m.ProjectAttributesManager.GetProjectAttributes(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetProjectAttributes", + map[string]string{ + audit.Project: request.Project, + }, + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.get) } @@ -156,6 +240,7 @@ func (m *AdminService) GetProjectAttributes(ctx context.Context, request *admin. func (m *AdminService) DeleteProjectAttributes(ctx context.Context, request *admin.ProjectAttributesDeleteRequest) ( *admin.ProjectAttributesDeleteResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -164,6 +249,14 @@ func (m *AdminService) DeleteProjectAttributes(ctx context.Context, request *adm m.Metrics.workflowAttributesEndpointMetrics.delete.Time(func() { response, err = m.ProjectAttributesManager.DeleteProjectAttributes(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "DeleteProjectAttributes", + map[string]string{ + audit.Project: request.Project, + }, + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowAttributesEndpointMetrics.delete) } diff --git a/pkg/rpc/adminservice/execution.go b/pkg/rpc/adminservice/execution.go index 055112347c..0a9d408ae8 100644 --- a/pkg/rpc/adminservice/execution.go +++ b/pkg/rpc/adminservice/execution.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/lyft/flyteadmin/pkg/audit" + "github.com/lyft/flyteadmin/pkg/rpc/adminservice/util" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "google.golang.org/grpc/codes" @@ -22,6 +24,16 @@ func (m *AdminService) CreateExecution( m.Metrics.executionEndpointMetrics.create.Time(func() { response, err = m.ExecutionManager.CreateExecution(ctx, *request, requestedAt) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ExecutionCreateRequest", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + audit.Name: request.Name, + }, + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.create) } @@ -41,6 +53,12 @@ func (m *AdminService) RelaunchExecution( m.Metrics.executionEndpointMetrics.relaunch.Time(func() { response, err = m.ExecutionManager.RelaunchExecution(ctx, *request, requestedAt) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ExecutionCreateRequest", + audit.ParametersFromExecutionIdentifier(request.Id), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.relaunch) } @@ -51,6 +69,7 @@ func (m *AdminService) RelaunchExecution( func (m *AdminService) CreateWorkflowEvent( ctx context.Context, request *admin.WorkflowExecutionEventRequest) (*admin.WorkflowExecutionEventResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -59,16 +78,24 @@ func (m *AdminService) CreateWorkflowEvent( m.Metrics.executionEndpointMetrics.createEvent.Time(func() { response, err = m.ExecutionManager.CreateWorkflowEvent(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "CreateWorkflowEvent", + audit.ParametersFromExecutionIdentifier(request.Event.ExecutionId), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.createEvent) } m.Metrics.executionEndpointMetrics.createEvent.Success() + return response, nil } func (m *AdminService) GetExecution( ctx context.Context, request *admin.WorkflowExecutionGetRequest) (*admin.Execution, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -77,6 +104,12 @@ func (m *AdminService) GetExecution( m.Metrics.executionEndpointMetrics.get.Time(func() { response, err = m.ExecutionManager.GetExecution(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetExecution", + audit.ParametersFromExecutionIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.get) } @@ -87,6 +120,7 @@ func (m *AdminService) GetExecution( func (m *AdminService) GetExecutionData( ctx context.Context, request *admin.WorkflowExecutionGetDataRequest) (*admin.WorkflowExecutionGetDataResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -95,6 +129,12 @@ func (m *AdminService) GetExecutionData( m.Metrics.executionEndpointMetrics.get.Time(func() { response, err = m.ExecutionManager.GetExecutionData(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetExecutionData", + audit.ParametersFromExecutionIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.getData) } @@ -105,6 +145,7 @@ func (m *AdminService) GetExecutionData( func (m *AdminService) ListExecutions( ctx context.Context, request *admin.ResourceListRequest) (*admin.ExecutionList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -113,6 +154,12 @@ func (m *AdminService) ListExecutions( m.Metrics.executionEndpointMetrics.list.Time(func() { response, err = m.ExecutionManager.ListExecutions(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListExecutions", + audit.ParametersFromNamedEntityIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.list) } @@ -123,6 +170,7 @@ func (m *AdminService) ListExecutions( func (m *AdminService) TerminateExecution( ctx context.Context, request *admin.ExecutionTerminateRequest) (*admin.ExecutionTerminateResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -131,6 +179,12 @@ func (m *AdminService) TerminateExecution( m.Metrics.executionEndpointMetrics.terminate.Time(func() { response, err = m.ExecutionManager.TerminateExecution(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "TerminateExecution", + audit.ParametersFromExecutionIdentifier(request.Id), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.executionEndpointMetrics.terminate) } diff --git a/pkg/rpc/adminservice/launch_plan.go b/pkg/rpc/adminservice/launch_plan.go index c983356f2f..a86455f2da 100644 --- a/pkg/rpc/adminservice/launch_plan.go +++ b/pkg/rpc/adminservice/launch_plan.go @@ -2,6 +2,9 @@ package adminservice import ( "context" + "time" + + "github.com/lyft/flyteadmin/pkg/audit" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/logger" @@ -15,6 +18,7 @@ import ( func (m *AdminService) CreateLaunchPlan( ctx context.Context, request *admin.LaunchPlanCreateRequest) (*admin.LaunchPlanCreateResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -23,6 +27,12 @@ func (m *AdminService) CreateLaunchPlan( m.Metrics.launchPlanEndpointMetrics.create.Time(func() { response, err = m.LaunchPlanManager.CreateLaunchPlan(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "CreateLaunchPlan", + audit.ParametersFromIdentifier(request.Id), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.create) } @@ -32,6 +42,7 @@ func (m *AdminService) CreateLaunchPlan( func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectGetRequest) (*admin.LaunchPlan, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -46,6 +57,12 @@ func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectG m.Metrics.launchPlanEndpointMetrics.get.Time(func() { response, err = m.LaunchPlanManager.GetLaunchPlan(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetLaunchPlan", + audit.ParametersFromIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.get) } @@ -56,6 +73,7 @@ func (m *AdminService) GetLaunchPlan(ctx context.Context, request *admin.ObjectG func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.ActiveLaunchPlanRequest) (*admin.LaunchPlan, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -64,6 +82,12 @@ func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.A m.Metrics.launchPlanEndpointMetrics.getActive.Time(func() { response, err = m.LaunchPlanManager.GetActiveLaunchPlan(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetActiveLaunchPlan", + audit.ParametersFromNamedEntityIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.getActive) } @@ -74,6 +98,7 @@ func (m *AdminService) GetActiveLaunchPlan(ctx context.Context, request *admin.A func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.LaunchPlanUpdateRequest) ( *admin.LaunchPlanUpdateResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -88,6 +113,12 @@ func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.Laun m.Metrics.launchPlanEndpointMetrics.update.Time(func() { response, err = m.LaunchPlanManager.UpdateLaunchPlan(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "UpdateLaunchPlan", + audit.ParametersFromIdentifier(request.Id), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.update) } @@ -98,6 +129,7 @@ func (m *AdminService) UpdateLaunchPlan(ctx context.Context, request *admin.Laun func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.ResourceListRequest) ( *admin.LaunchPlanList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.") } @@ -106,6 +138,12 @@ func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.Resou m.Metrics.launchPlanEndpointMetrics.list.Time(func() { response, err = m.LaunchPlanManager.ListLaunchPlans(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListLaunchPlans", + audit.ParametersFromNamedEntityIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.list) } @@ -117,6 +155,7 @@ func (m *AdminService) ListLaunchPlans(ctx context.Context, request *admin.Resou func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin.ActiveLaunchPlanListRequest) ( *admin.LaunchPlanList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.") } @@ -125,6 +164,15 @@ func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin m.Metrics.launchPlanEndpointMetrics.listActive.Time(func() { response, err = m.LaunchPlanManager.ListActiveLaunchPlans(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListActiveLaunchPlans", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + }, + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.listActive) } @@ -136,6 +184,7 @@ func (m *AdminService) ListActiveLaunchPlans(ctx context.Context, request *admin func (m *AdminService) ListLaunchPlanIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) ( *admin.NamedEntityIdentifierList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Empty request. Please rephrase.") } @@ -145,6 +194,15 @@ func (m *AdminService) ListLaunchPlanIds(ctx context.Context, request *admin.Nam m.Metrics.launchPlanEndpointMetrics.listIds.Time(func() { response, err = m.LaunchPlanManager.ListLaunchPlanIds(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListLaunchPlanIds", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + }, + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.launchPlanEndpointMetrics.listIds) } diff --git a/pkg/rpc/adminservice/named_entity.go b/pkg/rpc/adminservice/named_entity.go index 9737b52a96..b5de71e380 100644 --- a/pkg/rpc/adminservice/named_entity.go +++ b/pkg/rpc/adminservice/named_entity.go @@ -2,6 +2,9 @@ package adminservice import ( "context" + "time" + + "github.com/lyft/flyteadmin/pkg/audit" "github.com/lyft/flyteadmin/pkg/rpc/adminservice/util" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" @@ -11,6 +14,7 @@ import ( func (m *AdminService) GetNamedEntity(ctx context.Context, request *admin.NamedEntityGetRequest) (*admin.NamedEntity, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -20,6 +24,12 @@ func (m *AdminService) GetNamedEntity(ctx context.Context, request *admin.NamedE m.Metrics.namedEntityEndpointMetrics.get.Time(func() { response, err = m.NamedEntityManager.GetNamedEntity(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetNamedEntity", + audit.ParametersFromNamedEntityIdentifierAndResource(request.Id, request.ResourceType), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.namedEntityEndpointMetrics.get) } @@ -31,6 +41,7 @@ func (m *AdminService) GetNamedEntity(ctx context.Context, request *admin.NamedE func (m *AdminService) UpdateNamedEntity(ctx context.Context, request *admin.NamedEntityUpdateRequest) ( *admin.NamedEntityUpdateResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -40,6 +51,12 @@ func (m *AdminService) UpdateNamedEntity(ctx context.Context, request *admin.Nam m.Metrics.namedEntityEndpointMetrics.update.Time(func() { response, err = m.NamedEntityManager.UpdateNamedEntity(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "UpdateNamedEntity", + audit.ParametersFromNamedEntityIdentifierAndResource(request.Id, request.ResourceType), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.namedEntityEndpointMetrics.update) } @@ -50,6 +67,7 @@ func (m *AdminService) UpdateNamedEntity(ctx context.Context, request *admin.Nam func (m *AdminService) ListNamedEntities(ctx context.Context, request *admin.NamedEntityListRequest) ( *admin.NamedEntityList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -59,6 +77,16 @@ func (m *AdminService) ListNamedEntities(ctx context.Context, request *admin.Nam m.Metrics.namedEntityEndpointMetrics.list.Time(func() { response, err = m.NamedEntityManager.ListNamedEntities(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListNamedEntities", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + audit.ResourceType: request.ResourceType.String(), + }, + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.namedEntityEndpointMetrics.list) } diff --git a/pkg/rpc/adminservice/node_execution.go b/pkg/rpc/adminservice/node_execution.go index 642dd5b889..6def7f8684 100644 --- a/pkg/rpc/adminservice/node_execution.go +++ b/pkg/rpc/adminservice/node_execution.go @@ -2,6 +2,9 @@ package adminservice import ( "context" + "time" + + "github.com/lyft/flyteadmin/pkg/audit" "github.com/lyft/flytestdlib/logger" @@ -15,6 +18,7 @@ import ( func (m *AdminService) CreateNodeEvent( ctx context.Context, request *admin.NodeExecutionEventRequest) (*admin.NodeExecutionEventResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -23,6 +27,12 @@ func (m *AdminService) CreateNodeEvent( m.Metrics.nodeExecutionEndpointMetrics.createEvent.Time(func() { response, err = m.NodeExecutionManager.CreateNodeEvent(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "CreateNodeEvent", + audit.ParametersFromNodeExecutionIdentifier(request.Event.Id), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.createEvent) } @@ -33,6 +43,7 @@ func (m *AdminService) CreateNodeEvent( func (m *AdminService) GetNodeExecution( ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -41,6 +52,12 @@ func (m *AdminService) GetNodeExecution( m.Metrics.nodeExecutionEndpointMetrics.get.Time(func() { response, err = m.NodeExecutionManager.GetNodeExecution(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetNodeExecution", + audit.ParametersFromNodeExecutionIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.get) } @@ -51,6 +68,7 @@ func (m *AdminService) GetNodeExecution( func (m *AdminService) ListNodeExecutions( ctx context.Context, request *admin.NodeExecutionListRequest) (*admin.NodeExecutionList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -59,6 +77,12 @@ func (m *AdminService) ListNodeExecutions( m.Metrics.nodeExecutionEndpointMetrics.list.Time(func() { response, err = m.NodeExecutionManager.ListNodeExecutions(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListNodeExecutions", + audit.ParametersFromExecutionIdentifier(request.WorkflowExecutionId), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.list) } @@ -69,6 +93,7 @@ func (m *AdminService) ListNodeExecutions( func (m *AdminService) ListNodeExecutionsForTask( ctx context.Context, request *admin.NodeExecutionForTaskListRequest) (*admin.NodeExecutionList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -84,6 +109,12 @@ func (m *AdminService) ListNodeExecutionsForTask( m.Metrics.nodeExecutionEndpointMetrics.listChildren.Time(func() { response, err = m.NodeExecutionManager.ListNodeExecutionsForTask(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListNodeExecutionsForTask", + audit.ParametersFromTaskExecutionIdentifier(request.TaskExecutionId), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.listChildren) } @@ -94,6 +125,7 @@ func (m *AdminService) ListNodeExecutionsForTask( func (m *AdminService) GetNodeExecutionData( ctx context.Context, request *admin.NodeExecutionGetDataRequest) (*admin.NodeExecutionGetDataResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -102,6 +134,12 @@ func (m *AdminService) GetNodeExecutionData( m.Metrics.nodeExecutionEndpointMetrics.getData.Time(func() { response, err = m.NodeExecutionManager.GetNodeExecutionData(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetNodeExecutionData", + audit.ParametersFromNodeExecutionIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.nodeExecutionEndpointMetrics.getData) } diff --git a/pkg/rpc/adminservice/project.go b/pkg/rpc/adminservice/project.go index 1e333d9bc3..044dccb2f0 100644 --- a/pkg/rpc/adminservice/project.go +++ b/pkg/rpc/adminservice/project.go @@ -2,6 +2,9 @@ package adminservice import ( "context" + "time" + + "github.com/lyft/flyteadmin/pkg/audit" "github.com/lyft/flyteadmin/pkg/rpc/adminservice/util" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" @@ -12,6 +15,7 @@ import ( func (m *AdminService) RegisterProject(ctx context.Context, request *admin.ProjectRegisterRequest) ( *admin.ProjectRegisterResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -20,6 +24,14 @@ func (m *AdminService) RegisterProject(ctx context.Context, request *admin.Proje m.Metrics.projectEndpointMetrics.register.Time(func() { response, err = m.ProjectManager.CreateProject(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "RegisterProject", + map[string]string{ + audit.Project: request.Project.Id, + }, + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.projectEndpointMetrics.register) } @@ -29,6 +41,7 @@ func (m *AdminService) RegisterProject(ctx context.Context, request *admin.Proje func (m *AdminService) ListProjects(ctx context.Context, request *admin.ProjectListRequest) (*admin.Projects, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -37,6 +50,12 @@ func (m *AdminService) ListProjects(ctx context.Context, request *admin.ProjectL m.Metrics.projectEndpointMetrics.list.Time(func() { response, err = m.ProjectManager.ListProjects(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListProjects", + map[string]string{}, + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.projectEndpointMetrics.list) } diff --git a/pkg/rpc/adminservice/task.go b/pkg/rpc/adminservice/task.go index 1afe2e214a..653935f975 100644 --- a/pkg/rpc/adminservice/task.go +++ b/pkg/rpc/adminservice/task.go @@ -2,6 +2,9 @@ package adminservice import ( "context" + "time" + + "github.com/lyft/flyteadmin/pkg/audit" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/logger" @@ -16,7 +19,7 @@ func (m *AdminService) CreateTask( ctx context.Context, request *admin.TaskCreateRequest) (*admin.TaskCreateResponse, error) { defer m.interceptPanic(ctx, request) - + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -25,6 +28,12 @@ func (m *AdminService) CreateTask( m.Metrics.taskEndpointMetrics.create.Time(func() { response, err = m.TaskManager.CreateTask(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "CreateTask", + audit.ParametersFromIdentifier(request.Id), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskEndpointMetrics.create) } @@ -34,6 +43,7 @@ func (m *AdminService) CreateTask( func (m *AdminService) GetTask(ctx context.Context, request *admin.ObjectGetRequest) (*admin.Task, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -48,6 +58,12 @@ func (m *AdminService) GetTask(ctx context.Context, request *admin.ObjectGetRequ m.Metrics.taskEndpointMetrics.get.Time(func() { response, err = m.TaskManager.GetTask(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetTask", + audit.ParametersFromIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskEndpointMetrics.get) } @@ -58,6 +74,7 @@ func (m *AdminService) GetTask(ctx context.Context, request *admin.ObjectGetRequ func (m *AdminService) ListTaskIds( ctx context.Context, request *admin.NamedEntityIdentifierListRequest) (*admin.NamedEntityIdentifierList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -66,6 +83,15 @@ func (m *AdminService) ListTaskIds( m.Metrics.taskEndpointMetrics.listIds.Time(func() { response, err = m.TaskManager.ListUniqueTaskIdentifiers(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListTaskIds", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + }, + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskEndpointMetrics.listIds) } @@ -76,6 +102,7 @@ func (m *AdminService) ListTaskIds( func (m *AdminService) ListTasks(ctx context.Context, request *admin.ResourceListRequest) (*admin.TaskList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -84,6 +111,12 @@ func (m *AdminService) ListTasks(ctx context.Context, request *admin.ResourceLis m.Metrics.taskEndpointMetrics.list.Time(func() { response, err = m.TaskManager.ListTasks(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListTasks", + audit.ParametersFromNamedEntityIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskEndpointMetrics.list) } diff --git a/pkg/rpc/adminservice/task_execution.go b/pkg/rpc/adminservice/task_execution.go index 16b838b8c3..184fab00bf 100644 --- a/pkg/rpc/adminservice/task_execution.go +++ b/pkg/rpc/adminservice/task_execution.go @@ -2,6 +2,9 @@ package adminservice import ( "context" + "time" + + "github.com/lyft/flyteadmin/pkg/audit" "github.com/lyft/flytestdlib/logger" @@ -16,7 +19,7 @@ import ( func (m *AdminService) CreateTaskEvent( ctx context.Context, request *admin.TaskExecutionEventRequest) (*admin.TaskExecutionEventResponse, error) { defer m.interceptPanic(ctx, request) - + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -30,6 +33,16 @@ func (m *AdminService) CreateTaskEvent( m.Metrics.taskExecutionEndpointMetrics.createEvent.Time(func() { response, err = m.TaskExecutionManager.CreateTaskExecutionEvent(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "CreateTask", + audit.ParametersFromTaskExecutionIdentifier(&core.TaskExecutionIdentifier{ + TaskId: request.Event.TaskId, + NodeExecutionId: request.Event.ParentNodeExecutionId, + RetryAttempt: request.Event.RetryAttempt, + }), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskExecutionEndpointMetrics.createEvent) } @@ -40,6 +53,7 @@ func (m *AdminService) CreateTaskEvent( func (m *AdminService) GetTaskExecution( ctx context.Context, request *admin.TaskExecutionGetRequest) (*admin.TaskExecution, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -58,6 +72,12 @@ func (m *AdminService) GetTaskExecution( m.Metrics.taskExecutionEndpointMetrics.get.Time(func() { response, err = m.TaskExecutionManager.GetTaskExecution(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetTaskExecution", + audit.ParametersFromTaskExecutionIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskExecutionEndpointMetrics.get) } @@ -68,7 +88,7 @@ func (m *AdminService) GetTaskExecution( func (m *AdminService) ListTaskExecutions( ctx context.Context, request *admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) { defer m.interceptPanic(ctx, request) - + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Nil request") } @@ -81,6 +101,12 @@ func (m *AdminService) ListTaskExecutions( m.Metrics.taskExecutionEndpointMetrics.list.Time(func() { response, err = m.TaskExecutionManager.ListTaskExecutions(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListTaskExecutions", + audit.ParametersFromNodeExecutionIdentifier(request.NodeExecutionId), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskExecutionEndpointMetrics.list) } @@ -91,6 +117,7 @@ func (m *AdminService) ListTaskExecutions( func (m *AdminService) GetTaskExecutionData( ctx context.Context, request *admin.TaskExecutionGetDataRequest) (*admin.TaskExecutionGetDataResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -105,6 +132,12 @@ func (m *AdminService) GetTaskExecutionData( m.Metrics.taskExecutionEndpointMetrics.getData.Time(func() { response, err = m.TaskExecutionManager.GetTaskExecutionData(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetTaskExecutionData", + audit.ParametersFromTaskExecutionIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.taskExecutionEndpointMetrics.getData) } diff --git a/pkg/rpc/adminservice/tests/project_domain_test.go b/pkg/rpc/adminservice/tests/project_domain_test.go index 2a8decd760..11b39df166 100644 --- a/pkg/rpc/adminservice/tests/project_domain_test.go +++ b/pkg/rpc/adminservice/tests/project_domain_test.go @@ -25,7 +25,12 @@ func TestUpdateProjectDomain(t *testing.T) { projectDomainAttributesManager: &mockProjectDomainManager, }) - resp, err := mockServer.UpdateProjectDomainAttributes(ctx, &admin.ProjectDomainAttributesUpdateRequest{}) + resp, err := mockServer.UpdateProjectDomainAttributes(ctx, &admin.ProjectDomainAttributesUpdateRequest{ + Attributes: &admin.ProjectDomainAttributes{ + Project: "project", + Domain: "domain", + }, + }) assert.NotNil(t, resp) assert.NoError(t, err) assert.True(t, updateCalled) diff --git a/pkg/rpc/adminservice/tests/project_test.go b/pkg/rpc/adminservice/tests/project_test.go index 937f3b3174..f302253169 100644 --- a/pkg/rpc/adminservice/tests/project_test.go +++ b/pkg/rpc/adminservice/tests/project_test.go @@ -24,7 +24,11 @@ func TestRegisterProject(t *testing.T) { projectManager: &mockProjectManager, }) - resp, err := mockServer.RegisterProject(ctx, &admin.ProjectRegisterRequest{}) + resp, err := mockServer.RegisterProject(ctx, &admin.ProjectRegisterRequest{ + Project: &admin.Project{ + Id: "project", + }, + }) assert.NotNil(t, resp) assert.NoError(t, err) } diff --git a/pkg/rpc/adminservice/workflow.go b/pkg/rpc/adminservice/workflow.go index d0656a3129..6eed4aaf84 100644 --- a/pkg/rpc/adminservice/workflow.go +++ b/pkg/rpc/adminservice/workflow.go @@ -2,6 +2,9 @@ package adminservice import ( "context" + "time" + + "github.com/lyft/flyteadmin/pkg/audit" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/logger" @@ -17,6 +20,7 @@ func (m *AdminService) CreateWorkflow( ctx context.Context, request *admin.WorkflowCreateRequest) (*admin.WorkflowCreateResponse, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -25,6 +29,12 @@ func (m *AdminService) CreateWorkflow( m.Metrics.workflowEndpointMetrics.create.Time(func() { response, err = m.WorkflowManager.CreateWorkflow(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "CreateTask", + audit.ParametersFromIdentifier(request.Id), + audit.ReadWrite, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.create) } @@ -34,6 +44,7 @@ func (m *AdminService) CreateWorkflow( func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGetRequest) (*admin.Workflow, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -48,6 +59,12 @@ func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGet m.Metrics.workflowEndpointMetrics.get.Time(func() { response, err = m.WorkflowManager.GetWorkflow(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "GetWorkflow", + audit.ParametersFromIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.get) } @@ -58,7 +75,7 @@ func (m *AdminService) GetWorkflow(ctx context.Context, request *admin.ObjectGet func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.NamedEntityIdentifierListRequest) ( *admin.NamedEntityIdentifierList, error) { defer m.interceptPanic(ctx, request) - + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -68,6 +85,15 @@ func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.Named m.Metrics.workflowEndpointMetrics.listIds.Time(func() { response, err = m.WorkflowManager.ListWorkflowIdentifiers(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListWorkflowIds", + map[string]string{ + audit.Project: request.Project, + audit.Domain: request.Domain, + }, + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.listIds) } @@ -78,6 +104,7 @@ func (m *AdminService) ListWorkflowIds(ctx context.Context, request *admin.Named func (m *AdminService) ListWorkflows(ctx context.Context, request *admin.ResourceListRequest) (*admin.WorkflowList, error) { defer m.interceptPanic(ctx, request) + requestedAt := time.Now() if request == nil { return nil, status.Errorf(codes.InvalidArgument, "Incorrect request, nil requests not allowed") } @@ -86,6 +113,12 @@ func (m *AdminService) ListWorkflows(ctx context.Context, request *admin.Resourc m.Metrics.workflowEndpointMetrics.list.Time(func() { response, err = m.WorkflowManager.ListWorkflows(ctx, *request) }) + audit.NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest( + "ListWorkflows", + audit.ParametersFromNamedEntityIdentifier(request.Id), + audit.ReadOnly, + requestedAt, + ).WithResponse(time.Now(), err).Log(ctx) if err != nil { return nil, util.TransformAndRecordError(err, &m.Metrics.workflowEndpointMetrics.list) }