Skip to content

Commit

Permalink
Add audit logging (flyteorg#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Jan 6, 2020
1 parent 1f516ba commit 21d4246
Show file tree
Hide file tree
Showing 27 changed files with 847 additions and 20 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion boilerplate/lyft/golang_test_targets/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ lint: #lints the package for common code smells
# However, that call seem to have some effects (e.g. https://github.com/golang/go/issues/29452) which, for some
# reason, allows the subsequent calls to succeed.
# TODO: Evaluate whether this is still a problem after moving admin dependency system to go modules.
GO111MODULE=off golangci-lint run --exclude deprecated -v || true
GO111MODULE=off golangci-lint run --deadline=5m --exclude deprecated -v

# If code is failing goimports linter, this will fix.
Expand Down
11 changes: 11 additions & 0 deletions pkg/audit/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package audit

import (
"time"
)

type AuthenticatedClientMeta struct {
ClientIds []string
TokenIssuedAt time.Time
ClientIP string
}
96 changes: 96 additions & 0 deletions pkg/audit/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package audit

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flytestdlib/logger"
"google.golang.org/grpc/codes"
)

type LogBuilder interface {
WithAuthenticatedCtx(ctx context.Context) LogBuilder
WithRequest(method string, parameters map[string]string, mode AccessMode, requestedAt time.Time) LogBuilder
WithResponse(sentAt time.Time, err error) LogBuilder
Log(ctx context.Context)
}

type logBuilder struct {
auditLog Message
readOnly bool
}

func (b *logBuilder) WithAuthenticatedCtx(ctx context.Context) LogBuilder {
clientMeta := ctx.Value(common.AuditFieldsContextKey)
switch clientMeta.(type) {
case AuthenticatedClientMeta:
b.auditLog.Principal = Principal{
Subject: ctx.Value(common.PrincipalContextKey).(string),
TokenIssuedAt: clientMeta.(AuthenticatedClientMeta).TokenIssuedAt,
}
if len(clientMeta.(AuthenticatedClientMeta).ClientIds) > 0 {
b.auditLog.Principal.ClientID = clientMeta.(AuthenticatedClientMeta).ClientIds[0]
}
b.auditLog.Client = Client{
ClientIP: clientMeta.(AuthenticatedClientMeta).ClientIP,
}
default:
logger.Warningf(ctx, "Failed to parse authenticated client metadata when creating audit log")
}
return b
}

// TODO: Also look into passing down HTTP verb
func (b *logBuilder) WithRequest(method string, parameters map[string]string, mode AccessMode,
requestedAt time.Time) LogBuilder {
b.auditLog.Request = Request{
Method: method,
Parameters: parameters,
Mode: mode,
ReceivedAt: requestedAt,
}
return b
}

func (b *logBuilder) WithResponse(sentAt time.Time, err error) LogBuilder {
responseCode := codes.OK.String()
if err != nil {
switch err := err.(type) {
case errors.FlyteAdminError:
responseCode = err.(errors.FlyteAdminError).Code().String()
default:
responseCode = codes.Internal.String()
}
}
b.auditLog.Response = Response{
ResponseCode: responseCode,
SentAt: sentAt,
}
return b
}

func (b *logBuilder) formatLogString(ctx context.Context) string {
auditLog, err := json.Marshal(&b.auditLog)
if err != nil {
logger.Warningf(ctx, "Failed to marshal audit log to protobuf with err: %v", err)
}
return fmt.Sprintf("Recording request: [%s]", auditLog)
}

func (b *logBuilder) Log(ctx context.Context) {
if b.readOnly {
logger.Warningf(ctx, "Attempting to record audit log for request: [%+v] more than once. Aborting.", b.auditLog.Request)
}
defer func() {
b.readOnly = true
}()
logger.Info(ctx, b.formatLogString(ctx))
}

func NewLogBuilder() LogBuilder {
return &logBuilder{}
}
35 changes: 35 additions & 0 deletions pkg/audit/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package audit

import (
"context"
"testing"
"time"

"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
)

