Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: allow deleting of evaluations #13492

Merged
merged 11 commits into from
Jul 6, 2022
7 changes: 7 additions & 0 deletions .changelog/13045.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:improvement
cli: Added `scheduler get-config` and `scheduler set-config` commands to the operator CLI
```

```release-note:improvement
core: Added the ability to pause and un-pause the eval broker and blocked eval broker
```
7 changes: 7 additions & 0 deletions .changelog/13492.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:improvement
cli: Added `delete` command to the eval CLI
```

```release-note:improvement
agent: Added delete support to the eval HTTP API
```
4 changes: 2 additions & 2 deletions api/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (a *ACLPolicies) Delete(policyName string, q *WriteOptions) (*WriteMeta, er
if policyName == "" {
return nil, fmt.Errorf("missing policy name")
}
wm, err := a.client.delete("/v1/acl/policy/"+policyName, nil, q)
wm, err := a.client.delete("/v1/acl/policy/"+policyName, nil, nil, q)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (a *ACLTokens) Delete(accessorID string, q *WriteOptions) (*WriteMeta, erro
if accessorID == "" {
return nil, fmt.Errorf("missing accessor ID")
}
wm, err := a.client.delete("/v1/acl/token/"+accessorID, nil, q)
wm, err := a.client.delete("/v1/acl/token/"+accessorID, nil, nil, q)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,14 +982,15 @@ func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*
return wm, nil
}

// delete is used to do a DELETE request against an endpoint
// and serialize/deserialized using the standard Nomad conventions.
func (c *Client) delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) {
// delete is used to do a DELETE request against an endpoint and
// serialize/deserialized using the standard Nomad conventions.
func (c *Client) delete(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
r, err := c.newRequest("DELETE", endpoint)
if err != nil {
return nil, err
}
r.setWriteOptions(q)
r.obj = in
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestRequestTime(t *testing.T) {
t.Errorf("bad request time: %d", wm.RequestTime)
}

wm, err = client.delete("/", &out, nil)
wm, err = client.delete("/", nil, &out, nil)
if err != nil {
t.Fatalf("delete err: %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (v *CSIVolumes) Register(vol *CSIVolume, w *WriteOptions) (*WriteMeta, erro

// Deregister deregisters a single CSIVolume from Nomad. The volume will not be deleted from the external storage provider.
func (v *CSIVolumes) Deregister(id string, force bool, w *WriteOptions) error {
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v?force=%t", url.PathEscape(id), force), nil, w)
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v?force=%t", url.PathEscape(id), force), nil, nil, w)
return err
}

Expand All @@ -104,7 +104,7 @@ func (v *CSIVolumes) Create(vol *CSIVolume, w *WriteOptions) ([]*CSIVolume, *Wri
// passed as an argument here is for the storage provider's ID, so a volume
// that's already been deregistered can be deleted.
func (v *CSIVolumes) Delete(externalVolID string, w *WriteOptions) error {
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(externalVolID)), nil, w)
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(externalVolID)), nil, nil, w)
return err
}

Expand All @@ -117,15 +117,15 @@ func (v *CSIVolumes) DeleteOpts(req *CSIVolumeDeleteRequest, w *WriteOptions) er
w = &WriteOptions{}
}
w.SetHeadersFromCSISecrets(req.Secrets)
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(req.ExternalVolumeID)), nil, w)
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(req.ExternalVolumeID)), nil, nil, w)
return err
}

// Detach causes Nomad to attempt to detach a CSI volume from a client
// node. This is used in the case that the node is temporarily lost and the
// allocations are unable to drop their claims automatically.
func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error {
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/detach?node=%v", url.PathEscape(volID), nodeID), nil, w)
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/detach?node=%v", url.PathEscape(volID), nodeID), nil, nil, w)
return err
}

Expand All @@ -152,7 +152,7 @@ func (v *CSIVolumes) DeleteSnapshot(snap *CSISnapshot, w *WriteOptions) error {
w = &WriteOptions{}
}
w.SetHeadersFromCSISecrets(snap.Secrets)
_, err := v.client.delete("/v1/volumes/snapshot?"+qp.Encode(), nil, w)
_, err := v.client.delete("/v1/volumes/snapshot?"+qp.Encode(), nil, nil, w)
return err
}

Expand Down
17 changes: 17 additions & 0 deletions api/evaluations.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ func (e *Evaluations) Info(evalID string, q *QueryOptions) (*Evaluation, *QueryM
return &resp, qm, nil
}

