Skip to content

Commit

Permalink
chore(tasks): remove old auth code and allow only token auth
Browse files Browse the repository at this point in the history
  • Loading branch information
AlirieGray committed Jul 16, 2019
1 parent c6f66e1 commit 0fe9d10
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 258 deletions.
4 changes: 4 additions & 0 deletions authorizer/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

if t.Token == "" {
return nil, influxdb.ErrMissingToken
}

p, err := platform.NewPermission(platform.WriteAction, platform.TasksResourceType, t.OrganizationID)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions authorizer/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestOnboardingValidation(t *testing.T) {

_, err = ts.CreateTask(ctx, influxdb.TaskCreate{
OrganizationID: r.Org.ID,
Token: r.Auth.Token,
Flux: `option task = {
name: "my_task",
every: 1s,
Expand Down Expand Up @@ -202,6 +203,7 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
check: func(ctx context.Context, svc influxdb.TaskService) error {
_, err := svc.CreateTask(ctx, influxdb.TaskCreate{
OrganizationID: r.Org.ID,
Token: r.Auth.Token,
Flux: `option task = {
name: "my_task",
every: 1s,
Expand Down
2 changes: 1 addition & 1 deletion http/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8528,7 +8528,7 @@ components:
token:
description: The token to use for authenticating this task when it executes queries. If omitted, uses the token associated with the request that creates the task.
type: string
required: [flux]
required: [flux, token]
TaskUpdateRequest:
type: object
properties:
Expand Down
127 changes: 4 additions & 123 deletions http/task_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ import (
"strings"
"time"

"github.com/influxdata/flux/lang"
influxdb "github.com/influxdata/influxdb"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/authorizer"
pcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/options"
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -369,114 +366,9 @@ func decodeGetTasksRequest(ctx context.Context, r *http.Request, orgs platform.O
return req, nil
}

// createBootstrapTaskAuthorizationIfNotExists checks if a the task create request hasn't specified a token, and if the request came from a session,
// and if both of those are true, it creates an authorization and return it.
//
// Note that the created authorization will have permissions required for the task,
// but it won't have permissions to read the task, as we don't have the task ID yet.
//
// This method may return a nil error and a nil authorization, if there wasn't a need to create an authorization.
func (h *TaskHandler) createBootstrapTaskAuthorizationIfNotExists(ctx context.Context, a platform.Authorizer, t *platform.TaskCreate) (*platform.Authorization, error) {
if t.Token != "" {
return nil, nil
}

s, ok := a.(*platform.Session)
if !ok {
// If an authorization was used continue
return nil, nil
}

prog, err := lang.Compile(t.Flux, time.Now())
if err != nil {
return nil, err
}

preAuthorizer := query.NewPreAuthorizer(h.BucketService)
ps, err := preAuthorizer.RequiredPermissions(ctx, prog.Ast, &t.OrganizationID)
if err != nil {
return nil, err
}

if err := authorizer.VerifyPermissions(ctx, ps); err != nil {
return nil, err
}

opts, err := options.FromScript(t.Flux)
if err != nil {
return nil, err
}

auth := &platform.Authorization{
OrgID: t.OrganizationID,
UserID: s.UserID,
Permissions: ps,
Description: fmt.Sprintf("bootstrap authorization for task %q", opts.Name),
}

if err := h.AuthorizationService.CreateAuthorization(ctx, auth); err != nil {
return nil, err
}

t.Token = auth.Token

return auth, nil
}

func (h *TaskHandler) finalizeBootstrappedTaskAuthorization(ctx context.Context, bootstrap *platform.Authorization, task *platform.Task) error {
// If we created a bootstrapped authorization for a task,
// we need to replace it with a new authorization that allows read access on the task.
// Unfortunately for this case, updating authorizations is not allowed.
readTaskPerm, err := platform.NewPermissionAtID(task.ID, platform.ReadAction, platform.TasksResourceType, bootstrap.OrgID)
if err != nil {
// We should never fail to create a new permission like this.
return err
}
authzWithTask := &platform.Authorization{
UserID: bootstrap.UserID,
OrgID: bootstrap.OrgID,
Permissions: append([]platform.Permission{*readTaskPerm}, bootstrap.Permissions...),
Description: fmt.Sprintf("auto-generated authorization for task %q", task.Name),
}

if err := h.AuthorizationService.CreateAuthorization(ctx, authzWithTask); err != nil {
h.logger.Warn("Failed to finalize bootstrap authorization", zap.String("taskID", task.ID.String()))
// The task exists with an authorization that can't read the task.
return err
}

// Assign the new authorization...
u, err := h.TaskService.UpdateTask(ctx, task.ID, platform.TaskUpdate{Token: authzWithTask.Token})
if err != nil {
h.logger.Warn("Failed to assign finalized authorization", zap.String("authorizationID", bootstrap.ID.String()), zap.String("taskID", task.ID.String()))
// The task exists with an authorization that can't read the task,
// and we've created a new authorization for the task but not assigned it.
return err
}
*task = *u

// .. and delete the old one.
if err := h.AuthorizationService.DeleteAuthorization(ctx, bootstrap.ID); err != nil {
// Since this is the last thing we're doing, just log it if we fail to delete for some reason.
h.logger.Warn("Failed to delete bootstrap authorization", zap.String("authorizationID", bootstrap.ID.String()), zap.String("taskID", task.ID.String()))
}

return nil
}

func (h *TaskHandler) handlePostTask(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h.logger.Debug("task create request", zap.String("r", fmt.Sprint(r)))
auth, err := pcontext.GetAuthorizer(ctx)
if err != nil {
err = &platform.Error{
Err: err,
Code: platform.EUnauthorized,
Msg: "failed to get authorizer",
}
h.HandleHTTPError(ctx, err, w)
return
}

req, err := decodePostTaskRequest(ctx, r)
if err != nil {
Expand Down Expand Up @@ -507,7 +399,6 @@ func (h *TaskHandler) handlePostTask(w http.ResponseWriter, r *http.Request) {
return
}

bootstrapAuthz, err := h.createBootstrapTaskAuthorizationIfNotExists(ctx, auth, &req.TaskCreate)
if err != nil {
h.HandleHTTPError(ctx, err, w)
return
Expand All @@ -526,20 +417,6 @@ func (h *TaskHandler) handlePostTask(w http.ResponseWriter, r *http.Request) {
return
}

if bootstrapAuthz != nil {
// There was a bootstrapped authorization for this task.
// Now we need to apply the final authorization for the task.
if err := h.finalizeBootstrappedTaskAuthorization(ctx, bootstrapAuthz, task); err != nil {
err = &platform.Error{
Err: err,
Msg: fmt.Sprintf("successfully created task with ID %s, but failed to finalize bootstrap token for task", task.ID.String()),
Code: platform.EInternal,
}
h.HandleHTTPError(ctx, err, w)
return
}
}
h.logger.Debug("tasks created", zap.String("task", fmt.Sprint(task)))
if err := encodeResponse(ctx, w, http.StatusCreated, newTaskResponse(*task, []*platform.Label{})); err != nil {
logEncodingError(h.logger, r, err)
return
Expand Down Expand Up @@ -1470,6 +1347,10 @@ func (t TaskService) CreateTask(ctx context.Context, tc platform.TaskCreate) (*p
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()

if tc.Token == "" {
return nil, influxdb.ErrMissingToken
}

u, err := NewURL(t.Addr, tasksPath)
if err != nil {
return nil, err
Expand Down
128 changes: 0 additions & 128 deletions http/task_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,10 +1281,6 @@ func TestTaskHandler_Sessions(t *testing.T) {
Permissions: platform.OperPermissions(),
ExpiresAt: time.Now().Add(24 * time.Hour),
})
sessionNoPermsCtx := pcontext.SetAuthorizer(context.Background(), &platform.Session{
UserID: u.ID,
ExpiresAt: time.Now().Add(24 * time.Hour),
})

newHandler := func(t *testing.T, ts *mock.TaskService) *TaskHandler {
return NewTaskHandler(&TaskBackend{
Expand All @@ -1301,130 +1297,6 @@ func TestTaskHandler_Sessions(t *testing.T) {
})
}

t.Run("creating a task from a session", func(t *testing.T) {
taskID := platform.ID(9)
var createdTasks []platform.TaskCreate
ts := &mock.TaskService{
CreateTaskFn: func(_ context.Context, tc platform.TaskCreate) (*platform.Task, error) {
createdTasks = append(createdTasks, tc)
// Task with fake IDs so it can be serialized.
return &platform.Task{ID: taskID, OrganizationID: 99, AuthorizationID: 999, Name: "x"}, nil
},
// Needed due to task authorization bootstrapping.
UpdateTaskFn: func(ctx context.Context, id platform.ID, tu platform.TaskUpdate) (*platform.Task, error) {
authz, err := i.FindAuthorizationByToken(ctx, tu.Token)
if err != nil {
t.Fatal(err)
}

return &platform.Task{ID: taskID, OrganizationID: 99, AuthorizationID: authz.ID, Name: "x"}, nil
},
}

h := newHandler(t, ts)
url := "http://localhost:9999/api/v2/tasks"

b, err := json.Marshal(platform.TaskCreate{
Flux: `option task = {name:"x", every:1m} from(bucket:"b-src") |> range(start:-1m) |> to(bucket:"b-dst", org:"o")`,
OrganizationID: o.ID,
})
if err != nil {
t.Fatal(err)
}

r := httptest.NewRequest("POST", url, bytes.NewReader(b)).WithContext(sessionAllPermsCtx)
w := httptest.NewRecorder()

h.handlePostTask(w, r)

res := w.Result()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Fatal(err)
}
if res.StatusCode != http.StatusCreated {
t.Logf("response body: %s", body)
t.Fatalf("expected status created, got %v", res.StatusCode)
}

if len(createdTasks) != 1 {
t.Fatalf("didn't create task; got %#v", createdTasks)
}

// The task should have been created with a valid token.
var createdTask platform.Task
if err := json.Unmarshal([]byte(body), &createdTask); err != nil {
t.Fatal(err)
}
authz, err := i.FindAuthorizationByID(ctx, createdTask.AuthorizationID)
if err != nil {
t.Fatal(err)
}

if authz.OrgID != o.ID {
t.Fatalf("expected authorization to have org ID %v, got %v", o.ID, authz.OrgID)
}
if authz.UserID != u.ID {
t.Fatalf("expected authorization to have user ID %v, got %v", u.ID, authz.UserID)
}

const expDesc = `auto-generated authorization for task "x"`
if authz.Description != expDesc {
t.Fatalf("expected authorization to be created with description %q, got %q", expDesc, authz.Description)
}

// The authorization should be allowed to read and write the target buckets,
// and it should be allowed to read its task.
if !authz.Allowed(platform.Permission{
Action: platform.ReadAction,
Resource: platform.Resource{
Type: platform.BucketsResourceType,
OrgID: &o.ID,
ID: &bSrc.ID,
},
}) {
t.Logf("WARNING: permissions on `from` buckets not yet accessible: update test after https://github.com/influxdata/flux/issues/114 is fixed.")
}

if !authz.Allowed(platform.Permission{
Action: platform.WriteAction,
Resource: platform.Resource{
Type: platform.BucketsResourceType,
OrgID: &o.ID,
ID: &bDst.ID,
},
}) {
t.Fatalf("expected authorization to be allowed write access to destination bucket, but it wasn't allowed")
}

if !authz.Allowed(platform.Permission{
Action: platform.ReadAction,
Resource: platform.Resource{
Type: platform.TasksResourceType,
OrgID: &o.ID,
ID: &taskID,
},
}) {
t.Fatalf("expected authorization to be allowed to read its task, but it wasn't allowed")
}

// Session without permissions should not be allowed to create task.
r = httptest.NewRequest("POST", url, bytes.NewReader(b)).WithContext(sessionNoPermsCtx)
w = httptest.NewRecorder()

h.handlePostTask(w, r)

res = w.Result()
body, err = ioutil.ReadAll(res.Body)
if err != nil {
t.Fatal(err)
}
if res.StatusCode != http.StatusUnauthorized && res.StatusCode != http.StatusForbidden {
t.Logf("response body: %s", body)
t.Fatalf("expected status unauthorized or forbidden, got %v", res.StatusCode)
}
})

t.Run("get runs for a task", func(t *testing.T) {
// Unique authorization to associate with our fake task.
taskAuth := &platform.Authorization{OrgID: o.ID, UserID: u.ID}
Expand Down
10 changes: 4 additions & 6 deletions kv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,17 +452,15 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
return nil, err
}

if tc.Token == "" {
return nil, influxdb.ErrMissingToken
}

auth, err := s.findAuthorizationByToken(ctx, tx, tc.Token)
if err != nil {
if err.Error() != "<not found> authorization not found" {
return nil, err
}
// if i cant find an authoriaztion based on the token we will use the users authID
auth, err = s.findAuthorizationByID(ctx, tx, userAuth.Identifier())
if err != nil {
// if we still fail to fine a real auth we cannot continue
return nil, err
}
}

var org *influxdb.Organization
Expand Down
12 changes: 12 additions & 0 deletions task/servicetest/servicetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,18 @@ func testTaskCRUD(t *testing.T, sys *System) {
return nil, fmt.Errorf("failed to find task by id %s", id)
}

// should not be able to create a task without a token
noToken := influxdb.TaskCreate{
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
// Token: cr.Token, // should fail
}
_, err = sys.TaskService.CreateTask(authorizedCtx, noToken)

if err != influxdb.ErrMissingToken {
t.Fatalf("expected error missing token, got: %v", err)
}

// Look up a task the different ways we can.
// Map of method name to found task.
found := map[string]*influxdb.Task{
Expand Down
Loading

0 comments on commit 0fe9d10

Please sign in to comment.