func TestLogBuilderLog(t *testing.T) {
ctx := context.Background()
ctx = context.WithValue(ctx, common.PrincipalContextKey, "prince")
tokenIssuedAt := time.Date(2020, time.January, 5, 10, 15, 0, 0, time.UTC)
requestedAt := time.Date(2020, time.January, 5, 10, 30, 0, 0, time.UTC)
sentAt := time.Date(2020, time.January, 5, 10, 31, 0, 0, time.UTC)
err := errors.NewFlyteAdminError(codes.AlreadyExists, "womp womp")
ctx = context.WithValue(ctx, common.AuditFieldsContextKey, AuthenticatedClientMeta{
ClientIds: []string{"12345"},
TokenIssuedAt: tokenIssuedAt,
ClientIP: "192.0.2.1:25",
})
builder := NewLogBuilder().WithAuthenticatedCtx(ctx).WithRequest(
"my_method", map[string]string{
"my": "params",
}, ReadWrite, requestedAt).WithResponse(sentAt, err)
assert.EqualValues(t, "Recording request: [{\"Principal\":{\"Subject\":\"prince\",\"ClientID\":\"12345\","+
"\"TokenIssuedAt\":\"2020-01-05T10:15:00Z\"},\"Client\":{\"ClientIP\":\"192.0.2.1:25\"},\"Request\":"+
"{\"Method\":\"my_method\",\"Parameters\":{\"my\":\"params\"},\"Mode\":1,\"ReceivedAt\":"+
"\"2020-01-05T10:30:00Z\"},\"Response\":{\"ResponseCode\":\"AlreadyExists\",\"SentAt\":"+
"\"2020-01-05T10:31:00Z\"}}]", builder.(*logBuilder).formatLogString(context.TODO()))
}
55 changes: 55 additions & 0 deletions pkg/audit/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package audit

import "time"

type Principal struct {
// Identifies authenticated end-user
Subject string

// The client that initiated the auth flow.
ClientID string

TokenIssuedAt time.Time
}

type Client struct {
ClientIP string
}

type AccessMode int

const (
ReadOnly AccessMode = iota
ReadWrite
)

// Details about a specific request issued by a user.
type Request struct {
// Service method endpoint e.g. GetWorkflowExecution
Method string

// Includes parameters submitted in the request.
Parameters map[string]string

Mode AccessMode

ReceivedAt time.Time
}

// Summary of service response details.
type Response struct {
// e.g. gRPC status code
ResponseCode string

SentAt time.Time
}

type Message struct {
Principal Principal

Client Client

Request Request

Response Response
}
93 changes: 93 additions & 0 deletions pkg/audit/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package audit

import (
"fmt"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
)

type requestParameters = map[string]string

const (
Project = "project"
Domain = "domain"
Name = "name"
Version = "version"
NodeID = "node_id"
RetryAttempt = "retry_attempt"
ResourceType = "resource"

TaskProject = "task_project"
TaskDomain = "task_domain"
TaskName = "task_name"
TaskVersion = "task_version"
)

func ParametersFromIdentifier(identifier *core.Identifier) requestParameters {
if identifier == nil {
return requestParameters{}
}
return requestParameters{
Project: identifier.Project,
Domain: identifier.Domain,
Name: identifier.Name,
Version: identifier.Version,
}
}

func ParametersFromNamedEntityIdentifier(identifier *admin.NamedEntityIdentifier) requestParameters {
if identifier == nil {
return requestParameters{}
}
return requestParameters{
Project: identifier.Project,
Domain: identifier.Domain,
Name: identifier.Name,
}
}

func ParametersFromNamedEntityIdentifierAndResource(identifier *admin.NamedEntityIdentifier, resourceType core.ResourceType) requestParameters {
if identifier == nil {
return requestParameters{}
}
parameters := ParametersFromNamedEntityIdentifier(identifier)
parameters[ResourceType] = resourceType.String()
return parameters
}

func ParametersFromExecutionIdentifier(identifier *core.WorkflowExecutionIdentifier) requestParameters {
if identifier == nil {
return requestParameters{}
}
return requestParameters{
Project: identifier.Project,
Domain: identifier.Domain,
Name: identifier.Name,
}
}

func ParametersFromNodeExecutionIdentifier(identifier *core.NodeExecutionIdentifier) requestParameters {
if identifier == nil || identifier.ExecutionId == nil {
return requestParameters{}
}
return requestParameters{
Project: identifier.ExecutionId.Project,
Domain: identifier.ExecutionId.Domain,
Name: identifier.ExecutionId.Name,
NodeID: identifier.NodeId,
}
}

func ParametersFromTaskExecutionIdentifier(identifier *core.TaskExecutionIdentifier) requestParameters {
if identifier == nil || identifier.NodeExecutionId == nil || identifier.TaskId == nil {
return requestParameters{}
}
params := ParametersFromNodeExecutionIdentifier(identifier.NodeExecutionId)
params[RetryAttempt] = fmt.Sprint(identifier.RetryAttempt)
params[TaskProject] = identifier.TaskId.Project
params[TaskDomain] = identifier.TaskId.Domain
params[TaskName] = identifier.TaskId.Name
params[TaskVersion] = identifier.TaskId.Version
return params
}
Loading

0 comments on commit 21d4246

Please sign in to comment.