// Delete is used to batch delete evaluations using their IDs.
func (e *Evaluations) Delete(evalIDs []string, w *WriteOptions) (*WriteMeta, error) {
req := EvalDeleteRequest{
EvalIDs: evalIDs,
}
wm, err := e.client.delete("/v1/evaluations", &req, nil, w)
if err != nil {
return nil, err
}
return wm, nil
}

// Allocations is used to retrieve a set of allocations given
// an evaluation ID.
func (e *Evaluations) Allocations(evalID string, q *QueryOptions) ([]*AllocationListStub, *QueryMeta, error) {
Expand Down Expand Up @@ -108,6 +120,11 @@ type EvaluationStub struct {
ModifyTime int64
}

type EvalDeleteRequest struct {
EvalIDs []string
WriteRequest
}

// EvalIndexSort is a wrapper to sort evaluations by CreateIndex.
// We reverse the test so that we get the highest index first.
type EvalIndexSort []*Evaluation
Expand Down
27 changes: 27 additions & 0 deletions api/evaluations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,33 @@ func TestEvaluations_Info(t *testing.T) {
}
}

func TestEvaluations_Delete(t *testing.T) {
jrasell marked this conversation as resolved.
Show resolved Hide resolved
testutil.Parallel(t)

testClient, testServer := makeClient(t, nil, nil)
defer testServer.Stop()

// Attempting to delete an evaluation when the eval broker is not paused
// should return an error.
wm, err := testClient.Evaluations().Delete([]string{"8E231CF4-CA48-43FF-B694-5801E69E22FA"}, nil)
require.Nil(t, wm)
require.ErrorContains(t, err, "eval broker is enabled")

// Pause the eval broker, and try to delete an evaluation that does not
// exist.
schedulerConfig, _, err := testClient.Operator().SchedulerGetConfiguration(nil)
require.NoError(t, err)
require.NotNil(t, schedulerConfig)

schedulerConfig.SchedulerConfig.PauseEvalBroker = true
schedulerConfigUpdated, _, err := testClient.Operator().SchedulerCASConfiguration(schedulerConfig.SchedulerConfig, nil)
require.NoError(t, err)
require.True(t, schedulerConfigUpdated.Updated)

wm, err = testClient.Evaluations().Delete([]string{"8E231CF4-CA48-43FF-B694-5801E69E22FA"}, nil)
require.ErrorContains(t, err, "eval not found")
}

func TestEvaluations_Allocations(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
Expand Down
4 changes: 2 additions & 2 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *Query
// eventually GC'ed from the system. Most callers should not specify purge.
func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) {
var resp JobDeregisterResponse
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", url.PathEscape(jobID), purge), &resp, q)
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", url.PathEscape(jobID), purge), nil, &resp, q)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -334,7 +334,7 @@ func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOpt
opts.Purge, opts.Global, opts.EvalPriority, opts.NoShutdownDelay)
}

wm, err := j.client.delete(endpoint, &resp, q)
wm, err := j.client.delete(endpoint, nil, &resp, q)
if err != nil {
return "", nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion api/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (n *Namespaces) Register(namespace *Namespace, q *WriteOptions) (*WriteMeta

// Delete is used to delete a namespace
func (n *Namespaces) Delete(namespace string, q *WriteOptions) (*WriteMeta, error) {
wm, err := n.client.delete(fmt.Sprintf("/v1/namespace/%s", namespace), nil, q)
wm, err := n.client.delete(fmt.Sprintf("/v1/namespace/%s", namespace), nil, nil, q)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type SchedulerConfiguration struct {
// management ACL token
RejectJobRegistration bool

// PauseEvalBroker stops the leader evaluation broker process from running
// until the configuration is updated and written to the Nomad servers.
PauseEvalBroker bool

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
Expand Down
44 changes: 44 additions & 0 deletions api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/stretchr/testify/require"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
Expand Down Expand Up @@ -53,3 +54,46 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err)
}
}

func TestOperator_SchedulerGetConfiguration(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil)
require.Nil(t, err)
require.NotEmpty(t, schedulerConfig)
}

