From acb50cbb084eef8b43a47e9b58ba17f925e0c30d Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Sat, 2 May 2020 12:06:20 +0200 Subject: [PATCH 1/3] refactor: Use gRPC streaming calls to run jobs Serf queries works over UDP sending messages to nodes and expecting a response, there is no guarantee or verification of the message delivery. This was causing some job executions to be lost, around 1%-2% on edge cases. Solution Agents will listen to gRPC calls from the servers, using server-side streaming, the other way around as currently, agents will send Execution progress as now. Servers will actively order nodes to run the jobs, so we can verify the job is actually being executed or report an error in case it's not. This maintains the same guarantees in job status reporting and streaming, also adding the possibility to cancel an execution in the future. --- dkron/agent.go | 197 ++++++++++---------- dkron/agent_test.go | 64 ++++++- dkron/grpc.go | 63 +------ dkron/grpc_agent.go | 125 +++++++++++++ dkron/grpc_client.go | 74 ++++++++ dkron/invoke.go | 156 ---------------- dkron/invoke_test.go | 1 - dkron/job.go | 15 +- dkron/job_test.go | 12 +- dkron/notifier.go | 2 +- dkron/queries.go | 133 -------------- dkron/queries_test.go | 57 ------ dkron/run.go | 66 +++++++ go.mod | 2 + go.sum | 2 + plugin/types/dkron.pb.go | 378 +++++++++++++++++++++++---------------- proto/dkron.proto | 10 +- scripts/ansible/site.yml | 48 +++++ 18 files changed, 734 insertions(+), 671 deletions(-) create mode 100644 dkron/grpc_agent.go delete mode 100644 dkron/invoke.go delete mode 100644 dkron/invoke_test.go delete mode 100644 dkron/queries.go delete mode 100644 dkron/queries_test.go create mode 100644 dkron/run.go diff --git a/dkron/agent.go b/dkron/agent.go index 6cb4e81d8..f9072f622 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -2,7 +2,6 @@ package dkron import ( "crypto/tls" - "encoding/json" "errors" "expvar" "fmt" @@ -23,8 +22,11 @@ import ( "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/hashicorp/serf/serf" + "github.com/juliangruber/go-intersect" "github.com/sirupsen/logrus" "github.com/soheilhy/cmux" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) const ( @@ -41,6 +43,9 @@ var ( // ErrLeaderNotFound is returned when obtained leader is not found in member list ErrLeaderNotFound = errors.New("no member leader found in member list") + // ErrNoSuitableServer returns an error in case no suitable server to send the request is found. + ErrNoSuitableServer = errors.New("no suitable server found to send the request, aborting") + runningExecutions sync.Map ) @@ -101,6 +106,8 @@ type Agent struct { peers map[string][]*ServerParts localPeers map[raft.ServerAddress]*ServerParts peerLock sync.RWMutex + + activeExecutions sync.Map } // ProcessorFactory is a function type that creates a new instance @@ -159,8 +166,31 @@ func (a *Agent) Start() error { // Expose the node name expNode.Set(a.config.NodeName) + //Use the value of "RPCPort" if AdvertiseRPCPort has not been set + if a.config.AdvertiseRPCPort <= 0 { + a.config.AdvertiseRPCPort = a.config.RPCPort + } + if a.config.Server { a.StartServer() + } else { + // Create a listener at the desired port. + addr := a.getRPCAddr() + l, err := net.Listen("tcp", addr) + if err != nil { + log.Fatal(err) + } + + opts := []grpc.ServerOption{} + if a.TLSConfig != nil { + tc := credentials.NewTLS(a.TLSConfig) + opts = append(opts, grpc.Creds(tc)) + } + + grpcServer := grpc.NewServer(opts...) + as := NewAgentServer(a) + proto.RegisterAgentServer(grpcServer, as) + go grpcServer.Serve(l) } if a.GRPCClient == nil { @@ -168,10 +198,8 @@ func (a *Agent) Start() error { } tags := a.serf.LocalMember().Tags - if a.config.Server { - tags["rpc_addr"] = a.getRPCAddr() // Address that clients will use to RPC to servers - tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort) - } + tags["rpc_addr"] = a.getRPCAddr() // Address that clients will use to RPC to servers + tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort) a.serf.SetTags(tags) go a.eventLoop() @@ -447,11 +475,6 @@ func (a *Agent) SetConfig(c *Config) { // StartServer launch a new dkron server process func (a *Agent) StartServer() { - //Use the value of "RPCPort" if AdvertiseRPCPort has not been set - if a.config.AdvertiseRPCPort <= 0 { - a.config.AdvertiseRPCPort = a.config.RPCPort - } - if a.Store == nil { s, err := NewStore() if err != nil { @@ -634,62 +657,6 @@ func (a *Agent) eventLoop() { } } - if e.EventType() == serf.EventQuery { - query := e.(*serf.Query) - - if query.Name == QueryRunJob { - log.WithFields(logrus.Fields{ - "query": query.Name, - "payload": string(query.Payload), - "at": query.LTime, - }).Debug("agent: Running job") - - var rqp RunQueryParam - if err := json.Unmarshal(query.Payload, &rqp); err != nil { - log.WithField("query", QueryRunJob).Fatal("agent: Error unmarshaling query payload") - } - - log.WithFields(logrus.Fields{ - "job": rqp.Execution.JobName, - }).Info("agent: Starting job") - - // There are two error types to handle here: - // Key not found when the job is removed from store - // Dial tcp error - // In case of deleted job or other error, we should report and break the flow. - // On dial error we should retry with a limit. - i := 0 - RetryGetJob: - job, err := a.GRPCClient.GetJob(rqp.RPCAddr, rqp.Execution.JobName) - if err != nil { - if err == ErrRPCDialing { - if i < 10 { - i++ - goto RetryGetJob - } - log.WithError(err).Fatal("agent: A working RPC connection to a Dkron server must exists.") - } - log.WithError(err).Error("agent: Error on rpc.GetJob call") - continue - } - log.WithField("job", job.Name).Debug("agent: GetJob by RPC") - - ex := rqp.Execution - ex.StartedAt = time.Now() - ex.NodeName = a.config.NodeName - - go func() { - if err := a.invokeJob(job, ex); err != nil { - log.WithError(err).Error("agent: Error invoking job") - } - }() - - // Respond with the execution JSON though it is not used in the destination - exJSON, _ := json.Marshal(ex) - query.Respond(exJSON) - } - } - case <-serfShutdownCh: log.Warn("agent: Serf shutdown detected, quitting") return @@ -710,9 +677,9 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) { return } -func (a *Agent) processFilteredNodes(job *Job) ([]string, map[string]string, error) { - var nodes []string - var candidates []string +func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]string, error) { + // candidates will contain a set of candidates by tags + // the final set of nodes will be the intesection of all groups tags := make(map[string]string) // Actually copy the map @@ -724,42 +691,73 @@ func (a *Agent) processFilteredNodes(job *Job) ([]string, map[string]string, err // on the same region. tags["region"] = a.config.Region + candidates := [][]string{} for jtk, jtv := range tags { - var tc []string - if tc = strings.Split(jtv, ":"); len(tc) == 2 { - tv := tc[0] + cans := []string{} + tc := strings.Split(jtv, ":") - // Set original tag to clean tag - tags[jtk] = tv + tv := tc[0] - count, err := strconv.Atoi(tc[1]) - if err != nil { - return nil, nil, err - } + // Set original tag to clean tag + tags[jtk] = tv - for _, member := range a.serf.Members() { - if member.Status == serf.StatusAlive { - for mtk, mtv := range member.Tags { - if mtk == jtk && mtv == tv { - candidates = append(candidates, member.Name) - } + for _, member := range a.serf.Members() { + if member.Status == serf.StatusAlive { + for mtk, mtv := range member.Tags { + if mtk == jtk && mtv == tv { + cans = append(cans, member.Name) } } } + } + + // In case there is cardinality in the tag, randomize the order and select the amount of nodes + // or else just add all nodes to the result. + if len(tc) == 2 { + f := []string{} rand.Seed(time.Now().UnixNano()) - rand.Shuffle(len(candidates), func(i, j int) { - candidates[i], candidates[j] = candidates[j], candidates[i] + rand.Shuffle(len(cans), func(i, j int) { + cans[i], cans[j] = cans[j], cans[i] }) + count, err := strconv.Atoi(tc[1]) + if err != nil { + return nil, nil, err + } for i := 1; i <= count; i++ { - if len(candidates) == 0 { + if len(cans) == 0 { break } - nodes = append(nodes, candidates[0]) - candidates = candidates[1:] + f = append(f, cans[0]) + cans = cans[1:] } - candidates = nil + cans = f + } + if len(cans) > 0 { + candidates = append(candidates, cans) + } + } + + // The final result will be the intersection of all candidates. + nodes := make(map[string]string) + r := candidates[0] + for i := 1; i <= len(candidates)-1; i++ { + isec := intersect.Simple(r, candidates[i]).([]interface{}) + // Empty the slice + r = []string{} + + // Refill with the intersection + for _, v := range isec { + r = append(r, v.(string)) + } + } + for _, n := range r { + for _, m := range a.serf.Members() { + if n == m.Name { + // If the server is missing the rpc_addr tag, default to the serf advertise addr + nodes[n] = m.Tags["rpc_addr"] + } } } @@ -844,3 +842,22 @@ func (a *Agent) recursiveSetJob(jobs []*Job) []string { } return result } + +// Check if the server is alive and select it +func (a *Agent) checkAndSelectServer() (string, error) { + var peers []string + for _, p := range a.LocalServers() { + peers = append(peers, p.RPCAddr.String()) + } + + for _, peer := range peers { + log.Debugf("Checking peer: %v", peer) + conn, err := net.DialTimeout("tcp", peer, 1*time.Second) + if err == nil { + conn.Close() + log.Debugf("Found good peer: %v", peer) + return peer, nil + } + } + return "", ErrNoSuitableServer +} diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 0d74a4bf7..baf2e06c1 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -146,6 +146,27 @@ func Test_processFilteredNodes(t *testing.T) { a2 := NewAgent(c) a2.Start() + // Start another agent + ip3, returnFn3 := testutil.TakeIP() + defer returnFn3() + a3Addr := ip3.String() + + c = DefaultConfig() + c.BindAddr = a3Addr + c.StartJoin = []string{a1Addr + ":8946"} + c.NodeName = "test3" + c.Server = false + c.LogLevel = logLevel + c.Tags = map[string]string{ + "tag": "test_client", + "extra": "tag", + } + c.DevMode = true + c.DataDir = dir + + a3 := NewAgent(c) + a3.Start() + time.Sleep(2 * time.Second) job := &Job{ @@ -157,16 +178,53 @@ func Test_processFilteredNodes(t *testing.T) { } nodes, tags, err := a1.processFilteredNodes(job) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.Contains(t, nodes, "test1") assert.Contains(t, nodes, "test2") + assert.Len(t, nodes, 2) assert.Equal(t, tags["tag"], "test") + job2 := &Job{ + Name: "test_job_2", + Tags: map[string]string{ + "tag": "test:1", + }, + } + + nodes, _, err = a1.processFilteredNodes(job2) + require.NoError(t, err) + + assert.Len(t, nodes, 1) + + job3 := &Job{ + Name: "test_job_3", + } + + nodes, _, err = a1.processFilteredNodes(job3) + require.NoError(t, err) + + assert.Len(t, nodes, 3) + assert.Contains(t, nodes, "test1") + assert.Contains(t, nodes, "test2") + assert.Contains(t, nodes, "test3") + + job4 := &Job{ + Name: "test_job_4", + Tags: map[string]string{ + "tag": "test_client:1", + }, + } + + nodes, _, err = a1.processFilteredNodes(job4) + require.NoError(t, err) + + assert.Len(t, nodes, 1) + assert.Contains(t, nodes, "test3") + a1.Stop() a2.Stop() + a3.Stop() } func TestEncrypt(t *testing.T) { diff --git a/dkron/grpc.go b/dkron/grpc.go index 8f1da930a..e52be4719 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "io" "net" "sync" "time" @@ -57,6 +56,9 @@ func NewGRPCServer(agent *Agent) DkronGRPCServer { func (grpcs *GRPCServer) Serve(lis net.Listener) error { grpcServer := grpc.NewServer() proto.RegisterDkronServer(grpcServer, grpcs) + + as := NewAgentServer(grpcs.agent) + proto.RegisterAgentServer(grpcServer, as) go grpcServer.Serve(lis) return nil @@ -212,7 +214,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E "execution": execution, }).Debug("grpc: Retrying execution") - if _, err := grpcs.agent.RunQuery(job.Name, execution); err != nil { + if _, err := grpcs.agent.Run(job.Name, execution); err != nil { return nil, err } return &proto.ExecutionDoneResponse{ @@ -260,7 +262,7 @@ func (grpcs *GRPCServer) Leave(ctx context.Context, in *empty.Empty) (*empty.Emp // RunJob runs a job in the cluster func (grpcs *GRPCServer) RunJob(ctx context.Context, req *proto.RunJobRequest) (*proto.RunJobResponse, error) { ex := NewExecution(req.JobName) - job, err := grpcs.agent.RunQuery(req.JobName, ex) + job, err := grpcs.agent.Run(req.JobName, ex) if err != nil { return nil, err } @@ -362,65 +364,12 @@ REMOVE: return new(empty.Empty), nil } -// AgentRun is called when an agent starts running a job and lasts all execution, -// the agent will stream execution progress to the server. -func (grpcs *GRPCServer) AgentRun(stream proto.Dkron_AgentRunServer) error { - defer metrics.MeasureSince([]string{"grpc", "agent_run"}, time.Now()) - - var execution *Execution - var first bool - for { - ars, err := stream.Recv() - - // Stream ends - if err == io.EOF { - addr := grpcs.agent.raft.Leader() - if err := grpcs.agent.GRPCClient.ExecutionDone(string(addr), execution); err != nil { - return err - } - return stream.SendAndClose(&proto.AgentRunResponse{ - From: grpcs.agent.Config().NodeName, - }) - } - - // Error receiving from stream - if err != nil { - // At this point the execution status will be unknown, set the FinshedAt time and an explanatory message - execution.FinishedAt = time.Now() - execution.Output = ErrBrokenStream.Error() - - log.WithError(err).Error(ErrBrokenStream) - - addr := grpcs.agent.raft.Leader() - if err := grpcs.agent.GRPCClient.ExecutionDone(string(addr), execution); err != nil { - return err - } - return err - } - - // Registers an active stream - grpcs.activeExecutions.Store(ars.Execution.Key(), ars.Execution) - log.WithField("key", ars.Execution.Key()).Debug("grpc: received execution stream") - - execution = NewExecutionFromProto(ars.Execution) - defer grpcs.activeExecutions.Delete(execution.Key()) - - // Store the received exeuction in the raft log and store - if !first { - if err := grpcs.agent.GRPCClient.SetExecution(ars.Execution); err != nil { - return err - } - first = true - } - } -} - // GetActiveExecutions returns the active executions on the server node func (grpcs *GRPCServer) GetActiveExecutions(ctx context.Context, in *empty.Empty) (*proto.GetActiveExecutionsResponse, error) { defer metrics.MeasureSince([]string{"grpc", "agent_run"}, time.Now()) var executions []*proto.Execution - grpcs.activeExecutions.Range(func(k, v interface{}) bool { + grpcs.agent.activeExecutions.Range(func(k, v interface{}) bool { e := v.(*proto.Execution) executions = append(executions, e) return true diff --git a/dkron/grpc_agent.go b/dkron/grpc_agent.go new file mode 100644 index 000000000..636967880 --- /dev/null +++ b/dkron/grpc_agent.go @@ -0,0 +1,125 @@ +package dkron + +import ( + "errors" + "time" + + "github.com/armon/circbuf" + metrics "github.com/armon/go-metrics" + "github.com/distribworks/dkron/v2/plugin/types" + "github.com/golang/protobuf/ptypes" +) + +const ( + // maxBufSize limits how much data we collect from a handler. + maxBufSize = 256000 +) + +type statusAgentHelper struct { + execution *types.Execution + stream types.Agent_AgentRunServer +} + +func (s *statusAgentHelper) Update(b []byte, c bool) (int64, error) { + s.execution.Output = b + // Send partial execution + if err := s.stream.Send(&types.AgentRunStream{ + Execution: s.execution, + }); err != nil { + return 0, err + } + return 0, nil +} + +// GRPCAgentServer is the local implementation of the gRPC server interface. +type AgentServer struct { + agent *Agent +} + +// NewServer creates and returns an instance of a DkronGRPCServer implementation +func NewAgentServer(agent *Agent) types.AgentServer { + return &AgentServer{ + agent: agent, + } +} + +// AgentRun is called when an agent starts running a job and lasts all execution, +// the agent will stream execution progress to the server. +func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_AgentRunServer) error { + defer metrics.MeasureSince([]string{"grpc", "agent_run"}, time.Now()) + + job := req.Job + execution := req.Execution + + output, _ := circbuf.NewBuffer(maxBufSize) + + var success bool + + jex := job.Executor + exc := job.ExecutorConfig + if jex == "" { + return errors.New("invoke: No executor defined, nothing to do") + } + + // Send the first update with the initial execution state to be stored in the server + execution.StartedAt = ptypes.TimestampNow() + execution.NodeName = as.agent.config.NodeName + + if err := stream.Send(&types.AgentRunStream{ + Execution: execution, + }); err != nil { + return err + } + + // Check if executor exists + if executor, ok := as.agent.ExecutorPlugins[jex]; ok { + log.WithField("plugin", jex).Debug("invoke: calling executor plugin") + runningExecutions.Store(execution.GetGroup(), execution) + out, err := executor.Execute(&types.ExecuteRequest{ + JobName: job.Name, + Config: exc, + }, &statusAgentHelper{ + stream: stream, + execution: execution, + }) + + if err == nil && out.Error != "" { + err = errors.New(out.Error) + } + if err != nil { + log.WithError(err).WithField("job", job.Name).WithField("plugin", executor).Error("invoke: command error output") + success = false + output.Write([]byte(err.Error() + "\n")) + } else { + success = true + } + + if out != nil { + output.Write(out.Output) + } + } else { + log.WithField("executor", jex).Error("invoke: Specified executor is not present") + output.Write([]byte("invoke: Specified executor is not present")) + } + + execution.FinishedAt = ptypes.TimestampNow() + execution.Success = success + execution.Output = output.Bytes() + + runningExecutions.Delete(execution.GetGroup()) + + // Send the final execution + if err := stream.Send(&types.AgentRunStream{ + Execution: execution, + }); err != nil { + // In case of error means that maybe the server is gone so fallback to ExecutionDone + log.WithError(err).WithField("job", job.Name).Error("invoke: error sending the final execution, falling back to ExecutionDone") + rpcServer, err := as.agent.checkAndSelectServer() + if err != nil { + return err + } + return as.agent.GRPCClient.ExecutionDone(rpcServer, NewExecutionFromProto(execution)) + } + + return nil +} diff --git a/dkron/grpc_client.go b/dkron/grpc_client.go index 1a6da4423..9bf03995c 100644 --- a/dkron/grpc_client.go +++ b/dkron/grpc_client.go @@ -2,10 +2,12 @@ package dkron import ( "fmt" + "io" "time" metrics "github.com/armon/go-metrics" proto "github.com/distribworks/dkron/v2/plugin/types" + "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/empty" "github.com/sirupsen/logrus" "golang.org/x/net/context" @@ -26,6 +28,7 @@ type DkronGRPCClient interface { RaftRemovePeerByID(string, string) error GetActiveExecutions(string) ([]*proto.Execution, error) SetExecution(execution *proto.Execution) error + AgentRun(addr string, job *proto.Job, execution *proto.Execution) error } // GRPCClient is the local implementation of the DkronGRPCClient interface. @@ -378,3 +381,74 @@ func (grpcc *GRPCClient) SetExecution(execution *proto.Execution) error { } return nil } + +// AgentRun runs a job in the given agent +func (grpcc *GRPCClient) AgentRun(addr string, job *proto.Job, execution *proto.Execution) error { + defer metrics.MeasureSince([]string{"grpc_client", "agent_run"}, time.Now()) + var conn *grpc.ClientConn + + // Initiate a connection with the server + conn, err := grpcc.Connect(string(addr)) + if err != nil { + log.WithError(err).WithFields(logrus.Fields{ + "method": "AgentRun", + "server_addr": addr, + }).Error("grpc: error dialing.") + return err + } + defer conn.Close() + + // Streaming call + a := proto.NewAgentClient(conn) + stream, err := a.AgentRun(context.Background(), &proto.AgentRunRequest{ + Job: job, + Execution: execution, + }) + if err != nil { + return err + } + + var first bool + for { + ars, err := stream.Recv() + + // Stream ends + if err == io.EOF { + addr := grpcc.agent.raft.Leader() + if err := grpcc.ExecutionDone(string(addr), NewExecutionFromProto(execution)); err != nil { + return err + } + return nil + } + + // Error receiving from stream + if err != nil { + // At this point the execution status will be unknown, set the FinshedAt time and an explanatory message + execution.FinishedAt = ptypes.TimestampNow() + execution.Output = []byte(ErrBrokenStream.Error()) + + log.WithError(err).Error(ErrBrokenStream) + + addr := grpcc.agent.raft.Leader() + if err := grpcc.ExecutionDone(string(addr), NewExecutionFromProto(execution)); err != nil { + return err + } + return err + } + + // Registers an active stream + grpcc.agent.activeExecutions.Store(ars.Execution.Key(), ars.Execution) + log.WithField("key", ars.Execution.Key()).Debug("grpc: received execution stream") + + execution = ars.Execution + defer grpcc.agent.activeExecutions.Delete(execution.Key()) + + // Store the received execution in the raft log and store + if !first { + if err := grpcc.SetExecution(ars.Execution); err != nil { + return err + } + first = true + } + } +} diff --git a/dkron/invoke.go b/dkron/invoke.go deleted file mode 100644 index 9da4ccbf2..000000000 --- a/dkron/invoke.go +++ /dev/null @@ -1,156 +0,0 @@ -package dkron - -import ( - "context" - "errors" - "net" - "time" - - "github.com/armon/circbuf" - "github.com/distribworks/dkron/v2/plugin/types" - "github.com/sirupsen/logrus" -) - -const ( - // maxBufSize limits how much data we collect from a handler. - maxBufSize = 256000 -) - -// ErrNoSuitableServer returns an error in case no suitable server to send the request is found. -var ErrNoSuitableServer = errors.New("no suitable server found to send the request, aborting") - -type statusHelper struct { - execution *Execution - stream types.Dkron_AgentRunClient -} - -func (s *statusHelper) Update(b []byte, c bool) (int64, error) { - s.execution.Output = string(b) - // Send partial execution - if err := s.stream.Send(&types.AgentRunStream{ - Execution: s.execution.ToProto(), - }); err != nil { - return 0, err - } - return 0, nil -} - -// invokeJob will execute the given job. Depending on the event. -func (a *Agent) invokeJob(job *Job, execution *Execution) error { - output, _ := circbuf.NewBuffer(maxBufSize) - - var success bool - - jex := job.Executor - exc := job.ExecutorConfig - if jex == "" { - return errors.New("invoke: No executor defined, nothing to do") - } - - // Connect to a server to stream the execution - rpcServer, err := a.checkAndSelectServer() - if err != nil { - return err - } - log.WithField("server", rpcServer).Debug("invoke: Selected a server to send result") - - conn, err := a.GRPCClient.Connect(rpcServer) - if err != nil { - log.WithError(err).WithFields(logrus.Fields{ - "method": "Invoke", - "server_addr": rpcServer, - }).Error("grpc: error dialing.") - return err - } - defer conn.Close() - client := types.NewDkronClient(conn) - - stream, err := client.AgentRun(context.Background()) - if err != nil { - return err - } - - // Send the first update with the initial execution state to be stored in the server - if err := stream.Send(&types.AgentRunStream{ - Execution: execution.ToProto(), - }); err != nil { - return err - } - - // Check if executor exists - if executor, ok := a.ExecutorPlugins[jex]; ok { - log.WithField("plugin", jex).Debug("invoke: calling executor plugin") - runningExecutions.Store(execution.GetGroup(), execution) - out, err := executor.Execute(&types.ExecuteRequest{ - JobName: job.Name, - Config: exc, - }, &statusHelper{ - stream: stream, - execution: execution, - }) - - if err == nil && out.Error != "" { - err = errors.New(out.Error) - } - if err != nil { - log.WithError(err).WithField("job", job.Name).WithField("plugin", executor).Error("invoke: command error output") - success = false - output.Write([]byte(err.Error() + "\n")) - } else { - success = true - } - - if out != nil { - output.Write(out.Output) - } - } else { - log.WithField("executor", jex).Error("invoke: Specified executor is not present") - } - - execution.FinishedAt = time.Now() - execution.Success = success - execution.Output = output.String() - - runningExecutions.Delete(execution.GetGroup()) - - // Send the final execution - if err := stream.Send(&types.AgentRunStream{ - Execution: execution.ToProto(), - }); err != nil { - // In case of error means that maybe the server is gone so fallback to ExecutionDone - log.WithError(err).WithField("job", job.Name).Error("invoke: error sending the final execution, falling back to ExecutionDone") - return a.GRPCClient.ExecutionDone(rpcServer, execution) - } - - // Close the stream - reply, err := stream.CloseAndRecv() - if err != nil { - // In case of error means that maybe the server is gone so fallback to ExecutionDone - log.WithError(err).WithField("job", job.Name).Error("invoke: error closing the stream, falling back to ExecutionDone") - return a.GRPCClient.ExecutionDone(rpcServer, execution) - } - - // TODO add better logging - log.WithField("from", reply.From).Debug("agent: AgentRun reply") - - return nil -} - -// Check if the server is alive and select it -func (a *Agent) checkAndSelectServer() (string, error) { - var peers []string - for _, p := range a.LocalServers() { - peers = append(peers, p.RPCAddr.String()) - } - - for _, peer := range peers { - log.Debugf("Checking peer: %v", peer) - conn, err := net.DialTimeout("tcp", peer, 1*time.Second) - if err == nil { - conn.Close() - log.Debugf("Found good peer: %v", peer) - return peer, nil - } - } - return "", ErrNoSuitableServer -} diff --git a/dkron/invoke_test.go b/dkron/invoke_test.go deleted file mode 100644 index 115a0b6dd..000000000 --- a/dkron/invoke_test.go +++ /dev/null @@ -1 +0,0 @@ -package dkron diff --git a/dkron/job.go b/dkron/job.go index 9ce79e54e..d70159ff3 100644 --- a/dkron/job.go +++ b/dkron/job.go @@ -96,9 +96,6 @@ type Job struct { // Number of times to retry a job that failed an execution. Retries uint `json:"retries"` - // running indicates that the Run method is still broadcasting - running bool - // Jobs that are dependent upon this one will be run after this job runs. DependentJobs []string `json:"dependent_jobs"` @@ -229,9 +226,6 @@ func (j *Job) Run() { // Check if it's runnable if j.isRunnable() { - j.running = true - defer func() { j.running = false }() - log.WithFields(logrus.Fields{ "job": j.Name, "schedule": j.Schedule, @@ -241,7 +235,8 @@ func (j *Job) Run() { // Simple execution wrapper ex := NewExecution(j.Name) - if _, err := j.Agent.RunQuery(j.Name, ex); err != nil { + + if _, err := j.Agent.Run(j.Name, ex); err != nil { log.WithError(err).Error("job: Error sending Run query to serf cluster") } } @@ -317,12 +312,6 @@ func (j *Job) isRunnable() bool { } } - if j.running { - log.WithField("job", j.Name). - Warning("job: Skipping execution because last execution still broadcasting, consider increasing schedule interval") - return false - } - return true } diff --git a/dkron/job_test.go b/dkron/job_test.go index fb48bec54..14900be4e 100644 --- a/dkron/job_test.go +++ b/dkron/job_test.go @@ -142,15 +142,6 @@ func Test_isRunnable(t *testing.T) { }, want: false, }, - { - name: "running true", - job: &Job{ - Name: "test_job", - Agent: a, - running: true, - }, - want: false, - }, { name: "should run", job: &Job{ @@ -190,6 +181,9 @@ func (gRPCClientMock) GetActiveExecutions(s string) ([]*proto.Execution, error) }, nil } func (gRPCClientMock) SetExecution(execution *proto.Execution) error { return nil } +func (gRPCClientMock) AgentRun(addr string, job *proto.Job, execution *proto.Execution) error { + return nil +} func Test_generateJobTree(t *testing.T) { jsonString := `[ diff --git a/dkron/notifier.go b/dkron/notifier.go index a6ddc8020..b0e743d38 100644 --- a/dkron/notifier.go +++ b/dkron/notifier.go @@ -88,7 +88,7 @@ func (n *Notifier) buildTemplate(templ string) *bytes.Buffer { n.Execution.FinishedAt, fmt.Sprintf("%t", n.Execution.Success), n.Execution.NodeName, - fmt.Sprintf("%s", n.Execution.Output), + n.Execution.Output, } out := &bytes.Buffer{} diff --git a/dkron/queries.go b/dkron/queries.go deleted file mode 100644 index 0f14d5d0e..000000000 --- a/dkron/queries.go +++ /dev/null @@ -1,133 +0,0 @@ -package dkron - -import ( - "bytes" - "encoding/json" - "fmt" - "time" - - "github.com/hashicorp/serf/serf" - "github.com/sirupsen/logrus" -) - -const ( - // QueryRunJob define a run job query string - QueryRunJob = "run:job" - // QueryExecutionDone define the execution done query string - QueryExecutionDone = "execution:done" -) - -// RunQueryParam defines the struct used to send a Run query -// using serf. -type RunQueryParam struct { - Execution *Execution `json:"execution"` - RPCAddr string `json:"rpc_addr"` -} - -// RunQuery sends a serf run query to the cluster, this is used to ask a node or nodes -// to run a Job. Returns a job with it's new status and next schedule. -func (a *Agent) RunQuery(jobName string, ex *Execution) (*Job, error) { - start := time.Now() - var params *serf.QueryParam - - job, err := a.Store.GetJob(jobName, nil) - if err != nil { - return nil, fmt.Errorf("agent: RunQuery error retrieving job: %s from store: %w", jobName, err) - } - - // In case the job is not a child job, compute the next execution time - if job.ParentJob == "" { - if e, ok := a.sched.GetEntry(jobName); ok { - job.Next = e.Next - if err := a.applySetJob(job.ToProto()); err != nil { - return nil, fmt.Errorf("agent: RunQuery error storing job %s before running: %w", jobName, err) - } - } else { - return nil, fmt.Errorf("agent: RunQuery error retrieving job: %s from scheduler", jobName) - } - } - - // In the first execution attempt we build and filter the target nodes - // but we use the existing node target in case of retry. - if ex.Attempt <= 1 { - filterNodes, filterTags, err := a.processFilteredNodes(job) - if err != nil { - return nil, fmt.Errorf("agent: RunQuery error processing filtered nodes: %w", err) - } - log.Debug("agent: Filtered nodes to run: ", filterNodes) - log.Debug("agent: Filtered tags to run: ", filterTags) - - //serf match regexp but we want only match full tag - serfFilterTags := make(map[string]string) - for key, val := range filterTags { - b := new(bytes.Buffer) - b.WriteString("^") - b.WriteString(val) - b.WriteString("$") - serfFilterTags[key] = b.String() - } - - params = &serf.QueryParam{ - FilterNodes: filterNodes, - FilterTags: serfFilterTags, - RequestAck: true, - } - } else { - params = &serf.QueryParam{ - FilterNodes: []string{ex.NodeName}, - RequestAck: true, - } - } - - rqp := &RunQueryParam{ - Execution: ex, - RPCAddr: a.getRPCAddr(), - } - rqpJSON, _ := json.Marshal(rqp) - - log.WithFields(logrus.Fields{ - "query": QueryRunJob, - "job_name": job.Name, - }).Info("agent: Sending query") - - log.WithFields(logrus.Fields{ - "query": QueryRunJob, - "job_name": job.Name, - "json": string(rqpJSON), - }).Debug("agent: Sending query") - - qr, err := a.serf.Query(QueryRunJob, rqpJSON, params) - if err != nil { - return nil, fmt.Errorf("agent: RunQuery sending query error: %w", err) - } - defer qr.Close() - - ackCh := qr.AckCh() - respCh := qr.ResponseCh() - - for !qr.Finished() { - select { - case ack, ok := <-ackCh: - if ok { - log.WithFields(logrus.Fields{ - "query": QueryRunJob, - "from": ack, - }).Debug("agent: Received ack") - } - case resp, ok := <-respCh: - if ok { - log.WithFields(logrus.Fields{ - "query": QueryRunJob, - "from": resp.From, - "response": string(resp.Payload), - }).Debug("agent: Received response") - } - } - } - log.WithFields(logrus.Fields{ - "time": time.Since(start), - "query": QueryRunJob, - }).Debug("agent: Done receiving acks and responses") - - return job, nil -} diff --git a/dkron/queries_test.go b/dkron/queries_test.go deleted file mode 100644 index 96bcd6fbd..000000000 --- a/dkron/queries_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package dkron - -import ( - "errors" - "io/ioutil" - "os" - "testing" - "time" - - "github.com/hashicorp/serf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/tidwall/buntdb" -) - -func TestRunQuery(t *testing.T) { - dir, err := ioutil.TempDir("", "dkron-test") - require.NoError(t, err) - defer os.RemoveAll(dir) - - ip1, returnFn1 := testutil.TakeIP() - defer returnFn1() - advAddr := ip1.String() - - c := DefaultConfig() - c.BindAddr = advAddr + ":5000" - c.NodeName = "test1" - c.Server = true - c.Tags = map[string]string{"role": "test"} - c.LogLevel = logLevel - c.DevMode = true - c.DataDir = dir - c.BootstrapExpect = 1 - - a := NewAgent(c) - err = a.Start() - require.NoError(t, err) - time.Sleep(2 * time.Second) - - // Test error with no job - _, err = a.RunQuery("foo", &Execution{}) - assert.True(t, errors.Is(err, buntdb.ErrNotFound)) - - j1 := &Job{ - Name: "test_job", - Schedule: "@daily", - } - err = a.Store.SetJob(j1, false) - require.NoError(t, err) - - a.sched.Start([]*Job{j1}, a) - - _, err = a.RunQuery("test_job", &Execution{}) - assert.NoError(t, err) - - a.Stop() -} diff --git a/dkron/run.go b/dkron/run.go new file mode 100644 index 000000000..ddd22752e --- /dev/null +++ b/dkron/run.go @@ -0,0 +1,66 @@ +package dkron + +import ( + "fmt" + "sync" + + "github.com/sirupsen/logrus" +) + +// Run call the agents to run a job. Returns a job with it's new status and next schedule. +func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { + job, err := a.Store.GetJob(jobName, nil) + if err != nil { + return nil, fmt.Errorf("agent: Run error retrieving job: %s from store: %w", jobName, err) + } + + // In case the job is not a child job, compute the next execution time + if job.ParentJob == "" { + if e, ok := a.sched.GetEntry(jobName); ok { + job.Next = e.Next + if err := a.applySetJob(job.ToProto()); err != nil { + return nil, fmt.Errorf("agent: Run error storing job %s before running: %w", jobName, err) + } + } else { + return nil, fmt.Errorf("agent: Run error retrieving job: %s from scheduler", jobName) + } + } + + // In the first execution attempt we build and filter the target nodes + // but we use the existing node target in case of retry. + var filterMap map[string]string + if ex.Attempt <= 1 { + filterMap, _, err = a.processFilteredNodes(job) + if err != nil { + return nil, fmt.Errorf("agent: Run error processing filtered nodes: %w", err) + } + } else { + filterMap = map[string]string{ex.NodeName: ""} + } + + log.WithField("nodes", filterMap).Debug("agent: Filtered nodes to run") + + var wg sync.WaitGroup + for _, v := range filterMap { + // Call here client GRPC AgentRun + wg.Add(1) + go func(node string, wg *sync.WaitGroup) { + defer wg.Done() + log.WithFields(logrus.Fields{ + "job_name": job.Name, + "node": node, + }).Info("agent: Calling AgentRun") + + err := a.GRPCClient.AgentRun(node, job.ToProto(), ex.ToProto()) + if err != nil { + log.WithFields(logrus.Fields{ + "job_name": job.Name, + "node": node, + }).Error("agent: Error calling AgentRun") + } + }(v, &wg) + } + + wg.Wait() + return job, nil +} diff --git a/go.mod b/go.mod index d9ad68b8c..d456e69f6 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/hashicorp/serf v0.9.0 github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d // indirect github.com/jordan-wright/email v0.0.0-20180115032944-94ae17dedda2 + github.com/juliangruber/go-intersect v1.0.0 github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 github.com/mattn/go-shellwords v1.0.10 github.com/mitchellh/go-testing-interface v1.0.0 // indirect @@ -40,6 +41,7 @@ require ( golang.org/x/net v0.0.0-20200301022130-244492dfa37a golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect google.golang.org/grpc v1.28.1 + google.golang.org/protobuf v1.21.0 ) go 1.13 diff --git a/go.sum b/go.sum index 2a9942d1d..52a5c6fc9 100644 --- a/go.sum +++ b/go.sum @@ -233,6 +233,8 @@ github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juliangruber/go-intersect v1.0.0 h1:0XNPNaEoPd7PZljVNZLk4qrRkR153Sjk2ZL1426zFQ0= +github.com/juliangruber/go-intersect v1.0.0/go.mod h1:unIef4vysSJvZ6adJAAPiBVKpS4r/IOkmfuFghRFDDM= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= diff --git a/plugin/types/dkron.pb.go b/plugin/types/dkron.pb.go index 0fc56c57b..1927c987f 100644 --- a/plugin/types/dkron.pb.go +++ b/plugin/types/dkron.pb.go @@ -1180,6 +1180,53 @@ func (m *GetActiveExecutionsResponse) GetExecutions() []*Execution { return nil } +type AgentRunRequest struct { + Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` + Execution *Execution `protobuf:"bytes,2,opt,name=execution,proto3" json:"execution,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *AgentRunRequest) Reset() { *m = AgentRunRequest{} } +func (m *AgentRunRequest) String() string { return proto.CompactTextString(m) } +func (*AgentRunRequest) ProtoMessage() {} +func (*AgentRunRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1f0292872e9433f8, []int{21} +} + +func (m *AgentRunRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_AgentRunRequest.Unmarshal(m, b) +} +func (m *AgentRunRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_AgentRunRequest.Marshal(b, m, deterministic) +} +func (m *AgentRunRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_AgentRunRequest.Merge(m, src) +} +func (m *AgentRunRequest) XXX_Size() int { + return xxx_messageInfo_AgentRunRequest.Size(m) +} +func (m *AgentRunRequest) XXX_DiscardUnknown() { + xxx_messageInfo_AgentRunRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_AgentRunRequest proto.InternalMessageInfo + +func (m *AgentRunRequest) GetJob() *Job { + if m != nil { + return m.Job + } + return nil +} + +func (m *AgentRunRequest) GetExecution() *Execution { + if m != nil { + return m.Execution + } + return nil +} + func init() { proto.RegisterType((*Job)(nil), "types.Job") proto.RegisterMapType((map[string]string)(nil), "types.Job.ExecutorConfigEntry") @@ -1208,6 +1255,7 @@ func init() { proto.RegisterType((*AgentRunStream)(nil), "types.AgentRunStream") proto.RegisterType((*AgentRunResponse)(nil), "types.AgentRunResponse") proto.RegisterType((*GetActiveExecutionsResponse)(nil), "types.GetActiveExecutionsResponse") + proto.RegisterType((*AgentRunRequest)(nil), "types.AgentRunRequest") } func init() { @@ -1215,89 +1263,91 @@ func init() { } var fileDescriptor_1f0292872e9433f8 = []byte{ - // 1307 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdb, 0x72, 0xdb, 0x36, - 0x10, 0x1d, 0x5a, 0x96, 0x2c, 0xae, 0x2e, 0x71, 0x60, 0x27, 0x61, 0xe8, 0xb4, 0xd1, 0x28, 0xd3, - 0x19, 0xb5, 0x99, 0x28, 0xad, 0xdb, 0x34, 0xb7, 0x4e, 0x26, 0x6e, 0xec, 0x7a, 0xea, 0x69, 0x53, - 0x97, 0xf2, 0xf4, 0xa5, 0x0f, 0x1a, 0x48, 0x5c, 0xcb, 0x4c, 0x28, 0x42, 0x05, 0x40, 0x37, 0xea, - 0x63, 0xff, 0xa3, 0x0f, 0xfd, 0x88, 0xfe, 0x5f, 0x07, 0x00, 0x41, 0x53, 0xb2, 0xe4, 0x4b, 0xde, - 0xb8, 0x8b, 0x83, 0xc5, 0xee, 0xe2, 0x9c, 0x25, 0xa0, 0x16, 0xbe, 0xe7, 0x2c, 0xe9, 0x4e, 0x38, - 0x93, 0x8c, 0x94, 0xe5, 0x74, 0x82, 0xc2, 0xbf, 0x3f, 0x62, 0x6c, 0x14, 0xe3, 0x63, 0xed, 0x1c, - 0xa4, 0xc7, 0x8f, 0x65, 0x34, 0x46, 0x21, 0xe9, 0x78, 0x62, 0x70, 0xfe, 0xd6, 0x3c, 0x00, 0xc7, - 0x13, 0x39, 0x35, 0x8b, 0xed, 0xff, 0x5c, 0x28, 0x1d, 0xb0, 0x01, 0x21, 0xb0, 0x9a, 0xd0, 0x31, - 0x7a, 0x4e, 0xcb, 0xe9, 0xb8, 0x81, 0xfe, 0x26, 0x3e, 0x54, 0x55, 0xac, 0xbf, 0x58, 0x82, 0xde, - 0x8a, 0xf6, 0xe7, 0xb6, 0x5a, 0x13, 0xc3, 0x13, 0x0c, 0xd3, 0x18, 0xbd, 0x92, 0x59, 0xb3, 0x36, - 0xd9, 0x84, 0x32, 0xfb, 0x33, 0x41, 0xee, 0xad, 0xe9, 0x05, 0x63, 0x90, 0xfb, 0x50, 0xd3, 0x1f, - 0x7d, 0x1c, 0xd3, 0x28, 0xf6, 0xaa, 0x7a, 0x0d, 0xb4, 0x6b, 0x4f, 0x79, 0xc8, 0x03, 0x68, 0x88, - 0x74, 0x38, 0x44, 0x21, 0xfa, 0x43, 0x96, 0x26, 0xd2, 0x73, 0x5b, 0x4e, 0xa7, 0x1c, 0xd4, 0x33, - 0xe7, 0x1b, 0xe5, 0x53, 0x51, 0x90, 0x73, 0xc6, 0x33, 0x08, 0x68, 0x08, 0x68, 0x97, 0x01, 0xf8, - 0x50, 0x0d, 0x23, 0x41, 0x07, 0x31, 0x86, 0x5e, 0xad, 0xe5, 0x74, 0xaa, 0x41, 0x6e, 0x93, 0x0e, - 0xac, 0x4a, 0x3a, 0x12, 0x5e, 0xbd, 0x55, 0xea, 0xd4, 0xb6, 0x37, 0xbb, 0xba, 0x81, 0xdd, 0x03, - 0x36, 0xe8, 0x1e, 0xd1, 0x91, 0xd8, 0x4b, 0x24, 0x9f, 0x06, 0x1a, 0x41, 0x3c, 0x58, 0xe3, 0x28, - 0x79, 0x84, 0xc2, 0x6b, 0xb4, 0x9c, 0x4e, 0x23, 0xb0, 0x26, 0xf9, 0x0c, 0x9a, 0x21, 0x4e, 0x30, - 0x09, 0x31, 0x91, 0xfd, 0x77, 0x6c, 0x20, 0xbc, 0x66, 0xab, 0xd4, 0x71, 0x83, 0x46, 0xee, 0x3d, - 0x60, 0x03, 0x41, 0x3e, 0x01, 0x98, 0x50, 0x9e, 0x61, 0xbc, 0x1b, 0xba, 0x58, 0xd7, 0x78, 0x54, - 0xbb, 0x5b, 0x50, 0x1b, 0xb2, 0x64, 0x98, 0x72, 0x8e, 0xc9, 0x70, 0xea, 0xad, 0xeb, 0xf5, 0xa2, - 0x4b, 0xd5, 0x81, 0x1f, 0x70, 0x98, 0x4a, 0xc6, 0xbd, 0x9b, 0xa6, 0xc1, 0xd6, 0x26, 0xfb, 0x70, - 0xc3, 0x7e, 0xf7, 0x87, 0x2c, 0x39, 0x8e, 0x46, 0x1e, 0xd1, 0x25, 0x7d, 0x5a, 0x28, 0x69, 0x2f, - 0x43, 0xbc, 0xd1, 0x00, 0x53, 0x5c, 0x13, 0x67, 0x9c, 0xe4, 0x36, 0x54, 0x84, 0xa4, 0x32, 0x15, - 0xde, 0x86, 0x3e, 0x22, 0xb3, 0xc8, 0x37, 0x50, 0x1d, 0xa3, 0xa4, 0x21, 0x95, 0xd4, 0xdb, 0xd4, - 0x91, 0xbd, 0x42, 0xe4, 0x9f, 0xb3, 0x25, 0x13, 0x33, 0x47, 0x92, 0x17, 0x50, 0x8f, 0xa9, 0x90, - 0xfd, 0xec, 0xc2, 0xbc, 0xbb, 0x2d, 0xa7, 0x53, 0xdb, 0xbe, 0x53, 0xd8, 0xf9, 0x36, 0x8d, 0x63, - 0x75, 0x15, 0x47, 0xd1, 0x18, 0x83, 0x9a, 0x02, 0xf7, 0x0c, 0x96, 0x7c, 0x0b, 0xa0, 0xf7, 0xea, - 0x9b, 0xf4, 0xfc, 0x8b, 0x77, 0xba, 0x0a, 0xba, 0xa7, 0x90, 0xa4, 0x0b, 0xab, 0x09, 0x7e, 0x90, - 0xde, 0x1d, 0xbd, 0xc3, 0xef, 0x1a, 0xae, 0x77, 0x2d, 0xd7, 0xbb, 0x47, 0x56, 0x0c, 0x81, 0xc6, - 0xa9, 0xc6, 0x87, 0x91, 0x98, 0xc4, 0x74, 0xaa, 0xe9, 0xee, 0x99, 0xc6, 0x17, 0x5c, 0xe4, 0x05, - 0xc0, 0x84, 0x33, 0x95, 0x14, 0xe3, 0xc2, 0xdb, 0xd2, 0xd5, 0xfb, 0x85, 0x4c, 0x0e, 0xf3, 0x45, - 0x53, 0x7f, 0x01, 0xed, 0x3f, 0x05, 0x37, 0x67, 0x12, 0x59, 0x87, 0xd2, 0x7b, 0x9c, 0x66, 0x8a, - 0x52, 0x9f, 0x4a, 0x18, 0xa7, 0x34, 0x4e, 0xad, 0x9a, 0x8c, 0xf1, 0x62, 0xe5, 0x99, 0xe3, 0xef, - 0xc0, 0xc6, 0x82, 0xfb, 0xba, 0x56, 0x88, 0x97, 0xd0, 0x98, 0xb9, 0x98, 0x6b, 0x6d, 0xfe, 0x1d, - 0xea, 0xc5, 0x0e, 0x93, 0x2d, 0x70, 0x4f, 0xa8, 0xe8, 0x1b, 0xb4, 0x63, 0x64, 0x74, 0x42, 0xc5, - 0x6f, 0xca, 0x56, 0x3d, 0x57, 0x73, 0x40, 0x47, 0xb9, 0xa4, 0xe7, 0x0a, 0xe7, 0x07, 0x70, 0x63, - 0xae, 0x69, 0x0b, 0x72, 0xfb, 0xbc, 0x98, 0x5b, 0x6d, 0x7b, 0x23, 0xeb, 0xf8, 0x61, 0x9c, 0x8e, - 0xa2, 0xc4, 0xf4, 0xa4, 0x90, 0x70, 0xfb, 0x6f, 0x07, 0xea, 0xc5, 0x35, 0xf2, 0x14, 0x2a, 0x99, - 0x14, 0x1c, 0x7d, 0x65, 0xf7, 0x17, 0x04, 0xe8, 0x16, 0xb5, 0x90, 0xc1, 0xfd, 0xe7, 0x50, 0xfb, - 0xc8, 0x96, 0xb7, 0x1f, 0x41, 0xa3, 0x87, 0x4a, 0xcf, 0x01, 0xfe, 0x91, 0xa2, 0x90, 0xe4, 0x1e, - 0x94, 0x94, 0xdc, 0x1d, 0x5d, 0x02, 0x9c, 0x91, 0x26, 0x50, 0xee, 0x76, 0x17, 0x9a, 0x16, 0x2e, - 0x26, 0x2c, 0x11, 0x78, 0x09, 0xfe, 0x11, 0xac, 0xef, 0x62, 0x8c, 0x12, 0x0b, 0x27, 0xdc, 0x85, - 0xea, 0x3b, 0x36, 0xe8, 0x17, 0x66, 0xf5, 0xda, 0x3b, 0x36, 0x78, 0x4b, 0xc7, 0xd8, 0xfe, 0x0a, - 0x6e, 0x16, 0xe0, 0x57, 0x3a, 0xe1, 0x0b, 0x68, 0xec, 0xcf, 0x14, 0x70, 0x41, 0xf8, 0x2e, 0x34, - 0xf7, 0xaf, 0x93, 0xfd, 0x3f, 0x2b, 0xe0, 0x1a, 0x4e, 0x47, 0x2c, 0xb9, 0x20, 0xb0, 0x9a, 0xb5, - 0x76, 0x62, 0xac, 0x68, 0xa6, 0x59, 0x53, 0x8d, 0x27, 0x96, 0xca, 0x49, 0x2a, 0xf5, 0x2f, 0xa6, - 0x1e, 0x64, 0x96, 0x62, 0x67, 0xc2, 0x42, 0x34, 0xd1, 0x56, 0xcd, 0x70, 0x54, 0x0e, 0x1d, 0x6e, - 0x13, 0xca, 0x23, 0xce, 0xd2, 0x89, 0x57, 0x6e, 0x39, 0x9d, 0x52, 0x60, 0x0c, 0x75, 0x08, 0x95, - 0x52, 0xfd, 0xf9, 0xbc, 0x8a, 0x19, 0xe8, 0x99, 0x49, 0x9e, 0x03, 0x08, 0x49, 0xb9, 0xc4, 0xb0, - 0x4f, 0xa5, 0xfe, 0x65, 0x5d, 0xcc, 0x69, 0x37, 0x43, 0xef, 0x48, 0xf2, 0x12, 0x6a, 0xc7, 0x51, - 0x12, 0x89, 0x13, 0xb3, 0xb7, 0x7a, 0xe9, 0x5e, 0xb0, 0xf0, 0x1d, 0xd9, 0xfe, 0x01, 0x36, 0xf3, - 0xf6, 0xec, 0xb2, 0x04, 0xed, 0x15, 0x74, 0xc1, 0x45, 0xeb, 0xcf, 0x7a, 0xbb, 0x9e, 0xf5, 0x36, - 0xc7, 0x07, 0x67, 0x90, 0xf6, 0x1e, 0xdc, 0x9a, 0x8b, 0x93, 0x5d, 0x0f, 0x81, 0xd5, 0x63, 0xce, - 0xc6, 0xf6, 0x97, 0xae, 0xbe, 0x55, 0x1b, 0x26, 0x74, 0x1a, 0x33, 0x1a, 0xea, 0x5e, 0xd7, 0x03, - 0x6b, 0x2a, 0x2a, 0x04, 0x69, 0x72, 0x65, 0x2a, 0x58, 0xec, 0x55, 0x89, 0x7c, 0xc4, 0x46, 0xa3, - 0xf8, 0xea, 0x44, 0x2e, 0xc0, 0xaf, 0x46, 0x36, 0x07, 0x20, 0xa0, 0xc7, 0xb2, 0x87, 0xfc, 0x14, - 0x39, 0x69, 0xc2, 0x4a, 0x14, 0x66, 0x61, 0x57, 0xa2, 0x50, 0xbf, 0x6e, 0x58, 0x68, 0x15, 0xac, - 0xbf, 0x35, 0x23, 0xc2, 0x90, 0x2b, 0xda, 0x99, 0x07, 0x8c, 0x35, 0x15, 0xed, 0x62, 0xa4, 0x21, - 0x72, 0xcd, 0xad, 0x6a, 0x90, 0x59, 0x7a, 0x10, 0x30, 0x89, 0x5c, 0x33, 0xab, 0x1a, 0x18, 0x43, - 0x3d, 0x5b, 0x38, 0x3d, 0x96, 0x7d, 0x7d, 0xdd, 0x43, 0x16, 0x6b, 0x7e, 0xb9, 0x41, 0x5d, 0x39, - 0x0f, 0x33, 0x5f, 0x9b, 0xc2, 0x3d, 0x95, 0xde, 0x3e, 0x4a, 0x33, 0x6b, 0x52, 0x4e, 0xf5, 0x3d, - 0xda, 0xea, 0x1e, 0xc2, 0x9a, 0xd0, 0xa9, 0x8b, 0x6c, 0x7c, 0xdd, 0xcc, 0x2a, 0x3c, 0x2b, 0x2a, - 0xb0, 0x08, 0x95, 0x47, 0x94, 0x84, 0xf8, 0x41, 0x97, 0xb3, 0x1a, 0x18, 0xa3, 0xfd, 0x10, 0xee, - 0x2a, 0x70, 0x80, 0x63, 0x76, 0x8a, 0x87, 0x88, 0xfc, 0xfb, 0xe9, 0x8f, 0xbb, 0xb6, 0xdb, 0x73, - 0x0d, 0x69, 0xbf, 0x86, 0xe6, 0xce, 0x08, 0x13, 0x19, 0xa4, 0x49, 0x4f, 0x72, 0xa4, 0xe3, 0x6b, - 0xd3, 0xee, 0x35, 0xac, 0xdb, 0x08, 0x1f, 0xc9, 0xb8, 0x5f, 0x60, 0x6b, 0x1f, 0xe5, 0xce, 0x50, - 0x46, 0xa7, 0x98, 0x1f, 0x21, 0xf2, 0x60, 0x5f, 0x02, 0xe4, 0xa7, 0xd9, 0xae, 0x9c, 0xcf, 0xa8, - 0x80, 0xd9, 0xfe, 0xb7, 0x02, 0xe5, 0x5d, 0xf5, 0x40, 0x26, 0x4f, 0xa0, 0x62, 0x66, 0x15, 0xb1, - 0x8f, 0xbc, 0x99, 0x31, 0xe7, 0xdf, 0x9a, 0xf3, 0x66, 0x47, 0x1e, 0x40, 0x63, 0x46, 0x4a, 0x64, - 0x6b, 0xfe, 0xbc, 0x82, 0x50, 0xfd, 0x7b, 0x8b, 0x17, 0xb3, 0x58, 0x4f, 0xa1, 0xfc, 0x13, 0xd2, - 0x53, 0x24, 0xb7, 0xcf, 0xcd, 0x83, 0x3d, 0xf5, 0xfe, 0xf6, 0x97, 0xf8, 0x55, 0xee, 0xbd, 0xd9, - 0xdc, 0x7b, 0x0b, 0x73, 0x9f, 0xfb, 0x95, 0xbc, 0x02, 0x37, 0x9f, 0xfe, 0xc4, 0xbe, 0x9c, 0xe6, - 0x7f, 0x1f, 0xbe, 0x77, 0x7e, 0x21, 0xdb, 0xff, 0x04, 0x2a, 0x46, 0xd3, 0xf9, 0xb1, 0x33, 0xe3, - 0x20, 0x3f, 0x76, 0x4e, 0xf8, 0xaf, 0xc0, 0xcd, 0xb5, 0x9a, 0x1f, 0x3b, 0x2f, 0xf6, 0xfc, 0xd8, - 0xf3, 0xb2, 0xee, 0xc1, 0xe6, 0x22, 0x61, 0x2c, 0xed, 0xda, 0x83, 0x82, 0x2e, 0x96, 0xaa, 0xe9, - 0x2d, 0x90, 0xf3, 0x52, 0x20, 0xad, 0xc2, 0xd6, 0x85, 0x2a, 0x59, 0x7a, 0x25, 0xdf, 0x41, 0xd5, - 0x72, 0x9d, 0xd8, 0x3e, 0xcc, 0xca, 0xc7, 0xbf, 0x33, 0xe7, 0xb6, 0xb9, 0x74, 0x1c, 0xf2, 0x2b, - 0x6c, 0x2c, 0xe0, 0xf9, 0xd2, 0x0a, 0xdb, 0x67, 0xdc, 0x5c, 0xaa, 0x8d, 0x67, 0x50, 0xef, 0xa1, - 0x3c, 0xfb, 0xbb, 0x9e, 0xd3, 0xc5, 0xb2, 0x52, 0x06, 0x15, 0x6d, 0x7f, 0xfd, 0x7f, 0x00, 0x00, - 0x00, 0xff, 0xff, 0x8a, 0xab, 0x22, 0xc1, 0x4a, 0x0e, 0x00, 0x00, + // 1330 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdb, 0x6e, 0x1b, 0x37, + 0x13, 0xc6, 0x4a, 0x96, 0xad, 0x1d, 0x1d, 0xec, 0xd0, 0x4e, 0xb2, 0x59, 0xe7, 0xff, 0x23, 0x28, + 0xf8, 0x01, 0xfd, 0x0d, 0xa2, 0xa4, 0x6e, 0xd3, 0x9c, 0x80, 0x20, 0x6e, 0xec, 0x1a, 0x35, 0xda, + 0xd4, 0x5d, 0x19, 0xbd, 0xe9, 0x85, 0x40, 0x69, 0xc7, 0xf2, 0x26, 0xab, 0xa5, 0x4a, 0x72, 0xdd, + 0xa8, 0x97, 0x7d, 0x8f, 0x3e, 0x46, 0x9f, 0xa8, 0x2f, 0x52, 0x90, 0x5c, 0xae, 0x57, 0xb2, 0xe4, + 0x43, 0xee, 0x38, 0xc3, 0x8f, 0xc3, 0x39, 0x7c, 0x33, 0x24, 0xd4, 0xc2, 0x8f, 0x9c, 0x25, 0xdd, + 0x09, 0x67, 0x92, 0x91, 0x8a, 0x9c, 0x4e, 0x50, 0xf8, 0x0f, 0x46, 0x8c, 0x8d, 0x62, 0x7c, 0xa2, + 0x95, 0x83, 0xf4, 0xe4, 0x89, 0x8c, 0xc6, 0x28, 0x24, 0x1d, 0x4f, 0x0c, 0xce, 0xdf, 0x9e, 0x07, + 0xe0, 0x78, 0x22, 0xa7, 0x66, 0xb3, 0xfd, 0xb7, 0x0b, 0xe5, 0x43, 0x36, 0x20, 0x04, 0x56, 0x12, + 0x3a, 0x46, 0xcf, 0x69, 0x39, 0x1d, 0x37, 0xd0, 0x6b, 0xe2, 0x43, 0x55, 0xd9, 0xfa, 0x83, 0x25, + 0xe8, 0x95, 0xb4, 0x3e, 0x97, 0xd5, 0x9e, 0x18, 0x9e, 0x62, 0x98, 0xc6, 0xe8, 0x95, 0xcd, 0x9e, + 0x95, 0xc9, 0x16, 0x54, 0xd8, 0xef, 0x09, 0x72, 0x6f, 0x4d, 0x6f, 0x18, 0x81, 0x3c, 0x80, 0x9a, + 0x5e, 0xf4, 0x71, 0x4c, 0xa3, 0xd8, 0xab, 0xea, 0x3d, 0xd0, 0xaa, 0x7d, 0xa5, 0x21, 0x0f, 0xa1, + 0x21, 0xd2, 0xe1, 0x10, 0x85, 0xe8, 0x0f, 0x59, 0x9a, 0x48, 0xcf, 0x6d, 0x39, 0x9d, 0x4a, 0x50, + 0xcf, 0x94, 0xef, 0x94, 0x4e, 0x59, 0x41, 0xce, 0x19, 0xcf, 0x20, 0xa0, 0x21, 0xa0, 0x55, 0x06, + 0xe0, 0x43, 0x35, 0x8c, 0x04, 0x1d, 0xc4, 0x18, 0x7a, 0xb5, 0x96, 0xd3, 0xa9, 0x06, 0xb9, 0x4c, + 0x3a, 0xb0, 0x22, 0xe9, 0x48, 0x78, 0xf5, 0x56, 0xb9, 0x53, 0xdb, 0xd9, 0xea, 0xea, 0x04, 0x76, + 0x0f, 0xd9, 0xa0, 0x7b, 0x4c, 0x47, 0x62, 0x3f, 0x91, 0x7c, 0x1a, 0x68, 0x04, 0xf1, 0x60, 0x8d, + 0xa3, 0xe4, 0x11, 0x0a, 0xaf, 0xd1, 0x72, 0x3a, 0x8d, 0xc0, 0x8a, 0xe4, 0x7f, 0xd0, 0x0c, 0x71, + 0x82, 0x49, 0x88, 0x89, 0xec, 0x7f, 0x60, 0x03, 0xe1, 0x35, 0x5b, 0xe5, 0x8e, 0x1b, 0x34, 0x72, + 0xed, 0x21, 0x1b, 0x08, 0xf2, 0x1f, 0x80, 0x09, 0xe5, 0x19, 0xc6, 0x5b, 0xd7, 0xc1, 0xba, 0x46, + 0xa3, 0xd2, 0xdd, 0x82, 0xda, 0x90, 0x25, 0xc3, 0x94, 0x73, 0x4c, 0x86, 0x53, 0x6f, 0x43, 0xef, + 0x17, 0x55, 0x2a, 0x0e, 0xfc, 0x84, 0xc3, 0x54, 0x32, 0xee, 0xdd, 0x32, 0x09, 0xb6, 0x32, 0x39, + 0x80, 0x75, 0xbb, 0xee, 0x0f, 0x59, 0x72, 0x12, 0x8d, 0x3c, 0xa2, 0x43, 0xfa, 0x6f, 0x21, 0xa4, + 0xfd, 0x0c, 0xf1, 0x4e, 0x03, 0x4c, 0x70, 0x4d, 0x9c, 0x51, 0x92, 0x3b, 0xb0, 0x2a, 0x24, 0x95, + 0xa9, 0xf0, 0x36, 0xf5, 0x15, 0x99, 0x44, 0xbe, 0x86, 0xea, 0x18, 0x25, 0x0d, 0xa9, 0xa4, 0xde, + 0x96, 0xb6, 0xec, 0x15, 0x2c, 0xff, 0x98, 0x6d, 0x19, 0x9b, 0x39, 0x92, 0xbc, 0x82, 0x7a, 0x4c, + 0x85, 0xec, 0x67, 0x05, 0xf3, 0xee, 0xb5, 0x9c, 0x4e, 0x6d, 0xe7, 0x6e, 0xe1, 0xe4, 0xfb, 0x34, + 0x8e, 0x55, 0x29, 0x8e, 0xa3, 0x31, 0x06, 0x35, 0x05, 0xee, 0x19, 0x2c, 0xf9, 0x06, 0x40, 0x9f, + 0xd5, 0x95, 0xf4, 0xfc, 0xcb, 0x4f, 0xba, 0x0a, 0xba, 0xaf, 0x90, 0xa4, 0x0b, 0x2b, 0x09, 0x7e, + 0x92, 0xde, 0x5d, 0x7d, 0xc2, 0xef, 0x1a, 0xae, 0x77, 0x2d, 0xd7, 0xbb, 0xc7, 0xb6, 0x19, 0x02, + 0x8d, 0x53, 0x89, 0x0f, 0x23, 0x31, 0x89, 0xe9, 0x54, 0xd3, 0xdd, 0x33, 0x89, 0x2f, 0xa8, 0xc8, + 0x2b, 0x80, 0x09, 0x67, 0xca, 0x29, 0xc6, 0x85, 0xb7, 0xad, 0xa3, 0xf7, 0x0b, 0x9e, 0x1c, 0xe5, + 0x9b, 0x26, 0xfe, 0x02, 0xda, 0x7f, 0x0e, 0x6e, 0xce, 0x24, 0xb2, 0x01, 0xe5, 0x8f, 0x38, 0xcd, + 0x3a, 0x4a, 0x2d, 0x55, 0x63, 0x9c, 0xd1, 0x38, 0xb5, 0xdd, 0x64, 0x84, 0x57, 0xa5, 0x17, 0x8e, + 0xbf, 0x0b, 0x9b, 0x0b, 0xea, 0x75, 0x23, 0x13, 0xaf, 0xa1, 0x31, 0x53, 0x98, 0x1b, 0x1d, 0xfe, + 0x15, 0xea, 0xc5, 0x0c, 0x93, 0x6d, 0x70, 0x4f, 0xa9, 0xe8, 0x1b, 0xb4, 0x63, 0xda, 0xe8, 0x94, + 0x8a, 0x5f, 0x94, 0xac, 0x72, 0xae, 0xe6, 0x80, 0xb6, 0x72, 0x45, 0xce, 0x15, 0xce, 0x0f, 0x60, + 0x7d, 0x2e, 0x69, 0x0b, 0x7c, 0xfb, 0x7f, 0xd1, 0xb7, 0xda, 0xce, 0x66, 0x96, 0xf1, 0xa3, 0x38, + 0x1d, 0x45, 0x89, 0xc9, 0x49, 0xc1, 0xe1, 0xf6, 0x9f, 0x0e, 0xd4, 0x8b, 0x7b, 0xe4, 0x39, 0xac, + 0x66, 0xad, 0xe0, 0xe8, 0x92, 0x3d, 0x58, 0x60, 0xa0, 0x5b, 0xec, 0x85, 0x0c, 0xee, 0xbf, 0x84, + 0xda, 0x67, 0xa6, 0xbc, 0xfd, 0x18, 0x1a, 0x3d, 0x54, 0xfd, 0x1c, 0xe0, 0x6f, 0x29, 0x0a, 0x49, + 0xee, 0x43, 0x59, 0xb5, 0xbb, 0xa3, 0x43, 0x80, 0x73, 0xd2, 0x04, 0x4a, 0xdd, 0xee, 0x42, 0xd3, + 0xc2, 0xc5, 0x84, 0x25, 0x02, 0xaf, 0xc0, 0x3f, 0x86, 0x8d, 0x3d, 0x8c, 0x51, 0x62, 0xe1, 0x86, + 0x7b, 0x50, 0xfd, 0xc0, 0x06, 0xfd, 0xc2, 0xac, 0x5e, 0xfb, 0xc0, 0x06, 0xef, 0xe9, 0x18, 0xdb, + 0x5f, 0xc2, 0xad, 0x02, 0xfc, 0x5a, 0x37, 0x7c, 0x01, 0x8d, 0x83, 0x99, 0x00, 0x2e, 0x31, 0xdf, + 0x85, 0xe6, 0xc1, 0x4d, 0xbc, 0xff, 0xab, 0x04, 0xae, 0xe1, 0x74, 0xc4, 0x92, 0x4b, 0x0c, 0xab, + 0x59, 0x6b, 0x27, 0x46, 0x49, 0x33, 0xcd, 0x8a, 0x6a, 0x3c, 0xb1, 0x54, 0x4e, 0x52, 0xa9, 0x9f, + 0x98, 0x7a, 0x90, 0x49, 0x8a, 0x9d, 0x09, 0x0b, 0xd1, 0x58, 0x5b, 0x31, 0xc3, 0x51, 0x29, 0xb4, + 0xb9, 0x2d, 0xa8, 0x8c, 0x38, 0x4b, 0x27, 0x5e, 0xa5, 0xe5, 0x74, 0xca, 0x81, 0x11, 0xd4, 0x25, + 0x54, 0x4a, 0xf5, 0xf2, 0x79, 0xab, 0x66, 0xa0, 0x67, 0x22, 0x79, 0x09, 0x20, 0x24, 0xe5, 0x12, + 0xc3, 0x3e, 0x95, 0xfa, 0xc9, 0xba, 0x9c, 0xd3, 0x6e, 0x86, 0xde, 0x95, 0xe4, 0x35, 0xd4, 0x4e, + 0xa2, 0x24, 0x12, 0xa7, 0xe6, 0x6c, 0xf5, 0xca, 0xb3, 0x60, 0xe1, 0xbb, 0xb2, 0xfd, 0x1d, 0x6c, + 0xe5, 0xe9, 0xd9, 0x63, 0x09, 0xda, 0x12, 0x74, 0xc1, 0x45, 0xab, 0xcf, 0x72, 0xbb, 0x91, 0xe5, + 0x36, 0xc7, 0x07, 0xe7, 0x90, 0xf6, 0x3e, 0xdc, 0x9e, 0xb3, 0x93, 0x95, 0x87, 0xc0, 0xca, 0x09, + 0x67, 0x63, 0xfb, 0xa4, 0xab, 0xb5, 0x4a, 0xc3, 0x84, 0x4e, 0x63, 0x46, 0x43, 0x9d, 0xeb, 0x7a, + 0x60, 0x45, 0x45, 0x85, 0x20, 0x4d, 0xae, 0x4d, 0x05, 0x8b, 0xbd, 0x2e, 0x91, 0x8f, 0xd9, 0x68, + 0x14, 0x5f, 0x9f, 0xc8, 0x05, 0xf8, 0xf5, 0xc8, 0xe6, 0x00, 0x04, 0xf4, 0x44, 0xf6, 0x90, 0x9f, + 0x21, 0x27, 0x4d, 0x28, 0x45, 0x61, 0x66, 0xb6, 0x14, 0x85, 0xfa, 0x77, 0xc3, 0x42, 0xdb, 0xc1, + 0x7a, 0xad, 0x19, 0x11, 0x86, 0x5c, 0xd1, 0xce, 0x7c, 0x60, 0xac, 0xa8, 0x68, 0x17, 0x23, 0x0d, + 0x91, 0x6b, 0x6e, 0x55, 0x83, 0x4c, 0xd2, 0x83, 0x80, 0x49, 0xe4, 0x9a, 0x59, 0xd5, 0xc0, 0x08, + 0xea, 0xdb, 0xc2, 0xe9, 0x89, 0xec, 0xeb, 0x72, 0x0f, 0x59, 0xac, 0xf9, 0xe5, 0x06, 0x75, 0xa5, + 0x3c, 0xca, 0x74, 0x6d, 0x0a, 0xf7, 0x95, 0x7b, 0x07, 0x28, 0xcd, 0xac, 0x49, 0x39, 0xd5, 0x75, + 0xb4, 0xd1, 0x3d, 0x82, 0x35, 0xa1, 0x5d, 0x17, 0xd9, 0xf8, 0xba, 0x95, 0x45, 0x78, 0x1e, 0x54, + 0x60, 0x11, 0xca, 0x8f, 0x28, 0x09, 0xf1, 0x93, 0x0e, 0x67, 0x25, 0x30, 0x42, 0xfb, 0x11, 0xdc, + 0x53, 0xe0, 0x00, 0xc7, 0xec, 0x0c, 0x8f, 0x10, 0xf9, 0xb7, 0xd3, 0xef, 0xf7, 0x6c, 0xb6, 0xe7, + 0x12, 0xd2, 0x7e, 0x0b, 0xcd, 0xdd, 0x11, 0x26, 0x32, 0x48, 0x93, 0x9e, 0xe4, 0x48, 0xc7, 0x37, + 0xa6, 0xdd, 0x5b, 0xd8, 0xb0, 0x16, 0x3e, 0x93, 0x71, 0x3f, 0xc1, 0xf6, 0x01, 0xca, 0xdd, 0xa1, + 0x8c, 0xce, 0x30, 0xbf, 0x42, 0xe4, 0xc6, 0x9e, 0x02, 0xe4, 0xb7, 0xd9, 0xac, 0x5c, 0xf4, 0xa8, + 0x80, 0x69, 0xf7, 0x61, 0xfd, 0xdc, 0xa5, 0x6b, 0x0c, 0xe4, 0xd9, 0x98, 0x4b, 0x57, 0xc6, 0xbc, + 0xf3, 0x4f, 0x05, 0x2a, 0x7b, 0xea, 0x07, 0x4e, 0x9e, 0xc1, 0xaa, 0x19, 0x86, 0xc4, 0xfe, 0x22, + 0x67, 0xe6, 0xa8, 0x7f, 0x7b, 0x4e, 0x9b, 0xc5, 0x74, 0x08, 0x8d, 0x99, 0x5e, 0x25, 0xdb, 0xf3, + 0xd7, 0x15, 0x26, 0x81, 0x7f, 0x7f, 0xf1, 0x66, 0x66, 0xeb, 0x39, 0x54, 0x7e, 0x40, 0x7a, 0x86, + 0xe4, 0xce, 0x85, 0x81, 0xb3, 0xaf, 0x3e, 0xf8, 0xfe, 0x12, 0xbd, 0xf2, 0xbd, 0x37, 0xeb, 0x7b, + 0x6f, 0xa1, 0xef, 0x73, 0x6f, 0xd5, 0x1b, 0x70, 0xf3, 0xe7, 0x85, 0xd8, 0xaf, 0xd9, 0xfc, 0xfb, + 0xe4, 0x7b, 0x17, 0x37, 0xb2, 0xf3, 0xcf, 0x60, 0xd5, 0x0c, 0x8d, 0xfc, 0xda, 0x99, 0x79, 0x93, + 0x5f, 0x3b, 0x37, 0x59, 0xde, 0x80, 0x9b, 0x0f, 0x83, 0xfc, 0xda, 0xf9, 0x69, 0x92, 0x5f, 0x7b, + 0x71, 0x6e, 0xf4, 0x60, 0x6b, 0x51, 0xe7, 0x2d, 0xcd, 0xda, 0xc3, 0x42, 0xe3, 0x2d, 0x6d, 0xd7, + 0xf7, 0x40, 0x2e, 0xf6, 0x1a, 0x69, 0x15, 0x8e, 0x2e, 0x6c, 0xc3, 0xa5, 0x25, 0xf9, 0x19, 0x36, + 0x17, 0xb4, 0xc2, 0x52, 0x1f, 0xdb, 0xe7, 0xec, 0x5a, 0xda, 0x3e, 0x2f, 0xa0, 0xde, 0x43, 0x79, + 0xfe, 0x00, 0x5f, 0x20, 0xf6, 0x32, 0x67, 0x76, 0xf6, 0xa0, 0xa2, 0xdb, 0x88, 0xbc, 0x86, 0xaa, + 0xed, 0x27, 0x72, 0x27, 0x3b, 0x3e, 0xd7, 0x60, 0x79, 0xd5, 0x66, 0xa7, 0xc9, 0x53, 0x67, 0xb0, + 0xaa, 0xad, 0x7e, 0xf5, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc5, 0xb4, 0x29, 0x03, 0xb3, 0x0e, + 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1321,7 +1371,6 @@ type DkronClient interface { ToggleJob(ctx context.Context, in *ToggleJobRequest, opts ...grpc.CallOption) (*ToggleJobResponse, error) RaftGetConfiguration(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*RaftGetConfigurationResponse, error) RaftRemovePeerByID(ctx context.Context, in *RaftRemovePeerByIDRequest, opts ...grpc.CallOption) (*empty.Empty, error) - AgentRun(ctx context.Context, opts ...grpc.CallOption) (Dkron_AgentRunClient, error) GetActiveExecutions(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*GetActiveExecutionsResponse, error) SetExecution(ctx context.Context, in *Execution, opts ...grpc.CallOption) (*empty.Empty, error) } @@ -1415,40 +1464,6 @@ func (c *dkronClient) RaftRemovePeerByID(ctx context.Context, in *RaftRemovePeer return out, nil } -func (c *dkronClient) AgentRun(ctx context.Context, opts ...grpc.CallOption) (Dkron_AgentRunClient, error) { - stream, err := c.cc.NewStream(ctx, &_Dkron_serviceDesc.Streams[0], "/types.Dkron/AgentRun", opts...) - if err != nil { - return nil, err - } - x := &dkronAgentRunClient{stream} - return x, nil -} - -type Dkron_AgentRunClient interface { - Send(*AgentRunStream) error - CloseAndRecv() (*AgentRunResponse, error) - grpc.ClientStream -} - -type dkronAgentRunClient struct { - grpc.ClientStream -} - -func (x *dkronAgentRunClient) Send(m *AgentRunStream) error { - return x.ClientStream.SendMsg(m) -} - -func (x *dkronAgentRunClient) CloseAndRecv() (*AgentRunResponse, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(AgentRunResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - func (c *dkronClient) GetActiveExecutions(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*GetActiveExecutionsResponse, error) { out := new(GetActiveExecutionsResponse) err := c.cc.Invoke(ctx, "/types.Dkron/GetActiveExecutions", in, out, opts...) @@ -1478,7 +1493,6 @@ type DkronServer interface { ToggleJob(context.Context, *ToggleJobRequest) (*ToggleJobResponse, error) RaftGetConfiguration(context.Context, *empty.Empty) (*RaftGetConfigurationResponse, error) RaftRemovePeerByID(context.Context, *RaftRemovePeerByIDRequest) (*empty.Empty, error) - AgentRun(Dkron_AgentRunServer) error GetActiveExecutions(context.Context, *empty.Empty) (*GetActiveExecutionsResponse, error) SetExecution(context.Context, *Execution) (*empty.Empty, error) } @@ -1514,9 +1528,6 @@ func (*UnimplementedDkronServer) RaftGetConfiguration(ctx context.Context, req * func (*UnimplementedDkronServer) RaftRemovePeerByID(ctx context.Context, req *RaftRemovePeerByIDRequest) (*empty.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method RaftRemovePeerByID not implemented") } -func (*UnimplementedDkronServer) AgentRun(srv Dkron_AgentRunServer) error { - return status.Errorf(codes.Unimplemented, "method AgentRun not implemented") -} func (*UnimplementedDkronServer) GetActiveExecutions(ctx context.Context, req *empty.Empty) (*GetActiveExecutionsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetActiveExecutions not implemented") } @@ -1690,32 +1701,6 @@ func _Dkron_RaftRemovePeerByID_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } -func _Dkron_AgentRun_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(DkronServer).AgentRun(&dkronAgentRunServer{stream}) -} - -type Dkron_AgentRunServer interface { - SendAndClose(*AgentRunResponse) error - Recv() (*AgentRunStream, error) - grpc.ServerStream -} - -type dkronAgentRunServer struct { - grpc.ServerStream -} - -func (x *dkronAgentRunServer) SendAndClose(m *AgentRunResponse) error { - return x.ServerStream.SendMsg(m) -} - -func (x *dkronAgentRunServer) Recv() (*AgentRunStream, error) { - m := new(AgentRunStream) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - func _Dkron_GetActiveExecutions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(empty.Empty) if err := dec(in); err != nil { @@ -1801,11 +1786,104 @@ var _Dkron_serviceDesc = grpc.ServiceDesc{ Handler: _Dkron_SetExecution_Handler, }, }, + Streams: []grpc.StreamDesc{}, + Metadata: "dkron.proto", +} + +// AgentClient is the client API for Agent service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type AgentClient interface { + AgentRun(ctx context.Context, in *AgentRunRequest, opts ...grpc.CallOption) (Agent_AgentRunClient, error) +} + +type agentClient struct { + cc grpc.ClientConnInterface +} + +func NewAgentClient(cc grpc.ClientConnInterface) AgentClient { + return &agentClient{cc} +} + +func (c *agentClient) AgentRun(ctx context.Context, in *AgentRunRequest, opts ...grpc.CallOption) (Agent_AgentRunClient, error) { + stream, err := c.cc.NewStream(ctx, &_Agent_serviceDesc.Streams[0], "/types.Agent/AgentRun", opts...) + if err != nil { + return nil, err + } + x := &agentAgentRunClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Agent_AgentRunClient interface { + Recv() (*AgentRunStream, error) + grpc.ClientStream +} + +type agentAgentRunClient struct { + grpc.ClientStream +} + +func (x *agentAgentRunClient) Recv() (*AgentRunStream, error) { + m := new(AgentRunStream) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// AgentServer is the server API for Agent service. +type AgentServer interface { + AgentRun(*AgentRunRequest, Agent_AgentRunServer) error +} + +// UnimplementedAgentServer can be embedded to have forward compatible implementations. +type UnimplementedAgentServer struct { +} + +func (*UnimplementedAgentServer) AgentRun(req *AgentRunRequest, srv Agent_AgentRunServer) error { + return status.Errorf(codes.Unimplemented, "method AgentRun not implemented") +} + +func RegisterAgentServer(s *grpc.Server, srv AgentServer) { + s.RegisterService(&_Agent_serviceDesc, srv) +} + +func _Agent_AgentRun_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(AgentRunRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AgentServer).AgentRun(m, &agentAgentRunServer{stream}) +} + +type Agent_AgentRunServer interface { + Send(*AgentRunStream) error + grpc.ServerStream +} + +type agentAgentRunServer struct { + grpc.ServerStream +} + +func (x *agentAgentRunServer) Send(m *AgentRunStream) error { + return x.ServerStream.SendMsg(m) +} + +var _Agent_serviceDesc = grpc.ServiceDesc{ + ServiceName: "types.Agent", + HandlerType: (*AgentServer)(nil), + Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "AgentRun", - Handler: _Dkron_AgentRun_Handler, - ClientStreams: true, + Handler: _Agent_AgentRun_Handler, + ServerStreams: true, }, }, Metadata: "dkron.proto", diff --git a/proto/dkron.proto b/proto/dkron.proto index 739d8c55a..8cb0a5542 100644 --- a/proto/dkron.proto +++ b/proto/dkron.proto @@ -140,7 +140,15 @@ service Dkron { rpc ToggleJob (ToggleJobRequest) returns (ToggleJobResponse); rpc RaftGetConfiguration (google.protobuf.Empty) returns (RaftGetConfigurationResponse); rpc RaftRemovePeerByID (RaftRemovePeerByIDRequest) returns (google.protobuf.Empty); - rpc AgentRun (stream AgentRunStream) returns (AgentRunResponse); rpc GetActiveExecutions (google.protobuf.Empty) returns (GetActiveExecutionsResponse); rpc SetExecution (Execution) returns (google.protobuf.Empty); } + +message AgentRunRequest { + Job job = 1; + Execution execution = 2; +} + +service Agent { + rpc AgentRun (AgentRunRequest) returns (stream AgentRunStream); +} diff --git a/scripts/ansible/site.yml b/scripts/ansible/site.yml index 23483a31a..7506e7dff 100644 --- a/scripts/ansible/site.yml +++ b/scripts/ansible/site.yml @@ -65,3 +65,51 @@ name: dkron enabled: yes state: restarted + + ### File descriptor and memory tweaks + + - name: Increase sysctl open files system wide + sysctl: + name: fs.file-max + value: "3243542" + tags: performance + + - name: Increasing number of open files + lineinfile: dest=/etc/systemd/system.conf regexp='^DefaultLimitNOFILE=65535' line='DefaultLimitNOFILE=65535' state=present + tags: performance + + - name: Increasing number of open files in service + lineinfile: dest=/lib/systemd/system/dkron.service regexp='^KillSignal=SIGTERM\n^LimitNOFILE=65535' line='LimitNOFILE=65535' state=present + tags: performance_b + + ### Network performance tweaks + + - name: Set ARP GC entry point at 0 + sysctl: + name: net.ipv4.neigh.default.gc_thresh1 + value: "0" + tags: performance + + - name: Increase maximum number of sockets + sysctl: + name: net.core.somaxconn + value: "32768" + tags: performance + + - name: Increase maximum number of sockets in the backlog + sysctl: + name: net.ipv4.tcp_max_syn_backlog + value: "131072" + tags: performance + + - name: Setting sane defaults for TCP reading sockets + sysctl: + name: net.ipv4.tcp_rmem + value: "4096 16384 16777216" + tags: performance + + - name: Setting sane defaults for TCP writing sockets + sysctl: + name: net.ipv4.tcp_wmem + value: "4096 16384 16777216" + tags: performance From e89223565464bed2cb0e6921e009633697a2484c Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Thu, 7 May 2020 23:43:52 +0200 Subject: [PATCH 2/3] Increment Raft timeout to consul level --- dkron/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dkron/agent.go b/dkron/agent.go index f9072f622..628df780a 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -30,7 +30,7 @@ import ( ) const ( - raftTimeout = 10 * time.Second + raftTimeout = 30 * time.Second // raftLogCacheSize is the maximum number of logs to cache in-memory. // This is used to reduce disk I/O for the recently committed entries. raftLogCacheSize = 512 From b4e06543edc4f35d423025bcccfffcd6d425f508 Mon Sep 17 00:00:00 2001 From: Victor Castell Date: Thu, 7 May 2020 23:46:44 +0200 Subject: [PATCH 3/3] fix: Next time overwrite --- dkron/store.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dkron/store.go b/dkron/store.go index 66ae8a118..2a75dfa07 100644 --- a/dkron/store.go +++ b/dkron/store.go @@ -138,7 +138,10 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error { return err } } else { - job.Next = ej.Next + // If comming from a backup us the previous value, don't allow overwriting this + if job.Next.Before(ej.Next) { + job.Next = ej.Next + } } pbj := job.ToProto()