From 86f0da601b227682230bf009a6546b2620283ce4 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 25 Oct 2024 11:19:17 +0000 Subject: [PATCH] chore(pkg/server): more robust service architecture - handles crash at the same time as stop - keeps track of inner state to avoid wrong usage --- pkg/server/server.go | 90 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 73 insertions(+), 17 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index 27332297..a25c1f4f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -19,8 +19,10 @@ type Server struct { // Internal state handlerCancel context.CancelFunc - running bool - runningMutex sync.Mutex + state goservices.State + stateMutex sync.RWMutex + interceptStop chan<- struct{} + interceptDone <-chan struct{} startStopMutex sync.Mutex // prevents concurrent calls to Start and Stop. subServers goservices.Service listeningAddr netip.AddrPort @@ -47,11 +49,17 @@ func (s *Server) Start(ctx context.Context) (runError <-chan error, startErr err s.startStopMutex.Lock() defer s.startStopMutex.Unlock() - s.runningMutex.Lock() - if s.running { - panic("server already running") + // Lock the state in case the server is already running. + s.stateMutex.RLock() + state := s.state + // no need to keep a lock on the state since the `startStopMutex` + // prevents concurrent calls to `Start` and `Stop`. + s.stateMutex.RUnlock() + if state == goservices.StateRunning { + return nil, fmt.Errorf("%s: %w", s, goservices.ErrAlreadyStarted) } - s.runningMutex.Unlock() + + s.state = goservices.StateStarting handlerCtx, handlerCancel := context.WithCancel(context.Background()) defer func() { @@ -92,30 +100,72 @@ func (s *Server) Start(ctx context.Context) (runError <-chan error, startErr err return nil, fmt.Errorf("creating sub servers group: %w", err) } - runErrorCh, err := s.subServers.Start(ctx) + runErrorCh := make(chan error) + runError = runErrorCh + subServersRunError, err := s.subServers.Start(ctx) if err != nil { return nil, fmt.Errorf("starting sub servers: %w", err) } + interceptStop := make(chan struct{}) + s.interceptStop = interceptStop + interceptDone := make(chan struct{}) + s.interceptDone = interceptDone + go s.runInterceptError(interceptStop, interceptDone, + subServersRunError, runErrorCh) + s.logger.Info("DNS server listening on " + s.listeningAddr.String()) - s.runningMutex.Lock() - s.running = true - s.runningMutex.Unlock() + s.stateMutex.Lock() + s.state = goservices.StateRunning + s.stateMutex.Unlock() + + return runError, nil +} - return runErrorCh, nil +func (s *Server) runInterceptError(stop <-chan struct{}, done chan<- struct{}, + incomingError <-chan error, outgoingError chan<- error, +) { + defer close(done) + var err error + select { + case err = <-incomingError: + case <-stop: + return + } + s.stateMutex.RLock() + state := s.state + s.stateMutex.RUnlock() + if state == goservices.StateStopping { + return + } + s.stateMutex.Lock() + s.state = goservices.StateCrashed + s.stateMutex.Unlock() + outgoingError <- err } func (s *Server) Stop() (err error) { s.startStopMutex.Lock() defer s.startStopMutex.Unlock() - s.runningMutex.Lock() - running := s.running //nolint:ifshort - s.runningMutex.Unlock() - if !running { // server crashed whilst we were stopping it - return nil + s.stateMutex.Lock() + switch s.state { + case goservices.StateRunning: // continue stopping the service + case goservices.StateCrashed: + s.stateMutex.Unlock() + return fmt.Errorf("%w (crashed)", goservices.ErrAlreadyStopped) + case goservices.StateStopped: + s.stateMutex.Unlock() + return fmt.Errorf("%w", goservices.ErrAlreadyStopped) + case goservices.StateStarting, goservices.StateStopping: + s.stateMutex.Unlock() + panic("bad implementation code: " + + "this code path should be unreachable for the \"" + + fmt.Sprint(s.state) + "\" state") } + s.state = goservices.StateStopping + s.stateMutex.Unlock() s.handlerCancel() @@ -130,6 +180,8 @@ func (s *Server) Stop() (err error) { } } + s.state = goservices.StateStopped + return err } @@ -142,7 +194,11 @@ func (s *Server) ListeningAddress() (address netip.AddrPort, err error) { s.startStopMutex.Lock() defer s.startStopMutex.Unlock() - if !s.running { + s.stateMutex.RLock() + state := s.state + s.stateMutex.RUnlock() + + if state != goservices.StateRunning { return netip.AddrPort{}, fmt.Errorf("%w", ErrServerNotRunning) }