func TestOperator_SchedulerSetConfiguration(t *testing.T) {
testutil.Parallel(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

newSchedulerConfig := SchedulerConfiguration{
SchedulerAlgorithm: SchedulerAlgorithmSpread,
PreemptionConfig: PreemptionConfig{
SystemSchedulerEnabled: true,
SysBatchSchedulerEnabled: true,
BatchSchedulerEnabled: true,
ServiceSchedulerEnabled: true,
},
MemoryOversubscriptionEnabled: true,
RejectJobRegistration: true,
PauseEvalBroker: true,
}

schedulerConfigUpdateResp, _, err := c.Operator().SchedulerSetConfiguration(&newSchedulerConfig, nil)
require.Nil(t, err)
require.True(t, schedulerConfigUpdateResp.Updated)

// We can't exactly predict the query meta responses, so we test fields
// individually.
schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil)
require.Nil(t, err)
require.Equal(t, schedulerConfig.SchedulerConfig.SchedulerAlgorithm, SchedulerAlgorithmSpread)
require.True(t, schedulerConfig.SchedulerConfig.PauseEvalBroker)
require.True(t, schedulerConfig.SchedulerConfig.RejectJobRegistration)
require.True(t, schedulerConfig.SchedulerConfig.MemoryOversubscriptionEnabled)
require.Equal(t, newSchedulerConfig.PreemptionConfig, schedulerConfig.SchedulerConfig.PreemptionConfig)
}
2 changes: 1 addition & 1 deletion api/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (q *Quotas) Register(spec *QuotaSpec, qo *WriteOptions) (*WriteMeta, error)

// Delete is used to delete a quota spec
func (q *Quotas) Delete(quota string, qo *WriteOptions) (*WriteMeta, error) {
wm, err := q.client.delete(fmt.Sprintf("/v1/quota/%s", quota), nil, qo)
wm, err := q.client.delete(fmt.Sprintf("/v1/quota/%s", quota), nil, nil, qo)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion api/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ func (raw *Raw) Write(endpoint string, in, out interface{}, q *WriteOptions) (*W
// Delete is used to do a DELETE request against an endpoint
// and serialize/deserialized using the standard Nomad conventions.
func (raw *Raw) Delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) {
return raw.c.delete(endpoint, out, q)
return raw.c.delete(endpoint, nil, out, q)
}
2 changes: 1 addition & 1 deletion api/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (a *SentinelPolicies) Delete(policyName string, q *WriteOptions) (*WriteMet
if policyName == "" {
return nil, fmt.Errorf("missing policy name")
}
wm, err := a.client.delete("/v1/sentinel/policy/"+policyName, nil, q)
wm, err := a.client.delete("/v1/sentinel/policy/"+policyName, nil, nil, q)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion api/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *Services) Get(serviceName string, q *QueryOptions) ([]*ServiceRegistrat
// by its service name and service ID.
func (s *Services) Delete(serviceName, serviceID string, q *WriteOptions) (*WriteMeta, error) {
path := fmt.Sprintf("/v1/service/%s/%s", url.PathEscape(serviceName), url.PathEscape(serviceID))
wm, err := s.client.delete(path, nil, q)
wm, err := s.client.delete(path, nil, nil, q)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func TestClient_WatchAllocs(t *testing.T) {
})

// Delete one allocation
if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
if err := state.DeleteEval(103, nil, []string{alloc1.ID}, false); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down
48 changes: 46 additions & 2 deletions command/agent/eval_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
package agent

import (
"fmt"
"net/http"
"strings"

"github.com/hashicorp/nomad/nomad/structs"
)

// EvalsRequest is the entry point for /v1/evaluations and is responsible for
// handling both the listing of evaluations, and the bulk deletion of
// evaluations. The latter is a dangerous operation and should use the
// eval delete command to perform this.
func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
return nil, CodedError(405, ErrInvalidMethod)
switch req.Method {
case http.MethodGet:
return s.evalsListRequest(resp, req)
case http.MethodDelete:
return s.evalsDeleteRequest(resp, req)
default:
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
}

func (s *HTTPServer) evalsListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

args := structs.EvalListRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
Expand All @@ -33,6 +46,37 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (
return out.Evaluations, nil
}

func (s *HTTPServer) evalsDeleteRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

var args structs.EvalDeleteRequest

if err := decodeBody(req, &args); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}

numIDs := len(args.EvalIDs)

// Ensure the number of evaluation IDs included in the request is within
// bounds.
if numIDs < 1 {
return nil, CodedError(http.StatusBadRequest, "request does not include any evaluation IDs")
} else if numIDs > structs.MaxUUIDsPerWriteRequest {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf(
"request includes %v evaluations IDs, must be %v or fewer",
numIDs, structs.MaxUUIDsPerWriteRequest))
}

// Pass the write request to populate all meta fields.
s.parseWriteRequest(req, &args.WriteRequest)

var reply structs.EvalDeleteResponse
if err := s.agent.RPC(structs.EvalDeleteRPCMethod, &args, &reply); err != nil {
return nil, err
}
setIndex(resp, reply.Index)
return nil, nil
}

func (s *HTTPServer) EvalSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
path := strings.TrimPrefix(req.URL.Path, "/v1/evaluation/")
switch {
Expand Down
Loading