Overhaul logging to avoid package level var (distribworks#963)
This converts all logging calls to use a logrus instance that is passed into types or functions instead of a package-level variable. This helps prevent possible race conditions on that variable, and it makes each type's dependency on logging explicit instead of an implicit reference.

Originally I attempted to abstract a logging interface away from logrus, but so much of the logrus API is in use that it was quite difficult to abstract, and there was not much direct benefit in doing so. The tests all pass, and I tested by running docker-compose locally: I can add jobs via the UI and execute them with no panics. I feel relatively confident in these changes, but I'm not sure the test coverage is complete enough to prove that.
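
As a quick illustration of the pattern applied across the files below (a minimal, self-contained sketch; the Scheduler type and field names here are simplified stand-ins, not dkron's actual implementation): the logger is passed into the constructor and stored as a field, so nothing reads a package-level log variable.

package main

import (
	"github.com/sirupsen/logrus"
)

// Scheduler is a stand-in type; its field mirrors the logger *logrus.Entry
// field this commit adds to Agent and other dkron types.
type Scheduler struct {
	logger *logrus.Entry
}

// NewScheduler takes the logger as an explicit constructor argument,
// making the logging dependency visible in the signature.
func NewScheduler(logger *logrus.Entry) *Scheduler {
	return &Scheduler{logger: logger}
}

// Start logs through the injected entry instead of a package-level variable.
func (s *Scheduler) Start() {
	s.logger.WithField("component", "scheduler").Info("scheduler: started")
}

func main() {
	// Callers build a single entry and hand it down, as the cmd/ changes
	// below do with logrus.NewEntry(logrus.New()).
	log := logrus.NewEntry(logrus.New())
	NewScheduler(log).Start()
}
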
sysadmind authored May 13, 2021
1 parent 3f79657 commit 3ea5124
Showing 27 changed files with 293 additions and 241 deletions.
4 changes: 3 additions & 1 deletion cmd/leave.go
@@ -2,6 +2,7 @@ package cmd

import (
"github.com/distribworks/dkron/v3/dkron"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

@@ -23,7 +24,8 @@ var leaveCmd = &cobra.Command{
},
RunE: func(cmd *cobra.Command, args []string) error {
var gc dkron.DkronGRPCClient
gc = dkron.NewGRPCClient(nil, nil)
log := logrus.NewEntry(logrus.New())
gc = dkron.NewGRPCClient(nil, nil, log)

if err := gc.Leave(ip); err != nil {
return err
7 changes: 5 additions & 2 deletions cmd/raft.go
@@ -5,6 +5,7 @@ import (

"github.com/distribworks/dkron/v3/dkron"
"github.com/ryanuber/columnize"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

@@ -29,7 +30,8 @@ var raftListCmd = &cobra.Command{
Short: "Command to list raft peers",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
gc := dkron.NewGRPCClient(nil, nil)
log := logrus.NewEntry(logrus.New())
gc := dkron.NewGRPCClient(nil, nil, log)

reply, err := gc.RaftGetConfiguration(ip)
if err != nil {
@@ -60,7 +62,8 @@ var raftRemovePeerCmd = &cobra.Command{
Short: "Command to list raft peers",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
gc := dkron.NewGRPCClient(nil, nil)
log := logrus.NewEntry(logrus.New())
gc := dkron.NewGRPCClient(nil, nil, log)

if err := gc.RaftRemovePeerByID(ip, peerID); err != nil {
return err
94 changes: 49 additions & 45 deletions dkron/agent.go
@@ -112,6 +112,9 @@ type Agent struct {
activeExecutions sync.Map

listener net.Listener

// logger is the log entry to use for all logging calls
logger *logrus.Entry
}

// ProcessorFactory is a function type that creates a new instance
@@ -145,7 +148,8 @@ func NewAgent(config *Config, options ...AgentOption) *Agent {
// Start the current agent by running all the necessary
// checks and server or client routines.
func (a *Agent) Start() error {
InitLogger(a.config.LogLevel, a.config.NodeName)
log := InitLogger(a.config.LogLevel, a.config.NodeName)
a.logger = log

// Normalize configured addresses
a.config.normalizeAddrs()
@@ -164,7 +168,7 @@ func (a *Agent) Start() error {
}

if err := initMetrics(a); err != nil {
log.Fatal("agent: Can not setup metrics")
a.logger.Fatal("agent: Can not setup metrics")
}

// Expose the node name
@@ -179,7 +183,7 @@ func (a *Agent) Start() error {
addr := a.bindRPCAddr()
l, err := net.Listen("tcp", addr)
if err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}
a.listener = l

@@ -193,13 +197,13 @@ }
}

grpcServer := grpc.NewServer(opts...)
as := NewAgentServer(a)
as := NewAgentServer(a, a.logger)
proto.RegisterAgentServer(grpcServer, as)
go grpcServer.Serve(l)
}

if a.GRPCClient == nil {
a.GRPCClient = NewGRPCClient(nil, a)
a.GRPCClient = NewGRPCClient(nil, a, a.logger)
}

tags := a.serf.LocalMember().Tags
@@ -233,7 +237,7 @@ func (a *Agent) JoinLAN(addrs []string) (int, error) {
// was participating in leader election or not (local storage).
// Then actually leave the cluster.
func (a *Agent) Stop() error {
log.Info("agent: Called member stop, now stopping")
a.logger.Info("agent: Called member stop, now stopping")

if a.config.Server {
a.raft.Shutdown()
@@ -264,8 +268,8 @@ func (a *Agent) setupRaft() error {
}

logger := ioutil.Discard
if log.Logger.Level == logrus.DebugLevel {
logger = log.Logger.Writer()
if a.logger.Logger.Level == logrus.DebugLevel {
logger = a.logger.Logger.Writer()
}

transport := raft.NewNetworkTransport(a.raftLayer, 3, raftTimeout, logger)
@@ -324,25 +328,25 @@ func (a *Agent) setupRaft() error {
// Check for peers.json file for recovery
peersFile := filepath.Join(a.config.DataDir, "raft", "peers.json")
if _, err := os.Stat(peersFile); err == nil {
log.Info("found peers.json file, recovering Raft configuration...")
a.logger.Info("found peers.json file, recovering Raft configuration...")
var configuration raft.Configuration
configuration, err = raft.ReadConfigJSON(peersFile)
if err != nil {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
store, err := NewStore()
store, err := NewStore(a.logger)
if err != nil {
log.WithError(err).Fatal("dkron: Error initializing store")
a.logger.WithError(err).Fatal("dkron: Error initializing store")
}
tmpFsm := newFSM(store, nil)
tmpFsm := newFSM(store, nil, a.logger)
if err := raft.RecoverCluster(config, tmpFsm,
logStore, stableStore, snapshots, transport, configuration); err != nil {
return fmt.Errorf("recovery failed: %v", err)
}
if err := os.Remove(peersFile); err != nil {
return fmt.Errorf("recovery failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
}
log.Info("deleted peers.json file after successful recovery")
a.logger.Info("deleted peers.json file after successful recovery")
}
}

@@ -370,7 +374,7 @@ func (a *Agent) setupRaft() error {

// Instantiate the Raft systems. The second parameter is a finite state machine
// which stores the actual kv pairs and is operated upon through Apply().
fsm := newFSM(a.Store, a.ProAppliers)
fsm := newFSM(a.Store, a.ProAppliers, a.logger)
rft, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshots, transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
@@ -451,19 +455,19 @@ func (a *Agent) setupSerf() (*serf.Serf, error) {
serfConfig.ReconnectTimeout, err = time.ParseDuration(config.SerfReconnectTimeout)

if err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}

// Create a channel to listen for events from Serf
a.eventCh = make(chan serf.Event, 2048)
serfConfig.EventCh = a.eventCh

// Start Serf
log.Info("agent: Dkron agent starting")
a.logger.Info("agent: Dkron agent starting")

if log.Logger.Level == logrus.DebugLevel {
serfConfig.LogOutput = log.Logger.Writer()
serfConfig.MemberlistConfig.LogOutput = log.Logger.Writer()
if a.logger.Logger.Level == logrus.DebugLevel {
serfConfig.LogOutput = a.logger.Logger.Writer()
serfConfig.MemberlistConfig.LogOutput = a.logger.Logger.Writer()
} else {
serfConfig.LogOutput = ioutil.Discard
serfConfig.MemberlistConfig.LogOutput = ioutil.Discard
@@ -472,7 +476,7 @@ func (a *Agent) setupSerf() (*serf.Serf, error) {
// Create serf first
serf, err := serf.Create(serfConfig)
if err != nil {
log.Error(err)
a.logger.Error(err)
return nil, err
}
return serf, nil
@@ -491,17 +495,17 @@ func (a *Agent) SetConfig(c *Config) {
// StartServer launch a new dkron server process
func (a *Agent) StartServer() {
if a.Store == nil {
s, err := NewStore()
s, err := NewStore(a.logger)
if err != nil {
log.WithError(err).Fatal("dkron: Error initializing store")
a.logger.WithError(err).Fatal("dkron: Error initializing store")
}
a.Store = s
}

a.sched = NewScheduler()
a.sched = NewScheduler(a.logger)

if a.HTTPTransport == nil {
a.HTTPTransport = NewTransport(a)
a.HTTPTransport = NewTransport(a, a.logger)
}
a.HTTPTransport.ServeHTTP()

@@ -529,12 +533,12 @@ func (a *Agent) StartServer() {

go func() {
if err := tlsm.Serve(); err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}
}()
} else {
// Declare a plain RaftLayer
a.raftLayer = NewRaftLayer()
a.raftLayer = NewRaftLayer(a.logger)

// Declare the match for gRPC
grpcl = tcpm.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
@@ -544,25 +548,25 @@ }
}

if a.GRPCServer == nil {
a.GRPCServer = NewGRPCServer(a)
a.GRPCServer = NewGRPCServer(a, a.logger)
}

if err := a.GRPCServer.Serve(grpcl); err != nil {
log.WithError(err).Fatal("agent: RPC server failed to start")
a.logger.WithError(err).Fatal("agent: RPC server failed to start")
}

if err := a.raftLayer.Open(raftl); err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}

if err := a.setupRaft(); err != nil {
log.WithError(err).Fatal("agent: Raft layer failed to start")
a.logger.WithError(err).Fatal("agent: Raft layer failed to start")
}

// Start serving everything
go func() {
if err := tcpm.Serve(); err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}
}()
go a.monitorLeadership()
@@ -629,17 +633,17 @@ func (a *Agent) LocalServers() (members []*ServerParts) {
// Listens to events from Serf and handle the event.
func (a *Agent) eventLoop() {
serfShutdownCh := a.serf.ShutdownCh()
log.Info("agent: Listen for events")
a.logger.Info("agent: Listen for events")
for {
select {
case e := <-a.eventCh:
log.WithField("event", e.String()).Info("agent: Received event")
a.logger.WithField("event", e.String()).Info("agent: Received event")
metrics.IncrCounter([]string{"agent", "event_received", e.String()}, 1)

// Log all member events
if me, ok := e.(serf.MemberEvent); ok {
for _, member := range me.Members {
log.WithFields(logrus.Fields{
a.logger.WithFields(logrus.Fields{
"node": a.config.NodeName,
"member": member.Name,
"event": e.EventType(),
@@ -662,26 +666,26 @@ func (a *Agent) eventLoop() {
a.localMemberEvent(me)
case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery: // Ignore
default:
log.WithField("event", e.String()).Warn("agent: Unhandled serf event")
a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event")
}
}

case <-serfShutdownCh:
log.Warn("agent: Serf shutdown detected, quitting")
a.logger.Warn("agent: Serf shutdown detected, quitting")
return
}
}
}

// Join asks the Serf instance to join. See the Serf.Join function.
func (a *Agent) join(addrs []string, replay bool) (n int, err error) {
log.Infof("agent: joining: %v replay: %v", addrs, replay)
a.logger.Infof("agent: joining: %v replay: %v", addrs, replay)
n, err = a.serf.Join(addrs, !replay)
if n > 0 {
log.Infof("agent: joined: %d nodes", n)
a.logger.Infof("agent: joined: %d nodes", n)
}
if err != nil {
log.Warnf("agent: error joining: %v", err)
a.logger.Warnf("agent: error joining: %v", err)
}
return
}
@@ -881,11 +885,11 @@ func (a *Agent) checkAndSelectServer() (string, error) {
}

for _, peer := range peers {
log.WithField("peer", peer).Debug("Checking peer")
a.logger.WithField("peer", peer).Debug("Checking peer")
conn, err := net.DialTimeout("tcp", peer, 1*time.Second)
if err == nil {
conn.Close()
log.WithField("peer", peer).Debug("Found good peer")
a.logger.WithField("peer", peer).Debug("Found good peer")
return peer, nil
}
}
@@ -894,27 +898,27 @@

func (a *Agent) startReporter() {
if a.config.DisableUsageStats || a.config.DevMode {
log.Info("agent: usage report client disabled")
a.logger.Info("agent: usage report client disabled")
return
}

clusterID, err := a.config.Hash()
if err != nil {
log.Warning("agent: unable to hash the service configuration:", err.Error())
a.logger.Warn("agent: unable to hash the service configuration:", err.Error())
return
}

go func() {
serverID, _ := uuid.GenerateUUID()
log.Info(fmt.Sprintf("agent: registering usage stats for cluster ID '%s'", clusterID))
a.logger.Info(fmt.Sprintf("agent: registering usage stats for cluster ID '%s'", clusterID))

if err := client.StartReporter(context.Background(), client.Options{
ClusterID: clusterID,
ServerID: serverID,
URL: "https://stats.dkron.io",
Version: fmt.Sprintf("%s %s", Name, Version),
}); err != nil {
log.Warning("agent: unable to create the usage report client:", err.Error())
a.logger.Warn("agent: unable to create the usage report client:", err.Error())
}
}()
}
3 changes: 2 additions & 1 deletion dkron/agent_test.go
@@ -3,6 +3,7 @@ package dkron
import (
"fmt"
"io/ioutil"
"log"
"os"
"testing"
"time"
@@ -104,7 +105,7 @@ func TestAgentCommand_runForElection(t *testing.T) {
// Wait until a follower steps as leader
time.Sleep(2 * time.Second)
assert.True(t, (a2.IsLeader() || a3.IsLeader()))
log.Info(a3.IsLeader())
log.Println(a3.IsLeader())

a2.Stop()
a3.Stop()
