diff --git a/dkron/agent.go b/dkron/agent.go index 6cb4e81d8..628df780a 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -2,7 +2,6 @@ package dkron import ( "crypto/tls" - "encoding/json" "errors" "expvar" "fmt" @@ -23,12 +22,15 @@ import ( "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/hashicorp/serf/serf" + "github.com/juliangruber/go-intersect" "github.com/sirupsen/logrus" "github.com/soheilhy/cmux" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) const ( - raftTimeout = 10 * time.Second + raftTimeout = 30 * time.Second // raftLogCacheSize is the maximum number of logs to cache in-memory. // This is used to reduce disk I/O for the recently committed entries. raftLogCacheSize = 512 @@ -41,6 +43,9 @@ var ( // ErrLeaderNotFound is returned when obtained leader is not found in member list ErrLeaderNotFound = errors.New("no member leader found in member list") + // ErrNoSuitableServer returns an error in case no suitable server to send the request is found. + ErrNoSuitableServer = errors.New("no suitable server found to send the request, aborting") + runningExecutions sync.Map ) @@ -101,6 +106,8 @@ type Agent struct { peers map[string][]*ServerParts localPeers map[raft.ServerAddress]*ServerParts peerLock sync.RWMutex + + activeExecutions sync.Map } // ProcessorFactory is a function type that creates a new instance @@ -159,8 +166,31 @@ func (a *Agent) Start() error { // Expose the node name expNode.Set(a.config.NodeName) + //Use the value of "RPCPort" if AdvertiseRPCPort has not been set + if a.config.AdvertiseRPCPort <= 0 { + a.config.AdvertiseRPCPort = a.config.RPCPort + } + if a.config.Server { a.StartServer() + } else { + // Create a listener at the desired port. + addr := a.getRPCAddr() + l, err := net.Listen("tcp", addr) + if err != nil { + log.Fatal(err) + } + + opts := []grpc.ServerOption{} + if a.TLSConfig != nil { + tc := credentials.NewTLS(a.TLSConfig) + opts = append(opts, grpc.Creds(tc)) + } + + grpcServer := grpc.NewServer(opts...) + as := NewAgentServer(a) + proto.RegisterAgentServer(grpcServer, as) + go grpcServer.Serve(l) } if a.GRPCClient == nil { @@ -168,10 +198,8 @@ func (a *Agent) Start() error { } tags := a.serf.LocalMember().Tags - if a.config.Server { - tags["rpc_addr"] = a.getRPCAddr() // Address that clients will use to RPC to servers - tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort) - } + tags["rpc_addr"] = a.getRPCAddr() // Address that clients will use to RPC to servers + tags["port"] = strconv.Itoa(a.config.AdvertiseRPCPort) a.serf.SetTags(tags) go a.eventLoop() @@ -447,11 +475,6 @@ func (a *Agent) SetConfig(c *Config) { // StartServer launch a new dkron server process func (a *Agent) StartServer() { - //Use the value of "RPCPort" if AdvertiseRPCPort has not been set - if a.config.AdvertiseRPCPort <= 0 { - a.config.AdvertiseRPCPort = a.config.RPCPort - } - if a.Store == nil { s, err := NewStore() if err != nil { @@ -634,62 +657,6 @@ func (a *Agent) eventLoop() { } } - if e.EventType() == serf.EventQuery { - query := e.(*serf.Query) - - if query.Name == QueryRunJob { - log.WithFields(logrus.Fields{ - "query": query.Name, - "payload": string(query.Payload), - "at": query.LTime, - }).Debug("agent: Running job") - - var rqp RunQueryParam - if err := json.Unmarshal(query.Payload, &rqp); err != nil { - log.WithField("query", QueryRunJob).Fatal("agent: Error unmarshaling query payload") - } - - log.WithFields(logrus.Fields{ - "job": rqp.Execution.JobName, - }).Info("agent: Starting job") - - // There are two error types to handle here: - // Key not found when the job is removed from store - // Dial tcp error - // In case of deleted job or other error, we should report and break the flow. - // On dial error we should retry with a limit. - i := 0 - RetryGetJob: - job, err := a.GRPCClient.GetJob(rqp.RPCAddr, rqp.Execution.JobName) - if err != nil { - if err == ErrRPCDialing { - if i < 10 { - i++ - goto RetryGetJob - } - log.WithError(err).Fatal("agent: A working RPC connection to a Dkron server must exists.") - } - log.WithError(err).Error("agent: Error on rpc.GetJob call") - continue - } - log.WithField("job", job.Name).Debug("agent: GetJob by RPC") - - ex := rqp.Execution - ex.StartedAt = time.Now() - ex.NodeName = a.config.NodeName - - go func() { - if err := a.invokeJob(job, ex); err != nil { - log.WithError(err).Error("agent: Error invoking job") - } - }() - - // Respond with the execution JSON though it is not used in the destination - exJSON, _ := json.Marshal(ex) - query.Respond(exJSON) - } - } - case <-serfShutdownCh: log.Warn("agent: Serf shutdown detected, quitting") return @@ -710,9 +677,9 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) { return } -func (a *Agent) processFilteredNodes(job *Job) ([]string, map[string]string, error) { - var nodes []string - var candidates []string +func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]string, error) { + // candidates will contain a set of candidates by tags + // the final set of nodes will be the intesection of all groups tags := make(map[string]string) // Actually copy the map @@ -724,42 +691,73 @@ func (a *Agent) processFilteredNodes(job *Job) ([]string, map[string]string, err // on the same region. tags["region"] = a.config.Region + candidates := [][]string{} for jtk, jtv := range tags { - var tc []string - if tc = strings.Split(jtv, ":"); len(tc) == 2 { - tv := tc[0] + cans := []string{} + tc := strings.Split(jtv, ":") - // Set original tag to clean tag - tags[jtk] = tv + tv := tc[0] - count, err := strconv.Atoi(tc[1]) - if err != nil { - return nil, nil, err - } + // Set original tag to clean tag + tags[jtk] = tv - for _, member := range a.serf.Members() { - if member.Status == serf.StatusAlive { - for mtk, mtv := range member.Tags { - if mtk == jtk && mtv == tv { - candidates = append(candidates, member.Name) - } + for _, member := range a.serf.Members() { + if member.Status == serf.StatusAlive { + for mtk, mtv := range member.Tags { + if mtk == jtk && mtv == tv { + cans = append(cans, member.Name) } } } + } + + // In case there is cardinality in the tag, randomize the order and select the amount of nodes + // or else just add all nodes to the result. + if len(tc) == 2 { + f := []string{} rand.Seed(time.Now().UnixNano()) - rand.Shuffle(len(candidates), func(i, j int) { - candidates[i], candidates[j] = candidates[j], candidates[i] + rand.Shuffle(len(cans), func(i, j int) { + cans[i], cans[j] = cans[j], cans[i] }) + count, err := strconv.Atoi(tc[1]) + if err != nil { + return nil, nil, err + } for i := 1; i <= count; i++ { - if len(candidates) == 0 { + if len(cans) == 0 { break } - nodes = append(nodes, candidates[0]) - candidates = candidates[1:] + f = append(f, cans[0]) + cans = cans[1:] } - candidates = nil + cans = f + } + if len(cans) > 0 { + candidates = append(candidates, cans) + } + } + + // The final result will be the intersection of all candidates. + nodes := make(map[string]string) + r := candidates[0] + for i := 1; i <= len(candidates)-1; i++ { + isec := intersect.Simple(r, candidates[i]).([]interface{}) + // Empty the slice + r = []string{} + + // Refill with the intersection + for _, v := range isec { + r = append(r, v.(string)) + } + } + for _, n := range r { + for _, m := range a.serf.Members() { + if n == m.Name { + // If the server is missing the rpc_addr tag, default to the serf advertise addr + nodes[n] = m.Tags["rpc_addr"] + } } } @@ -844,3 +842,22 @@ func (a *Agent) recursiveSetJob(jobs []*Job) []string { } return result } + +// Check if the server is alive and select it +func (a *Agent) checkAndSelectServer() (string, error) { + var peers []string + for _, p := range a.LocalServers() { + peers = append(peers, p.RPCAddr.String()) + } + + for _, peer := range peers { + log.Debugf("Checking peer: %v", peer) + conn, err := net.DialTimeout("tcp", peer, 1*time.Second) + if err == nil { + conn.Close() + log.Debugf("Found good peer: %v", peer) + return peer, nil + } + } + return "", ErrNoSuitableServer +} diff --git a/dkron/agent_test.go b/dkron/agent_test.go index 0d74a4bf7..baf2e06c1 100644 --- a/dkron/agent_test.go +++ b/dkron/agent_test.go @@ -146,6 +146,27 @@ func Test_processFilteredNodes(t *testing.T) { a2 := NewAgent(c) a2.Start() + // Start another agent + ip3, returnFn3 := testutil.TakeIP() + defer returnFn3() + a3Addr := ip3.String() + + c = DefaultConfig() + c.BindAddr = a3Addr + c.StartJoin = []string{a1Addr + ":8946"} + c.NodeName = "test3" + c.Server = false + c.LogLevel = logLevel + c.Tags = map[string]string{ + "tag": "test_client", + "extra": "tag", + } + c.DevMode = true + c.DataDir = dir + + a3 := NewAgent(c) + a3.Start() + time.Sleep(2 * time.Second) job := &Job{ @@ -157,16 +178,53 @@ func Test_processFilteredNodes(t *testing.T) { } nodes, tags, err := a1.processFilteredNodes(job) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assert.Contains(t, nodes, "test1") assert.Contains(t, nodes, "test2") + assert.Len(t, nodes, 2) assert.Equal(t, tags["tag"], "test") + job2 := &Job{ + Name: "test_job_2", + Tags: map[string]string{ + "tag": "test:1", + }, + } + + nodes, _, err = a1.processFilteredNodes(job2) + require.NoError(t, err) + + assert.Len(t, nodes, 1) + + job3 := &Job{ + Name: "test_job_3", + } + + nodes, _, err = a1.processFilteredNodes(job3) + require.NoError(t, err) + + assert.Len(t, nodes, 3) + assert.Contains(t, nodes, "test1") + assert.Contains(t, nodes, "test2") + assert.Contains(t, nodes, "test3") + + job4 := &Job{ + Name: "test_job_4", + Tags: map[string]string{ + "tag": "test_client:1", + }, + } + + nodes, _, err = a1.processFilteredNodes(job4) + require.NoError(t, err) + + assert.Len(t, nodes, 1) + assert.Contains(t, nodes, "test3") + a1.Stop() a2.Stop() + a3.Stop() } func TestEncrypt(t *testing.T) { diff --git a/dkron/grpc.go b/dkron/grpc.go index 8f1da930a..e52be4719 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "io" "net" "sync" "time" @@ -57,6 +56,9 @@ func NewGRPCServer(agent *Agent) DkronGRPCServer { func (grpcs *GRPCServer) Serve(lis net.Listener) error { grpcServer := grpc.NewServer() proto.RegisterDkronServer(grpcServer, grpcs) + + as := NewAgentServer(grpcs.agent) + proto.RegisterAgentServer(grpcServer, as) go grpcServer.Serve(lis) return nil @@ -212,7 +214,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E "execution": execution, }).Debug("grpc: Retrying execution") - if _, err := grpcs.agent.RunQuery(job.Name, execution); err != nil { + if _, err := grpcs.agent.Run(job.Name, execution); err != nil { return nil, err } return &proto.ExecutionDoneResponse{ @@ -260,7 +262,7 @@ func (grpcs *GRPCServer) Leave(ctx context.Context, in *empty.Empty) (*empty.Emp // RunJob runs a job in the cluster func (grpcs *GRPCServer) RunJob(ctx context.Context, req *proto.RunJobRequest) (*proto.RunJobResponse, error) { ex := NewExecution(req.JobName) - job, err := grpcs.agent.RunQuery(req.JobName, ex) + job, err := grpcs.agent.Run(req.JobName, ex) if err != nil { return nil, err } @@ -362,65 +364,12 @@ REMOVE: return new(empty.Empty), nil } -// AgentRun is called when an agent starts running a job and lasts all execution, -// the agent will stream execution progress to the server. -func (grpcs *GRPCServer) AgentRun(stream proto.Dkron_AgentRunServer) error { - defer metrics.MeasureSince([]string{"grpc", "agent_run"}, time.Now()) - - var execution *Execution - var first bool - for { - ars, err := stream.Recv() - - // Stream ends - if err == io.EOF { - addr := grpcs.agent.raft.Leader() - if err := grpcs.agent.GRPCClient.ExecutionDone(string(addr), execution); err != nil { - return err - } - return stream.SendAndClose(&proto.AgentRunResponse{ - From: grpcs.agent.Config().NodeName, - }) - } - - // Error receiving from stream - if err != nil { - // At this point the execution status will be unknown, set the FinshedAt time and an explanatory message - execution.FinishedAt = time.Now() - execution.Output = ErrBrokenStream.Error() - - log.WithError(err).Error(ErrBrokenStream) - - addr := grpcs.agent.raft.Leader() - if err := grpcs.agent.GRPCClient.ExecutionDone(string(addr), execution); err != nil { - return err - } - return err - } - - // Registers an active stream - grpcs.activeExecutions.Store(ars.Execution.Key(), ars.Execution) - log.WithField("key", ars.Execution.Key()).Debug("grpc: received execution stream") - - execution = NewExecutionFromProto(ars.Execution) - defer grpcs.activeExecutions.Delete(execution.Key()) - - // Store the received exeuction in the raft log and store - if !first { - if err := grpcs.agent.GRPCClient.SetExecution(ars.Execution); err != nil { - return err - } - first = true - } - } -} - // GetActiveExecutions returns the active executions on the server node func (grpcs *GRPCServer) GetActiveExecutions(ctx context.Context, in *empty.Empty) (*proto.GetActiveExecutionsResponse, error) { defer metrics.MeasureSince([]string{"grpc", "agent_run"}, time.Now()) var executions []*proto.Execution - grpcs.activeExecutions.Range(func(k, v interface{}) bool { + grpcs.agent.activeExecutions.Range(func(k, v interface{}) bool { e := v.(*proto.Execution) executions = append(executions, e) return true diff --git a/dkron/grpc_agent.go b/dkron/grpc_agent.go new file mode 100644 index 000000000..636967880 --- /dev/null +++ b/dkron/grpc_agent.go @@ -0,0 +1,125 @@ +package dkron + +import ( + "errors" + "time" + + "github.com/armon/circbuf" + metrics "github.com/armon/go-metrics" + "github.com/distribworks/dkron/v2/plugin/types" + "github.com/golang/protobuf/ptypes" +) + +const ( + // maxBufSize limits how much data we collect from a handler. + maxBufSize = 256000 +) + +type statusAgentHelper struct { + execution *types.Execution + stream types.Agent_AgentRunServer +} + +func (s *statusAgentHelper) Update(b []byte, c bool) (int64, error) { + s.execution.Output = b + // Send partial execution + if err := s.stream.Send(&types.AgentRunStream{ + Execution: s.execution, + }); err != nil { + return 0, err + } + return 0, nil +} + +// GRPCAgentServer is the local implementation of the gRPC server interface. +type AgentServer struct { + agent *Agent +} + +// NewServer creates and returns an instance of a DkronGRPCServer implementation +func NewAgentServer(agent *Agent) types.AgentServer { + return &AgentServer{ + agent: agent, + } +} + +// AgentRun is called when an agent starts running a job and lasts all execution, +// the agent will stream execution progress to the server. +func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_AgentRunServer) error { + defer metrics.MeasureSince([]string{"grpc", "agent_run"}, time.Now()) + + job := req.Job + execution := req.Execution + + output, _ := circbuf.NewBuffer(maxBufSize) + + var success bool + + jex := job.Executor + exc := job.ExecutorConfig + if jex == "" { + return errors.New("invoke: No executor defined, nothing to do") + } + + // Send the first update with the initial execution state to be stored in the server + execution.StartedAt = ptypes.TimestampNow() + execution.NodeName = as.agent.config.NodeName + + if err := stream.Send(&types.AgentRunStream{ + Execution: execution, + }); err != nil { + return err + } + + // Check if executor exists + if executor, ok := as.agent.ExecutorPlugins[jex]; ok { + log.WithField("plugin", jex).Debug("invoke: calling executor plugin") + runningExecutions.Store(execution.GetGroup(), execution) + out, err := executor.Execute(&types.ExecuteRequest{ + JobName: job.Name, + Config: exc, + }, &statusAgentHelper{ + stream: stream, + execution: execution, + }) + + if err == nil && out.Error != "" { + err = errors.New(out.Error) + } + if err != nil { + log.WithError(err).WithField("job", job.Name).WithField("plugin", executor).Error("invoke: command error output") + success = false + output.Write([]byte(err.Error() + "\n")) + } else { + success = true + } + + if out != nil { + output.Write(out.Output) + } + } else { + log.WithField("executor", jex).Error("invoke: Specified executor is not present") + output.Write([]byte("invoke: Specified executor is not present")) + } + + execution.FinishedAt = ptypes.TimestampNow() + execution.Success = success + execution.Output = output.Bytes() + + runningExecutions.Delete(execution.GetGroup()) + + // Send the final execution + if err := stream.Send(&types.AgentRunStream{ + Execution: execution, + }); err != nil { + // In case of error means that maybe the server is gone so fallback to ExecutionDone + log.WithError(err).WithField("job", job.Name).Error("invoke: error sending the final execution, falling back to ExecutionDone") + rpcServer, err := as.agent.checkAndSelectServer() + if err != nil { + return err + } + return as.agent.GRPCClient.ExecutionDone(rpcServer, NewExecutionFromProto(execution)) + } + + return nil +} diff --git a/dkron/grpc_client.go b/dkron/grpc_client.go index 1a6da4423..9bf03995c 100644 --- a/dkron/grpc_client.go +++ b/dkron/grpc_client.go @@ -2,10 +2,12 @@ package dkron import ( "fmt" + "io" "time" metrics "github.com/armon/go-metrics" proto "github.com/distribworks/dkron/v2/plugin/types" + "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/empty" "github.com/sirupsen/logrus" "golang.org/x/net/context" @@ -26,6 +28,7 @@ type DkronGRPCClient interface { RaftRemovePeerByID(string, string) error GetActiveExecutions(string) ([]*proto.Execution, error) SetExecution(execution *proto.Execution) error + AgentRun(addr string, job *proto.Job, execution *proto.Execution) error } // GRPCClient is the local implementation of the DkronGRPCClient interface. @@ -378,3 +381,74 @@ func (grpcc *GRPCClient) SetExecution(execution *proto.Execution) error { } return nil } + +// AgentRun runs a job in the given agent +func (grpcc *GRPCClient) AgentRun(addr string, job *proto.Job, execution *proto.Execution) error { + defer metrics.MeasureSince([]string{"grpc_client", "agent_run"}, time.Now()) + var conn *grpc.ClientConn + + // Initiate a connection with the server + conn, err := grpcc.Connect(string(addr)) + if err != nil { + log.WithError(err).WithFields(logrus.Fields{ + "method": "AgentRun", + "server_addr": addr, + }).Error("grpc: error dialing.") + return err + } + defer conn.Close() + + // Streaming call + a := proto.NewAgentClient(conn) + stream, err := a.AgentRun(context.Background(), &proto.AgentRunRequest{ + Job: job, + Execution: execution, + }) + if err != nil { + return err + } + + var first bool + for { + ars, err := stream.Recv() + + // Stream ends + if err == io.EOF { + addr := grpcc.agent.raft.Leader() + if err := grpcc.ExecutionDone(string(addr), NewExecutionFromProto(execution)); err != nil { + return err + } + return nil + } + + // Error receiving from stream + if err != nil { + // At this point the execution status will be unknown, set the FinshedAt time and an explanatory message + execution.FinishedAt = ptypes.TimestampNow() + execution.Output = []byte(ErrBrokenStream.Error()) + + log.WithError(err).Error(ErrBrokenStream) + + addr := grpcc.agent.raft.Leader() + if err := grpcc.ExecutionDone(string(addr), NewExecutionFromProto(execution)); err != nil { + return err + } + return err + } + + // Registers an active stream + grpcc.agent.activeExecutions.Store(ars.Execution.Key(), ars.Execution) + log.WithField("key", ars.Execution.Key()).Debug("grpc: received execution stream") + + execution = ars.Execution + defer grpcc.agent.activeExecutions.Delete(execution.Key()) + + // Store the received execution in the raft log and store + if !first { + if err := grpcc.SetExecution(ars.Execution); err != nil { + return err + } + first = true + } + } +} diff --git a/dkron/invoke.go b/dkron/invoke.go deleted file mode 100644 index 9da4ccbf2..000000000 --- a/dkron/invoke.go +++ /dev/null @@ -1,156 +0,0 @@ -package dkron - -import ( - "context" - "errors" - "net" - "time" - - "github.com/armon/circbuf" - "github.com/distribworks/dkron/v2/plugin/types" - "github.com/sirupsen/logrus" -) - -const ( - // maxBufSize limits how much data we collect from a handler. - maxBufSize = 256000 -) - -// ErrNoSuitableServer returns an error in case no suitable server to send the request is found. -var ErrNoSuitableServer = errors.New("no suitable server found to send the request, aborting") - -type statusHelper struct { - execution *Execution - stream types.Dkron_AgentRunClient -} - -func (s *statusHelper) Update(b []byte, c bool) (int64, error) { - s.execution.Output = string(b) - // Send partial execution - if err := s.stream.Send(&types.AgentRunStream{ - Execution: s.execution.ToProto(), - }); err != nil { - return 0, err - } - return 0, nil -} - -// invokeJob will execute the given job. Depending on the event. -func (a *Agent) invokeJob(job *Job, execution *Execution) error { - output, _ := circbuf.NewBuffer(maxBufSize) - - var success bool - - jex := job.Executor - exc := job.ExecutorConfig - if jex == "" { - return errors.New("invoke: No executor defined, nothing to do") - } - - // Connect to a server to stream the execution - rpcServer, err := a.checkAndSelectServer() - if err != nil { - return err - } - log.WithField("server", rpcServer).Debug("invoke: Selected a server to send result") - - conn, err := a.GRPCClient.Connect(rpcServer) - if err != nil { - log.WithError(err).WithFields(logrus.Fields{ - "method": "Invoke", - "server_addr": rpcServer, - }).Error("grpc: error dialing.") - return err - } - defer conn.Close() - client := types.NewDkronClient(conn) - - stream, err := client.AgentRun(context.Background()) - if err != nil { - return err - } - - // Send the first update with the initial execution state to be stored in the server - if err := stream.Send(&types.AgentRunStream{ - Execution: execution.ToProto(), - }); err != nil { - return err - } - - // Check if executor exists - if executor, ok := a.ExecutorPlugins[jex]; ok { - log.WithField("plugin", jex).Debug("invoke: calling executor plugin") - runningExecutions.Store(execution.GetGroup(), execution) - out, err := executor.Execute(&types.ExecuteRequest{ - JobName: job.Name, - Config: exc, - }, &statusHelper{ - stream: stream, - execution: execution, - }) - - if err == nil && out.Error != "" { - err = errors.New(out.Error) - } - if err != nil { - log.WithError(err).WithField("job", job.Name).WithField("plugin", executor).Error("invoke: command error output") - success = false - output.Write([]byte(err.Error() + "\n")) - } else { - success = true - } - - if out != nil { - output.Write(out.Output) - } - } else { - log.WithField("executor", jex).Error("invoke: Specified executor is not present") - } - - execution.FinishedAt = time.Now() - execution.Success = success - execution.Output = output.String() - - runningExecutions.Delete(execution.GetGroup()) - - // Send the final execution - if err := stream.Send(&types.AgentRunStream{ - Execution: execution.ToProto(), - }); err != nil { - // In case of error means that maybe the server is gone so fallback to ExecutionDone - log.WithError(err).WithField("job", job.Name).Error("invoke: error sending the final execution, falling back to ExecutionDone") - return a.GRPCClient.ExecutionDone(rpcServer, execution) - } - - // Close the stream - reply, err := stream.CloseAndRecv() - if err != nil { - // In case of error means that maybe the server is gone so fallback to ExecutionDone - log.WithError(err).WithField("job", job.Name).Error("invoke: error closing the stream, falling back to ExecutionDone") - return a.GRPCClient.ExecutionDone(rpcServer, execution) - } - - // TODO add better logging - log.WithField("from", reply.From).Debug("agent: AgentRun reply") - - return nil -} - -// Check if the server is alive and select it -func (a *Agent) checkAndSelectServer() (string, error) { - var peers []string - for _, p := range a.LocalServers() { - peers = append(peers, p.RPCAddr.String()) - } - - for _, peer := range peers { - log.Debugf("Checking peer: %v", peer) - conn, err := net.DialTimeout("tcp", peer, 1*time.Second) - if err == nil { - conn.Close() - log.Debugf("Found good peer: %v", peer) - return peer, nil - } - } - return "", ErrNoSuitableServer -} diff --git a/dkron/invoke_test.go b/dkron/invoke_test.go deleted file mode 100644 index 115a0b6dd..000000000 --- a/dkron/invoke_test.go +++ /dev/null @@ -1 +0,0 @@ -package dkron diff --git a/dkron/job.go b/dkron/job.go index 9ce79e54e..d70159ff3 100644 --- a/dkron/job.go +++ b/dkron/job.go @@ -96,9 +96,6 @@ type Job struct { // Number of times to retry a job that failed an execution. Retries uint `json:"retries"` - // running indicates that the Run method is still broadcasting - running bool - // Jobs that are dependent upon this one will be run after this job runs. DependentJobs []string `json:"dependent_jobs"` @@ -229,9 +226,6 @@ func (j *Job) Run() { // Check if it's runnable if j.isRunnable() { - j.running = true - defer func() { j.running = false }() - log.WithFields(logrus.Fields{ "job": j.Name, "schedule": j.Schedule, @@ -241,7 +235,8 @@ func (j *Job) Run() { // Simple execution wrapper ex := NewExecution(j.Name) - if _, err := j.Agent.RunQuery(j.Name, ex); err != nil { + + if _, err := j.Agent.Run(j.Name, ex); err != nil { log.WithError(err).Error("job: Error sending Run query to serf cluster") } } @@ -317,12 +312,6 @@ func (j *Job) isRunnable() bool { } } - if j.running { - log.WithField("job", j.Name). - Warning("job: Skipping execution because last execution still broadcasting, consider increasing schedule interval") - return false - } - return true } diff --git a/dkron/job_test.go b/dkron/job_test.go index fb48bec54..14900be4e 100644 --- a/dkron/job_test.go +++ b/dkron/job_test.go @@ -142,15 +142,6 @@ func Test_isRunnable(t *testing.T) { }, want: false, }, - { - name: "running true", - job: &Job{ - Name: "test_job", - Agent: a, - running: true, - }, - want: false, - }, { name: "should run", job: &Job{ @@ -190,6 +181,9 @@ func (gRPCClientMock) GetActiveExecutions(s string) ([]*proto.Execution, error) }, nil } func (gRPCClientMock) SetExecution(execution *proto.Execution) error { return nil } +func (gRPCClientMock) AgentRun(addr string, job *proto.Job, execution *proto.Execution) error { + return nil +} func Test_generateJobTree(t *testing.T) { jsonString := `[ diff --git a/dkron/notifier.go b/dkron/notifier.go index a6ddc8020..b0e743d38 100644 --- a/dkron/notifier.go +++ b/dkron/notifier.go @@ -88,7 +88,7 @@ func (n *Notifier) buildTemplate(templ string) *bytes.Buffer { n.Execution.FinishedAt, fmt.Sprintf("%t", n.Execution.Success), n.Execution.NodeName, - fmt.Sprintf("%s", n.Execution.Output), + n.Execution.Output, } out := &bytes.Buffer{} diff --git a/dkron/queries.go b/dkron/queries.go deleted file mode 100644 index 02b7b874d..000000000 --- a/dkron/queries.go +++ /dev/null @@ -1,140 +0,0 @@ -package dkron - -import ( - "bytes" - "encoding/json" - "fmt" - "time" - - "github.com/hashicorp/serf/serf" - "github.com/sirupsen/logrus" -) - -const ( - // QueryRunJob define a run job query string - QueryRunJob = "run:job" - // QueryExecutionDone define the execution done query string - QueryExecutionDone = "execution:done" -) - -// RunQueryParam defines the struct used to send a Run query -// using serf. -type RunQueryParam struct { - Execution *Execution `json:"execution"` - RPCAddr string `json:"rpc_addr"` -} - -// RunQuery sends a serf run query to the cluster, this is used to ask a node or nodes -// to run a Job. Returns a job with it's new status and next schedule. -func (a *Agent) RunQuery(jobName string, ex *Execution) (*Job, error) { - start := time.Now() - var params *serf.QueryParam - - job, err := a.Store.GetJob(jobName, nil) - if err != nil { - return nil, fmt.Errorf("agent: RunQuery error retrieving job: %s from store: %w", jobName, err) - } - - // In case the job is not a child job, compute the next execution time - if job.ParentJob == "" { - if e, ok := a.sched.GetEntry(jobName); ok { - job.Next = e.Next - if err := a.applySetJob(job.ToProto()); err != nil { - return nil, fmt.Errorf("agent: RunQuery error storing job %s before running: %w", jobName, err) - } - } else { - return nil, fmt.Errorf("agent: RunQuery error retrieving job: %s from scheduler", jobName) - } - } - - // In the first execution attempt we build and filter the target nodes - // but we use the existing node target in case of retry. - if ex.Attempt <= 1 { - filterNodes, filterTags, err := a.processFilteredNodes(job) - if err != nil { - return nil, fmt.Errorf("agent: RunQuery error processing filtered nodes: %w", err) - } - log.WithFields(logrus.Fields{ - "job": job.Name, - }).Debug("agent: Filtered nodes to run: ", filterNodes) - log.WithFields(logrus.Fields{ - "job": job.Name, - }).Debug("agent: Filtered tags to run: ", filterTags) - - //serf match regexp but we want only match full tag - serfFilterTags := make(map[string]string) - for key, val := range filterTags { - b := new(bytes.Buffer) - b.WriteString("^") - b.WriteString(val) - b.WriteString("$") - serfFilterTags[key] = b.String() - } - - params = &serf.QueryParam{ - FilterNodes: filterNodes, - FilterTags: serfFilterTags, - RequestAck: true, - } - } else { - params = &serf.QueryParam{ - FilterNodes: []string{ex.NodeName}, - RequestAck: true, - } - } - - rqp := &RunQueryParam{ - Execution: ex, - RPCAddr: a.getRPCAddr(), - } - rqpJSON, _ := json.Marshal(rqp) - - log.WithFields(logrus.Fields{ - "query": QueryRunJob, - "job": job.Name, - }).Info("agent: Sending query") - - log.WithFields(logrus.Fields{ - "query": QueryRunJob, - "job": job.Name, - "json": string(rqpJSON), - }).Debug("agent: Sending query") - - qr, err := a.serf.Query(QueryRunJob, rqpJSON, params) - if err != nil { - return nil, fmt.Errorf("agent: RunQuery sending query error: %w", err) - } - defer qr.Close() - - ackCh := qr.AckCh() - respCh := qr.ResponseCh() - - for !qr.Finished() { - select { - case ack, ok := <-ackCh: - if ok { - log.WithFields(logrus.Fields{ - "query": QueryRunJob, - "job": job.Name, - "from": ack, - }).Debug("agent: Received ack") - } - case resp, ok := <-respCh: - if ok { - log.WithFields(logrus.Fields{ - "query": QueryRunJob, - "job": job.Name, - "from": resp.From, - "response": string(resp.Payload), - }).Debug("agent: Received response") - } - } - } - log.WithFields(logrus.Fields{ - "time": time.Since(start), - "job": job.Name, - "query": QueryRunJob, - }).Debug("agent: Done receiving acks and responses") - - return job, nil -} diff --git a/dkron/queries_test.go b/dkron/queries_test.go deleted file mode 100644 index 96bcd6fbd..000000000 --- a/dkron/queries_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package dkron - -import ( - "errors" - "io/ioutil" - "os" - "testing" - "time" - - "github.com/hashicorp/serf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/tidwall/buntdb" -) - -func TestRunQuery(t *testing.T) { - dir, err := ioutil.TempDir("", "dkron-test") - require.NoError(t, err) - defer os.RemoveAll(dir) - - ip1, returnFn1 := testutil.TakeIP() - defer returnFn1() - advAddr := ip1.String() - - c := DefaultConfig() - c.BindAddr = advAddr + ":5000" - c.NodeName = "test1" - c.Server = true - c.Tags = map[string]string{"role": "test"} - c.LogLevel = logLevel - c.DevMode = true - c.DataDir = dir - c.BootstrapExpect = 1 - - a := NewAgent(c) - err = a.Start() - require.NoError(t, err) - time.Sleep(2 * time.Second) - - // Test error with no job - _, err = a.RunQuery("foo", &Execution{}) - assert.True(t, errors.Is(err, buntdb.ErrNotFound)) - - j1 := &Job{ - Name: "test_job", - Schedule: "@daily", - } - err = a.Store.SetJob(j1, false) - require.NoError(t, err) - - a.sched.Start([]*Job{j1}, a) - - _, err = a.RunQuery("test_job", &Execution{}) - assert.NoError(t, err) - - a.Stop() -} diff --git a/dkron/run.go b/dkron/run.go new file mode 100644 index 000000000..ddd22752e --- /dev/null +++ b/dkron/run.go @@ -0,0 +1,66 @@ +package dkron + +import ( + "fmt" + "sync" + + "github.com/sirupsen/logrus" +) + +// Run call the agents to run a job. Returns a job with it's new status and next schedule. +func (a *Agent) Run(jobName string, ex *Execution) (*Job, error) { + job, err := a.Store.GetJob(jobName, nil) + if err != nil { + return nil, fmt.Errorf("agent: Run error retrieving job: %s from store: %w", jobName, err) + } + + // In case the job is not a child job, compute the next execution time + if job.ParentJob == "" { + if e, ok := a.sched.GetEntry(jobName); ok { + job.Next = e.Next + if err := a.applySetJob(job.ToProto()); err != nil { + return nil, fmt.Errorf("agent: Run error storing job %s before running: %w", jobName, err) + } + } else { + return nil, fmt.Errorf("agent: Run error retrieving job: %s from scheduler", jobName) + } + } + + // In the first execution attempt we build and filter the target nodes + // but we use the existing node target in case of retry. + var filterMap map[string]string + if ex.Attempt <= 1 { + filterMap, _, err = a.processFilteredNodes(job) + if err != nil { + return nil, fmt.Errorf("agent: Run error processing filtered nodes: %w", err) + } + } else { + filterMap = map[string]string{ex.NodeName: ""} + } + + log.WithField("nodes", filterMap).Debug("agent: Filtered nodes to run") + + var wg sync.WaitGroup + for _, v := range filterMap { + // Call here client GRPC AgentRun + wg.Add(1) + go func(node string, wg *sync.WaitGroup) { + defer wg.Done() + log.WithFields(logrus.Fields{ + "job_name": job.Name, + "node": node, + }).Info("agent: Calling AgentRun") + + err := a.GRPCClient.AgentRun(node, job.ToProto(), ex.ToProto()) + if err != nil { + log.WithFields(logrus.Fields{ + "job_name": job.Name, + "node": node, + }).Error("agent: Error calling AgentRun") + } + }(v, &wg) + } + + wg.Wait() + return job, nil +} diff --git a/dkron/store.go b/dkron/store.go index 66ae8a118..2a75dfa07 100644 --- a/dkron/store.go +++ b/dkron/store.go @@ -138,7 +138,10 @@ func (s *Store) SetJob(job *Job, copyDependentJobs bool) error { return err } } else { - job.Next = ej.Next + // If comming from a backup us the previous value, don't allow overwriting this + if job.Next.Before(ej.Next) { + job.Next = ej.Next + } } pbj := job.ToProto() diff --git a/go.mod b/go.mod index 3b175a8a4..ad7a8bc94 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/hashicorp/serf v0.9.0 github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d // indirect github.com/jordan-wright/email v0.0.0-20180115032944-94ae17dedda2 + github.com/juliangruber/go-intersect v1.0.0 github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 github.com/mattn/go-shellwords v1.0.10 github.com/mitchellh/go-testing-interface v1.0.0 // indirect diff --git a/go.sum b/go.sum index 6001bd32e..8116c308d 100644 --- a/go.sum +++ b/go.sum @@ -232,6 +232,8 @@ github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juliangruber/go-intersect v1.0.0 h1:0XNPNaEoPd7PZljVNZLk4qrRkR153Sjk2ZL1426zFQ0= +github.com/juliangruber/go-intersect v1.0.0/go.mod h1:unIef4vysSJvZ6adJAAPiBVKpS4r/IOkmfuFghRFDDM= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= diff --git a/plugin/types/dkron.pb.go b/plugin/types/dkron.pb.go index 0fc56c57b..1927c987f 100644 --- a/plugin/types/dkron.pb.go +++ b/plugin/types/dkron.pb.go @@ -1180,6 +1180,53 @@ func (m *GetActiveExecutionsResponse) GetExecutions() []*Execution { return nil } +type AgentRunRequest struct { + Job *Job `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` + Execution *Execution `protobuf:"bytes,2,opt,name=execution,proto3" json:"execution,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *AgentRunRequest) Reset() { *m = AgentRunRequest{} } +func (m *AgentRunRequest) String() string { return proto.CompactTextString(m) } +func (*AgentRunRequest) ProtoMessage() {} +func (*AgentRunRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1f0292872e9433f8, []int{21} +} + +func (m *AgentRunRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_AgentRunRequest.Unmarshal(m, b) +} +func (m *AgentRunRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_AgentRunRequest.Marshal(b, m, deterministic) +} +func (m *AgentRunRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_AgentRunRequest.Merge(m, src) +} +func (m *AgentRunRequest) XXX_Size() int { + return xxx_messageInfo_AgentRunRequest.Size(m) +} +func (m *AgentRunRequest) XXX_DiscardUnknown() { + xxx_messageInfo_AgentRunRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_AgentRunRequest proto.InternalMessageInfo + +func (m *AgentRunRequest) GetJob() *Job { + if m != nil { + return m.Job + } + return nil +} + +func (m *AgentRunRequest) GetExecution() *Execution { + if m != nil { + return m.Execution + } + return nil +} + func init() { proto.RegisterType((*Job)(nil), "types.Job") proto.RegisterMapType((map[string]string)(nil), "types.Job.ExecutorConfigEntry") @@ -1208,6 +1255,7 @@ func init() { proto.RegisterType((*AgentRunStream)(nil), "types.AgentRunStream") proto.RegisterType((*AgentRunResponse)(nil), "types.AgentRunResponse") proto.RegisterType((*GetActiveExecutionsResponse)(nil), "types.GetActiveExecutionsResponse") + proto.RegisterType((*AgentRunRequest)(nil), "types.AgentRunRequest") } func init() { @@ -1215,89 +1263,91 @@ func init() { } var fileDescriptor_1f0292872e9433f8 = []byte{ - // 1307 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdb, 0x72, 0xdb, 0x36, - 0x10, 0x1d, 0x5a, 0x96, 0x2c, 0xae, 0x2e, 0x71, 0x60, 0x27, 0x61, 0xe8, 0xb4, 0xd1, 0x28, 0xd3, - 0x19, 0xb5, 0x99, 0x28, 0xad, 0xdb, 0x34, 0xb7, 0x4e, 0x26, 0x6e, 0xec, 0x7a, 0xea, 0x69, 0x53, - 0x97, 0xf2, 0xf4, 0xa5, 0x0f, 0x1a, 0x48, 0x5c, 0xcb, 0x4c, 0x28, 0x42, 0x05, 0x40, 0x37, 0xea, - 0x63, 0xff, 0xa3, 0x0f, 0xfd, 0x88, 0xfe, 0x5f, 0x07, 0x00, 0x41, 0x53, 0xb2, 0xe4, 0x4b, 0xde, - 0xb8, 0x8b, 0x83, 0xc5, 0xee, 0xe2, 0x9c, 0x25, 0xa0, 0x16, 0xbe, 0xe7, 0x2c, 0xe9, 0x4e, 0x38, - 0x93, 0x8c, 0x94, 0xe5, 0x74, 0x82, 0xc2, 0xbf, 0x3f, 0x62, 0x6c, 0x14, 0xe3, 0x63, 0xed, 0x1c, - 0xa4, 0xc7, 0x8f, 0x65, 0x34, 0x46, 0x21, 0xe9, 0x78, 0x62, 0x70, 0xfe, 0xd6, 0x3c, 0x00, 0xc7, - 0x13, 0x39, 0x35, 0x8b, 0xed, 0xff, 0x5c, 0x28, 0x1d, 0xb0, 0x01, 0x21, 0xb0, 0x9a, 0xd0, 0x31, - 0x7a, 0x4e, 0xcb, 0xe9, 0xb8, 0x81, 0xfe, 0x26, 0x3e, 0x54, 0x55, 0xac, 0xbf, 0x58, 0x82, 0xde, - 0x8a, 0xf6, 0xe7, 0xb6, 0x5a, 0x13, 0xc3, 0x13, 0x0c, 0xd3, 0x18, 0xbd, 0x92, 0x59, 0xb3, 0x36, - 0xd9, 0x84, 0x32, 0xfb, 0x33, 0x41, 0xee, 0xad, 0xe9, 0x05, 0x63, 0x90, 0xfb, 0x50, 0xd3, 0x1f, - 0x7d, 0x1c, 0xd3, 0x28, 0xf6, 0xaa, 0x7a, 0x0d, 0xb4, 0x6b, 0x4f, 0x79, 0xc8, 0x03, 0x68, 0x88, - 0x74, 0x38, 0x44, 0x21, 0xfa, 0x43, 0x96, 0x26, 0xd2, 0x73, 0x5b, 0x4e, 0xa7, 0x1c, 0xd4, 0x33, - 0xe7, 0x1b, 0xe5, 0x53, 0x51, 0x90, 0x73, 0xc6, 0x33, 0x08, 0x68, 0x08, 0x68, 0x97, 0x01, 0xf8, - 0x50, 0x0d, 0x23, 0x41, 0x07, 0x31, 0x86, 0x5e, 0xad, 0xe5, 0x74, 0xaa, 0x41, 0x6e, 0x93, 0x0e, - 0xac, 0x4a, 0x3a, 0x12, 0x5e, 0xbd, 0x55, 0xea, 0xd4, 0xb6, 0x37, 0xbb, 0xba, 0x81, 0xdd, 0x03, - 0x36, 0xe8, 0x1e, 0xd1, 0x91, 0xd8, 0x4b, 0x24, 0x9f, 0x06, 0x1a, 0x41, 0x3c, 0x58, 0xe3, 0x28, - 0x79, 0x84, 0xc2, 0x6b, 0xb4, 0x9c, 0x4e, 0x23, 0xb0, 0x26, 0xf9, 0x0c, 0x9a, 0x21, 0x4e, 0x30, - 0x09, 0x31, 0x91, 0xfd, 0x77, 0x6c, 0x20, 0xbc, 0x66, 0xab, 0xd4, 0x71, 0x83, 0x46, 0xee, 0x3d, - 0x60, 0x03, 0x41, 0x3e, 0x01, 0x98, 0x50, 0x9e, 0x61, 0xbc, 0x1b, 0xba, 0x58, 0xd7, 0x78, 0x54, - 0xbb, 0x5b, 0x50, 0x1b, 0xb2, 0x64, 0x98, 0x72, 0x8e, 0xc9, 0x70, 0xea, 0xad, 0xeb, 0xf5, 0xa2, - 0x4b, 0xd5, 0x81, 0x1f, 0x70, 0x98, 0x4a, 0xc6, 0xbd, 0x9b, 0xa6, 0xc1, 0xd6, 0x26, 0xfb, 0x70, - 0xc3, 0x7e, 0xf7, 0x87, 0x2c, 0x39, 0x8e, 0x46, 0x1e, 0xd1, 0x25, 0x7d, 0x5a, 0x28, 0x69, 0x2f, - 0x43, 0xbc, 0xd1, 0x00, 0x53, 0x5c, 0x13, 0x67, 0x9c, 0xe4, 0x36, 0x54, 0x84, 0xa4, 0x32, 0x15, - 0xde, 0x86, 0x3e, 0x22, 0xb3, 0xc8, 0x37, 0x50, 0x1d, 0xa3, 0xa4, 0x21, 0x95, 0xd4, 0xdb, 0xd4, - 0x91, 0xbd, 0x42, 0xe4, 0x9f, 0xb3, 0x25, 0x13, 0x33, 0x47, 0x92, 0x17, 0x50, 0x8f, 0xa9, 0x90, - 0xfd, 0xec, 0xc2, 0xbc, 0xbb, 0x2d, 0xa7, 0x53, 0xdb, 0xbe, 0x53, 0xd8, 0xf9, 0x36, 0x8d, 0x63, - 0x75, 0x15, 0x47, 0xd1, 0x18, 0x83, 0x9a, 0x02, 0xf7, 0x0c, 0x96, 0x7c, 0x0b, 0xa0, 0xf7, 0xea, - 0x9b, 0xf4, 0xfc, 0x8b, 0x77, 0xba, 0x0a, 0xba, 0xa7, 0x90, 0xa4, 0x0b, 0xab, 0x09, 0x7e, 0x90, - 0xde, 0x1d, 0xbd, 0xc3, 0xef, 0x1a, 0xae, 0x77, 0x2d, 0xd7, 0xbb, 0x47, 0x56, 0x0c, 0x81, 0xc6, - 0xa9, 0xc6, 0x87, 0x91, 0x98, 0xc4, 0x74, 0xaa, 0xe9, 0xee, 0x99, 0xc6, 0x17, 0x5c, 0xe4, 0x05, - 0xc0, 0x84, 0x33, 0x95, 0x14, 0xe3, 0xc2, 0xdb, 0xd2, 0xd5, 0xfb, 0x85, 0x4c, 0x0e, 0xf3, 0x45, - 0x53, 0x7f, 0x01, 0xed, 0x3f, 0x05, 0x37, 0x67, 0x12, 0x59, 0x87, 0xd2, 0x7b, 0x9c, 0x66, 0x8a, - 0x52, 0x9f, 0x4a, 0x18, 0xa7, 0x34, 0x4e, 0xad, 0x9a, 0x8c, 0xf1, 0x62, 0xe5, 0x99, 0xe3, 0xef, - 0xc0, 0xc6, 0x82, 0xfb, 0xba, 0x56, 0x88, 0x97, 0xd0, 0x98, 0xb9, 0x98, 0x6b, 0x6d, 0xfe, 0x1d, - 0xea, 0xc5, 0x0e, 0x93, 0x2d, 0x70, 0x4f, 0xa8, 0xe8, 0x1b, 0xb4, 0x63, 0x64, 0x74, 0x42, 0xc5, - 0x6f, 0xca, 0x56, 0x3d, 0x57, 0x73, 0x40, 0x47, 0xb9, 0xa4, 0xe7, 0x0a, 0xe7, 0x07, 0x70, 0x63, - 0xae, 0x69, 0x0b, 0x72, 0xfb, 0xbc, 0x98, 0x5b, 0x6d, 0x7b, 0x23, 0xeb, 0xf8, 0x61, 0x9c, 0x8e, - 0xa2, 0xc4, 0xf4, 0xa4, 0x90, 0x70, 0xfb, 0x6f, 0x07, 0xea, 0xc5, 0x35, 0xf2, 0x14, 0x2a, 0x99, - 0x14, 0x1c, 0x7d, 0x65, 0xf7, 0x17, 0x04, 0xe8, 0x16, 0xb5, 0x90, 0xc1, 0xfd, 0xe7, 0x50, 0xfb, - 0xc8, 0x96, 0xb7, 0x1f, 0x41, 0xa3, 0x87, 0x4a, 0xcf, 0x01, 0xfe, 0x91, 0xa2, 0x90, 0xe4, 0x1e, - 0x94, 0x94, 0xdc, 0x1d, 0x5d, 0x02, 0x9c, 0x91, 0x26, 0x50, 0xee, 0x76, 0x17, 0x9a, 0x16, 0x2e, - 0x26, 0x2c, 0x11, 0x78, 0x09, 0xfe, 0x11, 0xac, 0xef, 0x62, 0x8c, 0x12, 0x0b, 0x27, 0xdc, 0x85, - 0xea, 0x3b, 0x36, 0xe8, 0x17, 0x66, 0xf5, 0xda, 0x3b, 0x36, 0x78, 0x4b, 0xc7, 0xd8, 0xfe, 0x0a, - 0x6e, 0x16, 0xe0, 0x57, 0x3a, 0xe1, 0x0b, 0x68, 0xec, 0xcf, 0x14, 0x70, 0x41, 0xf8, 0x2e, 0x34, - 0xf7, 0xaf, 0x93, 0xfd, 0x3f, 0x2b, 0xe0, 0x1a, 0x4e, 0x47, 0x2c, 0xb9, 0x20, 0xb0, 0x9a, 0xb5, - 0x76, 0x62, 0xac, 0x68, 0xa6, 0x59, 0x53, 0x8d, 0x27, 0x96, 0xca, 0x49, 0x2a, 0xf5, 0x2f, 0xa6, - 0x1e, 0x64, 0x96, 0x62, 0x67, 0xc2, 0x42, 0x34, 0xd1, 0x56, 0xcd, 0x70, 0x54, 0x0e, 0x1d, 0x6e, - 0x13, 0xca, 0x23, 0xce, 0xd2, 0x89, 0x57, 0x6e, 0x39, 0x9d, 0x52, 0x60, 0x0c, 0x75, 0x08, 0x95, - 0x52, 0xfd, 0xf9, 0xbc, 0x8a, 0x19, 0xe8, 0x99, 0x49, 0x9e, 0x03, 0x08, 0x49, 0xb9, 0xc4, 0xb0, - 0x4f, 0xa5, 0xfe, 0x65, 0x5d, 0xcc, 0x69, 0x37, 0x43, 0xef, 0x48, 0xf2, 0x12, 0x6a, 0xc7, 0x51, - 0x12, 0x89, 0x13, 0xb3, 0xb7, 0x7a, 0xe9, 0x5e, 0xb0, 0xf0, 0x1d, 0xd9, 0xfe, 0x01, 0x36, 0xf3, - 0xf6, 0xec, 0xb2, 0x04, 0xed, 0x15, 0x74, 0xc1, 0x45, 0xeb, 0xcf, 0x7a, 0xbb, 0x9e, 0xf5, 0x36, - 0xc7, 0x07, 0x67, 0x90, 0xf6, 0x1e, 0xdc, 0x9a, 0x8b, 0x93, 0x5d, 0x0f, 0x81, 0xd5, 0x63, 0xce, - 0xc6, 0xf6, 0x97, 0xae, 0xbe, 0x55, 0x1b, 0x26, 0x74, 0x1a, 0x33, 0x1a, 0xea, 0x5e, 0xd7, 0x03, - 0x6b, 0x2a, 0x2a, 0x04, 0x69, 0x72, 0x65, 0x2a, 0x58, 0xec, 0x55, 0x89, 0x7c, 0xc4, 0x46, 0xa3, - 0xf8, 0xea, 0x44, 0x2e, 0xc0, 0xaf, 0x46, 0x36, 0x07, 0x20, 0xa0, 0xc7, 0xb2, 0x87, 0xfc, 0x14, - 0x39, 0x69, 0xc2, 0x4a, 0x14, 0x66, 0x61, 0x57, 0xa2, 0x50, 0xbf, 0x6e, 0x58, 0x68, 0x15, 0xac, - 0xbf, 0x35, 0x23, 0xc2, 0x90, 0x2b, 0xda, 0x99, 0x07, 0x8c, 0x35, 0x15, 0xed, 0x62, 0xa4, 0x21, - 0x72, 0xcd, 0xad, 0x6a, 0x90, 0x59, 0x7a, 0x10, 0x30, 0x89, 0x5c, 0x33, 0xab, 0x1a, 0x18, 0x43, - 0x3d, 0x5b, 0x38, 0x3d, 0x96, 0x7d, 0x7d, 0xdd, 0x43, 0x16, 0x6b, 0x7e, 0xb9, 0x41, 0x5d, 0x39, - 0x0f, 0x33, 0x5f, 0x9b, 0xc2, 0x3d, 0x95, 0xde, 0x3e, 0x4a, 0x33, 0x6b, 0x52, 0x4e, 0xf5, 0x3d, - 0xda, 0xea, 0x1e, 0xc2, 0x9a, 0xd0, 0xa9, 0x8b, 0x6c, 0x7c, 0xdd, 0xcc, 0x2a, 0x3c, 0x2b, 0x2a, - 0xb0, 0x08, 0x95, 0x47, 0x94, 0x84, 0xf8, 0x41, 0x97, 0xb3, 0x1a, 0x18, 0xa3, 0xfd, 0x10, 0xee, - 0x2a, 0x70, 0x80, 0x63, 0x76, 0x8a, 0x87, 0x88, 0xfc, 0xfb, 0xe9, 0x8f, 0xbb, 0xb6, 0xdb, 0x73, - 0x0d, 0x69, 0xbf, 0x86, 0xe6, 0xce, 0x08, 0x13, 0x19, 0xa4, 0x49, 0x4f, 0x72, 0xa4, 0xe3, 0x6b, - 0xd3, 0xee, 0x35, 0xac, 0xdb, 0x08, 0x1f, 0xc9, 0xb8, 0x5f, 0x60, 0x6b, 0x1f, 0xe5, 0xce, 0x50, - 0x46, 0xa7, 0x98, 0x1f, 0x21, 0xf2, 0x60, 0x5f, 0x02, 0xe4, 0xa7, 0xd9, 0xae, 0x9c, 0xcf, 0xa8, - 0x80, 0xd9, 0xfe, 0xb7, 0x02, 0xe5, 0x5d, 0xf5, 0x40, 0x26, 0x4f, 0xa0, 0x62, 0x66, 0x15, 0xb1, - 0x8f, 0xbc, 0x99, 0x31, 0xe7, 0xdf, 0x9a, 0xf3, 0x66, 0x47, 0x1e, 0x40, 0x63, 0x46, 0x4a, 0x64, - 0x6b, 0xfe, 0xbc, 0x82, 0x50, 0xfd, 0x7b, 0x8b, 0x17, 0xb3, 0x58, 0x4f, 0xa1, 0xfc, 0x13, 0xd2, - 0x53, 0x24, 0xb7, 0xcf, 0xcd, 0x83, 0x3d, 0xf5, 0xfe, 0xf6, 0x97, 0xf8, 0x55, 0xee, 0xbd, 0xd9, - 0xdc, 0x7b, 0x0b, 0x73, 0x9f, 0xfb, 0x95, 0xbc, 0x02, 0x37, 0x9f, 0xfe, 0xc4, 0xbe, 0x9c, 0xe6, - 0x7f, 0x1f, 0xbe, 0x77, 0x7e, 0x21, 0xdb, 0xff, 0x04, 0x2a, 0x46, 0xd3, 0xf9, 0xb1, 0x33, 0xe3, - 0x20, 0x3f, 0x76, 0x4e, 0xf8, 0xaf, 0xc0, 0xcd, 0xb5, 0x9a, 0x1f, 0x3b, 0x2f, 0xf6, 0xfc, 0xd8, - 0xf3, 0xb2, 0xee, 0xc1, 0xe6, 0x22, 0x61, 0x2c, 0xed, 0xda, 0x83, 0x82, 0x2e, 0x96, 0xaa, 0xe9, - 0x2d, 0x90, 0xf3, 0x52, 0x20, 0xad, 0xc2, 0xd6, 0x85, 0x2a, 0x59, 0x7a, 0x25, 0xdf, 0x41, 0xd5, - 0x72, 0x9d, 0xd8, 0x3e, 0xcc, 0xca, 0xc7, 0xbf, 0x33, 0xe7, 0xb6, 0xb9, 0x74, 0x1c, 0xf2, 0x2b, - 0x6c, 0x2c, 0xe0, 0xf9, 0xd2, 0x0a, 0xdb, 0x67, 0xdc, 0x5c, 0xaa, 0x8d, 0x67, 0x50, 0xef, 0xa1, - 0x3c, 0xfb, 0xbb, 0x9e, 0xd3, 0xc5, 0xb2, 0x52, 0x06, 0x15, 0x6d, 0x7f, 0xfd, 0x7f, 0x00, 0x00, - 0x00, 0xff, 0xff, 0x8a, 0xab, 0x22, 0xc1, 0x4a, 0x0e, 0x00, 0x00, + // 1330 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdb, 0x6e, 0x1b, 0x37, + 0x13, 0xc6, 0x4a, 0x96, 0xad, 0x1d, 0x1d, 0xec, 0xd0, 0x4e, 0xb2, 0x59, 0xe7, 0xff, 0x23, 0x28, + 0xf8, 0x01, 0xfd, 0x0d, 0xa2, 0xa4, 0x6e, 0xd3, 0x9c, 0x80, 0x20, 0x6e, 0xec, 0x1a, 0x35, 0xda, + 0xd4, 0x5d, 0x19, 0xbd, 0xe9, 0x85, 0x40, 0x69, 0xc7, 0xf2, 0x26, 0xab, 0xa5, 0x4a, 0x72, 0xdd, + 0xa8, 0x97, 0x7d, 0x8f, 0x3e, 0x46, 0x9f, 0xa8, 0x2f, 0x52, 0x90, 0x5c, 0xae, 0x57, 0xb2, 0xe4, + 0x43, 0xee, 0x38, 0xc3, 0x8f, 0xc3, 0x39, 0x7c, 0x33, 0x24, 0xd4, 0xc2, 0x8f, 0x9c, 0x25, 0xdd, + 0x09, 0x67, 0x92, 0x91, 0x8a, 0x9c, 0x4e, 0x50, 0xf8, 0x0f, 0x46, 0x8c, 0x8d, 0x62, 0x7c, 0xa2, + 0x95, 0x83, 0xf4, 0xe4, 0x89, 0x8c, 0xc6, 0x28, 0x24, 0x1d, 0x4f, 0x0c, 0xce, 0xdf, 0x9e, 0x07, + 0xe0, 0x78, 0x22, 0xa7, 0x66, 0xb3, 0xfd, 0xb7, 0x0b, 0xe5, 0x43, 0x36, 0x20, 0x04, 0x56, 0x12, + 0x3a, 0x46, 0xcf, 0x69, 0x39, 0x1d, 0x37, 0xd0, 0x6b, 0xe2, 0x43, 0x55, 0xd9, 0xfa, 0x83, 0x25, + 0xe8, 0x95, 0xb4, 0x3e, 0x97, 0xd5, 0x9e, 0x18, 0x9e, 0x62, 0x98, 0xc6, 0xe8, 0x95, 0xcd, 0x9e, + 0x95, 0xc9, 0x16, 0x54, 0xd8, 0xef, 0x09, 0x72, 0x6f, 0x4d, 0x6f, 0x18, 0x81, 0x3c, 0x80, 0x9a, + 0x5e, 0xf4, 0x71, 0x4c, 0xa3, 0xd8, 0xab, 0xea, 0x3d, 0xd0, 0xaa, 0x7d, 0xa5, 0x21, 0x0f, 0xa1, + 0x21, 0xd2, 0xe1, 0x10, 0x85, 0xe8, 0x0f, 0x59, 0x9a, 0x48, 0xcf, 0x6d, 0x39, 0x9d, 0x4a, 0x50, + 0xcf, 0x94, 0xef, 0x94, 0x4e, 0x59, 0x41, 0xce, 0x19, 0xcf, 0x20, 0xa0, 0x21, 0xa0, 0x55, 0x06, + 0xe0, 0x43, 0x35, 0x8c, 0x04, 0x1d, 0xc4, 0x18, 0x7a, 0xb5, 0x96, 0xd3, 0xa9, 0x06, 0xb9, 0x4c, + 0x3a, 0xb0, 0x22, 0xe9, 0x48, 0x78, 0xf5, 0x56, 0xb9, 0x53, 0xdb, 0xd9, 0xea, 0xea, 0x04, 0x76, + 0x0f, 0xd9, 0xa0, 0x7b, 0x4c, 0x47, 0x62, 0x3f, 0x91, 0x7c, 0x1a, 0x68, 0x04, 0xf1, 0x60, 0x8d, + 0xa3, 0xe4, 0x11, 0x0a, 0xaf, 0xd1, 0x72, 0x3a, 0x8d, 0xc0, 0x8a, 0xe4, 0x7f, 0xd0, 0x0c, 0x71, + 0x82, 0x49, 0x88, 0x89, 0xec, 0x7f, 0x60, 0x03, 0xe1, 0x35, 0x5b, 0xe5, 0x8e, 0x1b, 0x34, 0x72, + 0xed, 0x21, 0x1b, 0x08, 0xf2, 0x1f, 0x80, 0x09, 0xe5, 0x19, 0xc6, 0x5b, 0xd7, 0xc1, 0xba, 0x46, + 0xa3, 0xd2, 0xdd, 0x82, 0xda, 0x90, 0x25, 0xc3, 0x94, 0x73, 0x4c, 0x86, 0x53, 0x6f, 0x43, 0xef, + 0x17, 0x55, 0x2a, 0x0e, 0xfc, 0x84, 0xc3, 0x54, 0x32, 0xee, 0xdd, 0x32, 0x09, 0xb6, 0x32, 0x39, + 0x80, 0x75, 0xbb, 0xee, 0x0f, 0x59, 0x72, 0x12, 0x8d, 0x3c, 0xa2, 0x43, 0xfa, 0x6f, 0x21, 0xa4, + 0xfd, 0x0c, 0xf1, 0x4e, 0x03, 0x4c, 0x70, 0x4d, 0x9c, 0x51, 0x92, 0x3b, 0xb0, 0x2a, 0x24, 0x95, + 0xa9, 0xf0, 0x36, 0xf5, 0x15, 0x99, 0x44, 0xbe, 0x86, 0xea, 0x18, 0x25, 0x0d, 0xa9, 0xa4, 0xde, + 0x96, 0xb6, 0xec, 0x15, 0x2c, 0xff, 0x98, 0x6d, 0x19, 0x9b, 0x39, 0x92, 0xbc, 0x82, 0x7a, 0x4c, + 0x85, 0xec, 0x67, 0x05, 0xf3, 0xee, 0xb5, 0x9c, 0x4e, 0x6d, 0xe7, 0x6e, 0xe1, 0xe4, 0xfb, 0x34, + 0x8e, 0x55, 0x29, 0x8e, 0xa3, 0x31, 0x06, 0x35, 0x05, 0xee, 0x19, 0x2c, 0xf9, 0x06, 0x40, 0x9f, + 0xd5, 0x95, 0xf4, 0xfc, 0xcb, 0x4f, 0xba, 0x0a, 0xba, 0xaf, 0x90, 0xa4, 0x0b, 0x2b, 0x09, 0x7e, + 0x92, 0xde, 0x5d, 0x7d, 0xc2, 0xef, 0x1a, 0xae, 0x77, 0x2d, 0xd7, 0xbb, 0xc7, 0xb6, 0x19, 0x02, + 0x8d, 0x53, 0x89, 0x0f, 0x23, 0x31, 0x89, 0xe9, 0x54, 0xd3, 0xdd, 0x33, 0x89, 0x2f, 0xa8, 0xc8, + 0x2b, 0x80, 0x09, 0x67, 0xca, 0x29, 0xc6, 0x85, 0xb7, 0xad, 0xa3, 0xf7, 0x0b, 0x9e, 0x1c, 0xe5, + 0x9b, 0x26, 0xfe, 0x02, 0xda, 0x7f, 0x0e, 0x6e, 0xce, 0x24, 0xb2, 0x01, 0xe5, 0x8f, 0x38, 0xcd, + 0x3a, 0x4a, 0x2d, 0x55, 0x63, 0x9c, 0xd1, 0x38, 0xb5, 0xdd, 0x64, 0x84, 0x57, 0xa5, 0x17, 0x8e, + 0xbf, 0x0b, 0x9b, 0x0b, 0xea, 0x75, 0x23, 0x13, 0xaf, 0xa1, 0x31, 0x53, 0x98, 0x1b, 0x1d, 0xfe, + 0x15, 0xea, 0xc5, 0x0c, 0x93, 0x6d, 0x70, 0x4f, 0xa9, 0xe8, 0x1b, 0xb4, 0x63, 0xda, 0xe8, 0x94, + 0x8a, 0x5f, 0x94, 0xac, 0x72, 0xae, 0xe6, 0x80, 0xb6, 0x72, 0x45, 0xce, 0x15, 0xce, 0x0f, 0x60, + 0x7d, 0x2e, 0x69, 0x0b, 0x7c, 0xfb, 0x7f, 0xd1, 0xb7, 0xda, 0xce, 0x66, 0x96, 0xf1, 0xa3, 0x38, + 0x1d, 0x45, 0x89, 0xc9, 0x49, 0xc1, 0xe1, 0xf6, 0x9f, 0x0e, 0xd4, 0x8b, 0x7b, 0xe4, 0x39, 0xac, + 0x66, 0xad, 0xe0, 0xe8, 0x92, 0x3d, 0x58, 0x60, 0xa0, 0x5b, 0xec, 0x85, 0x0c, 0xee, 0xbf, 0x84, + 0xda, 0x67, 0xa6, 0xbc, 0xfd, 0x18, 0x1a, 0x3d, 0x54, 0xfd, 0x1c, 0xe0, 0x6f, 0x29, 0x0a, 0x49, + 0xee, 0x43, 0x59, 0xb5, 0xbb, 0xa3, 0x43, 0x80, 0x73, 0xd2, 0x04, 0x4a, 0xdd, 0xee, 0x42, 0xd3, + 0xc2, 0xc5, 0x84, 0x25, 0x02, 0xaf, 0xc0, 0x3f, 0x86, 0x8d, 0x3d, 0x8c, 0x51, 0x62, 0xe1, 0x86, + 0x7b, 0x50, 0xfd, 0xc0, 0x06, 0xfd, 0xc2, 0xac, 0x5e, 0xfb, 0xc0, 0x06, 0xef, 0xe9, 0x18, 0xdb, + 0x5f, 0xc2, 0xad, 0x02, 0xfc, 0x5a, 0x37, 0x7c, 0x01, 0x8d, 0x83, 0x99, 0x00, 0x2e, 0x31, 0xdf, + 0x85, 0xe6, 0xc1, 0x4d, 0xbc, 0xff, 0xab, 0x04, 0xae, 0xe1, 0x74, 0xc4, 0x92, 0x4b, 0x0c, 0xab, + 0x59, 0x6b, 0x27, 0x46, 0x49, 0x33, 0xcd, 0x8a, 0x6a, 0x3c, 0xb1, 0x54, 0x4e, 0x52, 0xa9, 0x9f, + 0x98, 0x7a, 0x90, 0x49, 0x8a, 0x9d, 0x09, 0x0b, 0xd1, 0x58, 0x5b, 0x31, 0xc3, 0x51, 0x29, 0xb4, + 0xb9, 0x2d, 0xa8, 0x8c, 0x38, 0x4b, 0x27, 0x5e, 0xa5, 0xe5, 0x74, 0xca, 0x81, 0x11, 0xd4, 0x25, + 0x54, 0x4a, 0xf5, 0xf2, 0x79, 0xab, 0x66, 0xa0, 0x67, 0x22, 0x79, 0x09, 0x20, 0x24, 0xe5, 0x12, + 0xc3, 0x3e, 0x95, 0xfa, 0xc9, 0xba, 0x9c, 0xd3, 0x6e, 0x86, 0xde, 0x95, 0xe4, 0x35, 0xd4, 0x4e, + 0xa2, 0x24, 0x12, 0xa7, 0xe6, 0x6c, 0xf5, 0xca, 0xb3, 0x60, 0xe1, 0xbb, 0xb2, 0xfd, 0x1d, 0x6c, + 0xe5, 0xe9, 0xd9, 0x63, 0x09, 0xda, 0x12, 0x74, 0xc1, 0x45, 0xab, 0xcf, 0x72, 0xbb, 0x91, 0xe5, + 0x36, 0xc7, 0x07, 0xe7, 0x90, 0xf6, 0x3e, 0xdc, 0x9e, 0xb3, 0x93, 0x95, 0x87, 0xc0, 0xca, 0x09, + 0x67, 0x63, 0xfb, 0xa4, 0xab, 0xb5, 0x4a, 0xc3, 0x84, 0x4e, 0x63, 0x46, 0x43, 0x9d, 0xeb, 0x7a, + 0x60, 0x45, 0x45, 0x85, 0x20, 0x4d, 0xae, 0x4d, 0x05, 0x8b, 0xbd, 0x2e, 0x91, 0x8f, 0xd9, 0x68, + 0x14, 0x5f, 0x9f, 0xc8, 0x05, 0xf8, 0xf5, 0xc8, 0xe6, 0x00, 0x04, 0xf4, 0x44, 0xf6, 0x90, 0x9f, + 0x21, 0x27, 0x4d, 0x28, 0x45, 0x61, 0x66, 0xb6, 0x14, 0x85, 0xfa, 0x77, 0xc3, 0x42, 0xdb, 0xc1, + 0x7a, 0xad, 0x19, 0x11, 0x86, 0x5c, 0xd1, 0xce, 0x7c, 0x60, 0xac, 0xa8, 0x68, 0x17, 0x23, 0x0d, + 0x91, 0x6b, 0x6e, 0x55, 0x83, 0x4c, 0xd2, 0x83, 0x80, 0x49, 0xe4, 0x9a, 0x59, 0xd5, 0xc0, 0x08, + 0xea, 0xdb, 0xc2, 0xe9, 0x89, 0xec, 0xeb, 0x72, 0x0f, 0x59, 0xac, 0xf9, 0xe5, 0x06, 0x75, 0xa5, + 0x3c, 0xca, 0x74, 0x6d, 0x0a, 0xf7, 0x95, 0x7b, 0x07, 0x28, 0xcd, 0xac, 0x49, 0x39, 0xd5, 0x75, + 0xb4, 0xd1, 0x3d, 0x82, 0x35, 0xa1, 0x5d, 0x17, 0xd9, 0xf8, 0xba, 0x95, 0x45, 0x78, 0x1e, 0x54, + 0x60, 0x11, 0xca, 0x8f, 0x28, 0x09, 0xf1, 0x93, 0x0e, 0x67, 0x25, 0x30, 0x42, 0xfb, 0x11, 0xdc, + 0x53, 0xe0, 0x00, 0xc7, 0xec, 0x0c, 0x8f, 0x10, 0xf9, 0xb7, 0xd3, 0xef, 0xf7, 0x6c, 0xb6, 0xe7, + 0x12, 0xd2, 0x7e, 0x0b, 0xcd, 0xdd, 0x11, 0x26, 0x32, 0x48, 0x93, 0x9e, 0xe4, 0x48, 0xc7, 0x37, + 0xa6, 0xdd, 0x5b, 0xd8, 0xb0, 0x16, 0x3e, 0x93, 0x71, 0x3f, 0xc1, 0xf6, 0x01, 0xca, 0xdd, 0xa1, + 0x8c, 0xce, 0x30, 0xbf, 0x42, 0xe4, 0xc6, 0x9e, 0x02, 0xe4, 0xb7, 0xd9, 0xac, 0x5c, 0xf4, 0xa8, + 0x80, 0x69, 0xf7, 0x61, 0xfd, 0xdc, 0xa5, 0x6b, 0x0c, 0xe4, 0xd9, 0x98, 0x4b, 0x57, 0xc6, 0xbc, + 0xf3, 0x4f, 0x05, 0x2a, 0x7b, 0xea, 0x07, 0x4e, 0x9e, 0xc1, 0xaa, 0x19, 0x86, 0xc4, 0xfe, 0x22, + 0x67, 0xe6, 0xa8, 0x7f, 0x7b, 0x4e, 0x9b, 0xc5, 0x74, 0x08, 0x8d, 0x99, 0x5e, 0x25, 0xdb, 0xf3, + 0xd7, 0x15, 0x26, 0x81, 0x7f, 0x7f, 0xf1, 0x66, 0x66, 0xeb, 0x39, 0x54, 0x7e, 0x40, 0x7a, 0x86, + 0xe4, 0xce, 0x85, 0x81, 0xb3, 0xaf, 0x3e, 0xf8, 0xfe, 0x12, 0xbd, 0xf2, 0xbd, 0x37, 0xeb, 0x7b, + 0x6f, 0xa1, 0xef, 0x73, 0x6f, 0xd5, 0x1b, 0x70, 0xf3, 0xe7, 0x85, 0xd8, 0xaf, 0xd9, 0xfc, 0xfb, + 0xe4, 0x7b, 0x17, 0x37, 0xb2, 0xf3, 0xcf, 0x60, 0xd5, 0x0c, 0x8d, 0xfc, 0xda, 0x99, 0x79, 0x93, + 0x5f, 0x3b, 0x37, 0x59, 0xde, 0x80, 0x9b, 0x0f, 0x83, 0xfc, 0xda, 0xf9, 0x69, 0x92, 0x5f, 0x7b, + 0x71, 0x6e, 0xf4, 0x60, 0x6b, 0x51, 0xe7, 0x2d, 0xcd, 0xda, 0xc3, 0x42, 0xe3, 0x2d, 0x6d, 0xd7, + 0xf7, 0x40, 0x2e, 0xf6, 0x1a, 0x69, 0x15, 0x8e, 0x2e, 0x6c, 0xc3, 0xa5, 0x25, 0xf9, 0x19, 0x36, + 0x17, 0xb4, 0xc2, 0x52, 0x1f, 0xdb, 0xe7, 0xec, 0x5a, 0xda, 0x3e, 0x2f, 0xa0, 0xde, 0x43, 0x79, + 0xfe, 0x00, 0x5f, 0x20, 0xf6, 0x32, 0x67, 0x76, 0xf6, 0xa0, 0xa2, 0xdb, 0x88, 0xbc, 0x86, 0xaa, + 0xed, 0x27, 0x72, 0x27, 0x3b, 0x3e, 0xd7, 0x60, 0x79, 0xd5, 0x66, 0xa7, 0xc9, 0x53, 0x67, 0xb0, + 0xaa, 0xad, 0x7e, 0xf5, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc5, 0xb4, 0x29, 0x03, 0xb3, 0x0e, + 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1321,7 +1371,6 @@ type DkronClient interface { ToggleJob(ctx context.Context, in *ToggleJobRequest, opts ...grpc.CallOption) (*ToggleJobResponse, error) RaftGetConfiguration(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*RaftGetConfigurationResponse, error) RaftRemovePeerByID(ctx context.Context, in *RaftRemovePeerByIDRequest, opts ...grpc.CallOption) (*empty.Empty, error) - AgentRun(ctx context.Context, opts ...grpc.CallOption) (Dkron_AgentRunClient, error) GetActiveExecutions(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*GetActiveExecutionsResponse, error) SetExecution(ctx context.Context, in *Execution, opts ...grpc.CallOption) (*empty.Empty, error) } @@ -1415,40 +1464,6 @@ func (c *dkronClient) RaftRemovePeerByID(ctx context.Context, in *RaftRemovePeer return out, nil } -func (c *dkronClient) AgentRun(ctx context.Context, opts ...grpc.CallOption) (Dkron_AgentRunClient, error) { - stream, err := c.cc.NewStream(ctx, &_Dkron_serviceDesc.Streams[0], "/types.Dkron/AgentRun", opts...) - if err != nil { - return nil, err - } - x := &dkronAgentRunClient{stream} - return x, nil -} - -type Dkron_AgentRunClient interface { - Send(*AgentRunStream) error - CloseAndRecv() (*AgentRunResponse, error) - grpc.ClientStream -} - -type dkronAgentRunClient struct { - grpc.ClientStream -} - -func (x *dkronAgentRunClient) Send(m *AgentRunStream) error { - return x.ClientStream.SendMsg(m) -} - -func (x *dkronAgentRunClient) CloseAndRecv() (*AgentRunResponse, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(AgentRunResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - func (c *dkronClient) GetActiveExecutions(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*GetActiveExecutionsResponse, error) { out := new(GetActiveExecutionsResponse) err := c.cc.Invoke(ctx, "/types.Dkron/GetActiveExecutions", in, out, opts...) @@ -1478,7 +1493,6 @@ type DkronServer interface { ToggleJob(context.Context, *ToggleJobRequest) (*ToggleJobResponse, error) RaftGetConfiguration(context.Context, *empty.Empty) (*RaftGetConfigurationResponse, error) RaftRemovePeerByID(context.Context, *RaftRemovePeerByIDRequest) (*empty.Empty, error) - AgentRun(Dkron_AgentRunServer) error GetActiveExecutions(context.Context, *empty.Empty) (*GetActiveExecutionsResponse, error) SetExecution(context.Context, *Execution) (*empty.Empty, error) } @@ -1514,9 +1528,6 @@ func (*UnimplementedDkronServer) RaftGetConfiguration(ctx context.Context, req * func (*UnimplementedDkronServer) RaftRemovePeerByID(ctx context.Context, req *RaftRemovePeerByIDRequest) (*empty.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method RaftRemovePeerByID not implemented") } -func (*UnimplementedDkronServer) AgentRun(srv Dkron_AgentRunServer) error { - return status.Errorf(codes.Unimplemented, "method AgentRun not implemented") -} func (*UnimplementedDkronServer) GetActiveExecutions(ctx context.Context, req *empty.Empty) (*GetActiveExecutionsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetActiveExecutions not implemented") } @@ -1690,32 +1701,6 @@ func _Dkron_RaftRemovePeerByID_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } -func _Dkron_AgentRun_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(DkronServer).AgentRun(&dkronAgentRunServer{stream}) -} - -type Dkron_AgentRunServer interface { - SendAndClose(*AgentRunResponse) error - Recv() (*AgentRunStream, error) - grpc.ServerStream -} - -type dkronAgentRunServer struct { - grpc.ServerStream -} - -func (x *dkronAgentRunServer) SendAndClose(m *AgentRunResponse) error { - return x.ServerStream.SendMsg(m) -} - -func (x *dkronAgentRunServer) Recv() (*AgentRunStream, error) { - m := new(AgentRunStream) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - func _Dkron_GetActiveExecutions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(empty.Empty) if err := dec(in); err != nil { @@ -1801,11 +1786,104 @@ var _Dkron_serviceDesc = grpc.ServiceDesc{ Handler: _Dkron_SetExecution_Handler, }, }, + Streams: []grpc.StreamDesc{}, + Metadata: "dkron.proto", +} + +// AgentClient is the client API for Agent service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type AgentClient interface { + AgentRun(ctx context.Context, in *AgentRunRequest, opts ...grpc.CallOption) (Agent_AgentRunClient, error) +} + +type agentClient struct { + cc grpc.ClientConnInterface +} + +func NewAgentClient(cc grpc.ClientConnInterface) AgentClient { + return &agentClient{cc} +} + +func (c *agentClient) AgentRun(ctx context.Context, in *AgentRunRequest, opts ...grpc.CallOption) (Agent_AgentRunClient, error) { + stream, err := c.cc.NewStream(ctx, &_Agent_serviceDesc.Streams[0], "/types.Agent/AgentRun", opts...) + if err != nil { + return nil, err + } + x := &agentAgentRunClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Agent_AgentRunClient interface { + Recv() (*AgentRunStream, error) + grpc.ClientStream +} + +type agentAgentRunClient struct { + grpc.ClientStream +} + +func (x *agentAgentRunClient) Recv() (*AgentRunStream, error) { + m := new(AgentRunStream) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// AgentServer is the server API for Agent service. +type AgentServer interface { + AgentRun(*AgentRunRequest, Agent_AgentRunServer) error +} + +// UnimplementedAgentServer can be embedded to have forward compatible implementations. +type UnimplementedAgentServer struct { +} + +func (*UnimplementedAgentServer) AgentRun(req *AgentRunRequest, srv Agent_AgentRunServer) error { + return status.Errorf(codes.Unimplemented, "method AgentRun not implemented") +} + +func RegisterAgentServer(s *grpc.Server, srv AgentServer) { + s.RegisterService(&_Agent_serviceDesc, srv) +} + +func _Agent_AgentRun_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(AgentRunRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AgentServer).AgentRun(m, &agentAgentRunServer{stream}) +} + +type Agent_AgentRunServer interface { + Send(*AgentRunStream) error + grpc.ServerStream +} + +type agentAgentRunServer struct { + grpc.ServerStream +} + +func (x *agentAgentRunServer) Send(m *AgentRunStream) error { + return x.ServerStream.SendMsg(m) +} + +var _Agent_serviceDesc = grpc.ServiceDesc{ + ServiceName: "types.Agent", + HandlerType: (*AgentServer)(nil), + Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "AgentRun", - Handler: _Dkron_AgentRun_Handler, - ClientStreams: true, + Handler: _Agent_AgentRun_Handler, + ServerStreams: true, }, }, Metadata: "dkron.proto", diff --git a/proto/dkron.proto b/proto/dkron.proto index 739d8c55a..8cb0a5542 100644 --- a/proto/dkron.proto +++ b/proto/dkron.proto @@ -140,7 +140,15 @@ service Dkron { rpc ToggleJob (ToggleJobRequest) returns (ToggleJobResponse); rpc RaftGetConfiguration (google.protobuf.Empty) returns (RaftGetConfigurationResponse); rpc RaftRemovePeerByID (RaftRemovePeerByIDRequest) returns (google.protobuf.Empty); - rpc AgentRun (stream AgentRunStream) returns (AgentRunResponse); rpc GetActiveExecutions (google.protobuf.Empty) returns (GetActiveExecutionsResponse); rpc SetExecution (Execution) returns (google.protobuf.Empty); } + +message AgentRunRequest { + Job job = 1; + Execution execution = 2; +} + +service Agent { + rpc AgentRun (AgentRunRequest) returns (stream AgentRunStream); +} diff --git a/scripts/ansible/site.yml b/scripts/ansible/site.yml index 23483a31a..7506e7dff 100644 --- a/scripts/ansible/site.yml +++ b/scripts/ansible/site.yml @@ -65,3 +65,51 @@ name: dkron enabled: yes state: restarted + + ### File descriptor and memory tweaks + + - name: Increase sysctl open files system wide + sysctl: + name: fs.file-max + value: "3243542" + tags: performance + + - name: Increasing number of open files + lineinfile: dest=/etc/systemd/system.conf regexp='^DefaultLimitNOFILE=65535' line='DefaultLimitNOFILE=65535' state=present + tags: performance + + - name: Increasing number of open files in service + lineinfile: dest=/lib/systemd/system/dkron.service regexp='^KillSignal=SIGTERM\n^LimitNOFILE=65535' line='LimitNOFILE=65535' state=present + tags: performance_b + + ### Network performance tweaks + + - name: Set ARP GC entry point at 0 + sysctl: + name: net.ipv4.neigh.default.gc_thresh1 + value: "0" + tags: performance + + - name: Increase maximum number of sockets + sysctl: + name: net.core.somaxconn + value: "32768" + tags: performance + + - name: Increase maximum number of sockets in the backlog + sysctl: + name: net.ipv4.tcp_max_syn_backlog + value: "131072" + tags: performance + + - name: Setting sane defaults for TCP reading sockets + sysctl: + name: net.ipv4.tcp_rmem + value: "4096 16384 16777216" + tags: performance + + - name: Setting sane defaults for TCP writing sockets + sysctl: + name: net.ipv4.tcp_wmem + value: "4096 16384 16777216" + tags: performance