Skip to content

Commit

Permalink
the monster works i guess
Browse files Browse the repository at this point in the history
  • Loading branch information
schmichael committed Nov 12, 2024
1 parent f8c85b0 commit c263a53
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 0 deletions.
134 changes: 134 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sort"
"strings"
Expand All @@ -16,9 +17,11 @@ import (
"github.com/golang/snappy"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-msgpack/v2/codec"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set/v3"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
Expand Down Expand Up @@ -2191,6 +2194,128 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
reply.Index = evalIndex
}

if args.WaitForResult {
minIndex := reply.Index
reply.Result = &structs.JobDispatchResponseResult{}
for {
minIndex++
j.logger.Debug(">>> wait for result loop", "index", minIndex, "eval", reply.EvalID)
snap, err := j.srv.State().SnapshotMinIndex(context.TODO(), minIndex)
if err != nil {
reply.Result.Error = err.Error()
return nil
}

allocs, err := snap.AllocsByJob(nil, dispatchJob.Namespace, dispatchJob.ID, true)
if err != nil {
reply.Result.Error = err.Error()
return nil
}
j.logger.Debug(">>> allocs for job", "index", minIndex, "num", len(allocs))

for _, alloc := range allocs {
if alloc.ClientStatus == structs.AllocClientStatusComplete {
j.logger.Debug(">>> FOUND Complete!", "index", minIndex, "alloc", alloc.ID)
//TODO(schmichael) put into a func because it uses defer in a loop!
// Done!
fileReq := &cstructs.FsStreamRequest{
AllocID: alloc.ID,
Path: "/alloc/data/result",
PlainText: true, // skips base64 encoding
Limit: DispatchPayloadSizeLimit,
}
fileReq.Region = j.srv.Region()
fileReq.Namespace = alloc.Namespace
fileReq.AuthToken = args.AuthToken

node, err := snap.NodeByID(nil, alloc.NodeID)
if err != nil {
reply.Result.Error = err.Error()
return nil
}

if node == nil {
//TODO(schmichael) improve
reply.Result.Error = "node no longer exists"
}

// Get the connection to the client either by forwarding to another server
// or creating a direct stream
var clientConn net.Conn
nodeConnState, ok := j.srv.getNodeConn(alloc.NodeID)
if !ok {
// Determine the Server that has a connection to the node.
srv, err := j.srv.serverWithNodeConn(alloc.NodeID, j.srv.Region())
if err != nil {
reply.Result.Error = err.Error()
return nil
}

// Get a connection to the server
conn, err := j.srv.streamingRpc(srv, "FileSystem.Stream")
if err != nil {
reply.Result.Error = err.Error()
return nil
}

clientConn = conn
} else {
stream, err := NodeStreamingRpc(nodeConnState.Session, "FileSystem.Stream")
if err != nil {
reply.Result.Error = err.Error()
return nil
}
clientConn = stream
}
defer clientConn.Close()

outEncoder := codec.NewEncoder(clientConn, structs.MsgpackHandle)
if err := outEncoder.Encode(fileReq); err != nil {
reply.Result.Error = err.Error()
return nil
}

var res cstructs.StreamErrWrapper
inDecoder := codec.NewDecoder(clientConn, structs.MsgpackHandle)
if err := inDecoder.Decode(&res); err != nil {
reply.Result.Error = err.Error()
return nil
}
if res.Error != nil {
reply.Result.Error = res.Error.Message
return nil
}

// It worked!
j.logger.Debug(">>>>> Copying payload", "len", len(res.Payload))
reply.Result.Payload = res.Payload
return nil
}

if alloc.ModifyIndex > minIndex {
minIndex = alloc.ModifyIndex
}
}

// If job died before any allocs completed, that's a failure
dj, err := snap.JobByID(nil, dispatchJob.Namespace, dispatchJob.ID)
if err != nil {
reply.Result.Error = err.Error()
return nil
}
if dj == nil {
//TODO(schmichael) technically not a fatal error as the alloc could
//still exist on the client
reply.Result.Error = "Job garbage collected before results retrieved"
return nil
}
if dj.Status == structs.JobStatusDead {
reply.Result.Error = "Job died without writing results"
return nil
}
}
}

return nil
}

Expand Down Expand Up @@ -2257,6 +2382,15 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job)
return fmt.Errorf("Dispatch did not provide required meta keys: %v", flat)
}

if req.WaitForResult {
if n := len(job.TaskGroups); n > 1 {
return fmt.Errorf("Cannot wait for result on job with multiple (%d) groups.", n)
}
if n := job.TaskGroups[0].Count; n > 1 {
return fmt.Errorf("Cannot wait for result on group (%q) with count > 0 (%d).", job.TaskGroups[0].Name, n)
}
}

return nil
}

Expand Down
11 changes: 11 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,9 @@ type JobDispatchRequest struct {
Meta map[string]string
WriteRequest
IdPrefixTemplate string

//HACK(schmichael) lol
WaitForResult bool
}

// JobValidateRequest is used to validate a job
Expand Down Expand Up @@ -1643,9 +1646,17 @@ type JobDispatchResponse struct {
EvalID string
EvalCreateIndex uint64
JobCreateIndex uint64
Result *JobDispatchResponseResult

WriteMeta
}

type JobDispatchResponseResult struct {
Error string // error *after* job is dispatched but *before* a result
ExitCode int // the exit code of the main task or first nonzero task
Payload []byte // the result of the job
}

// JobListResponse is used for a list request
type JobListResponse struct {
Jobs []*JobListStub
Expand Down

0 comments on commit c263a53

Please sign in to comment.