Skip to content

Commit

Permalink
chore(pkg/server): more robust service architecture
Browse files Browse the repository at this point in the history
- handles crash at the same time as stop
- keeps track of inner state to avoid wrong usage
  • Loading branch information
qdm12 committed Oct 25, 2024
1 parent 82d7c6e commit 86f0da6
Showing 1 changed file with 73 additions and 17 deletions.
90 changes: 73 additions & 17 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ type Server struct {

// Internal state
handlerCancel context.CancelFunc
running bool
runningMutex sync.Mutex
state goservices.State
stateMutex sync.RWMutex
interceptStop chan<- struct{}
interceptDone <-chan struct{}
startStopMutex sync.Mutex // prevents concurrent calls to Start and Stop.
subServers goservices.Service
listeningAddr netip.AddrPort
Expand All @@ -47,11 +49,17 @@ func (s *Server) Start(ctx context.Context) (runError <-chan error, startErr err
s.startStopMutex.Lock()
defer s.startStopMutex.Unlock()

s.runningMutex.Lock()
if s.running {
panic("server already running")
// Lock the state in case the server is already running.
s.stateMutex.RLock()
state := s.state
// no need to keep a lock on the state since the `startStopMutex`
// prevents concurrent calls to `Start` and `Stop`.
s.stateMutex.RUnlock()
if state == goservices.StateRunning {
return nil, fmt.Errorf("%s: %w", s, goservices.ErrAlreadyStarted)
}
s.runningMutex.Unlock()

s.state = goservices.StateStarting

handlerCtx, handlerCancel := context.WithCancel(context.Background())
defer func() {
Expand Down Expand Up @@ -92,30 +100,72 @@ func (s *Server) Start(ctx context.Context) (runError <-chan error, startErr err
return nil, fmt.Errorf("creating sub servers group: %w", err)
}

runErrorCh, err := s.subServers.Start(ctx)
runErrorCh := make(chan error)
runError = runErrorCh
subServersRunError, err := s.subServers.Start(ctx)
if err != nil {
return nil, fmt.Errorf("starting sub servers: %w", err)
}

interceptStop := make(chan struct{})
s.interceptStop = interceptStop
interceptDone := make(chan struct{})
s.interceptDone = interceptDone
go s.runInterceptError(interceptStop, interceptDone,
subServersRunError, runErrorCh)

s.logger.Info("DNS server listening on " + s.listeningAddr.String())

s.runningMutex.Lock()
s.running = true
s.runningMutex.Unlock()
s.stateMutex.Lock()
s.state = goservices.StateRunning
s.stateMutex.Unlock()

return runError, nil
}

return runErrorCh, nil
func (s *Server) runInterceptError(stop <-chan struct{}, done chan<- struct{},
incomingError <-chan error, outgoingError chan<- error,
) {
defer close(done)
var err error
select {
case err = <-incomingError:
case <-stop:
return
}
s.stateMutex.RLock()
state := s.state
s.stateMutex.RUnlock()
if state == goservices.StateStopping {
return
}
s.stateMutex.Lock()
s.state = goservices.StateCrashed
s.stateMutex.Unlock()
outgoingError <- err
}

func (s *Server) Stop() (err error) {
s.startStopMutex.Lock()
defer s.startStopMutex.Unlock()

s.runningMutex.Lock()
running := s.running //nolint:ifshort
s.runningMutex.Unlock()
if !running { // server crashed whilst we were stopping it
return nil
s.stateMutex.Lock()
switch s.state {
case goservices.StateRunning: // continue stopping the service
case goservices.StateCrashed:
s.stateMutex.Unlock()
return fmt.Errorf("%w (crashed)", goservices.ErrAlreadyStopped)
case goservices.StateStopped:
s.stateMutex.Unlock()
return fmt.Errorf("%w", goservices.ErrAlreadyStopped)
case goservices.StateStarting, goservices.StateStopping:
s.stateMutex.Unlock()
panic("bad implementation code: " +
"this code path should be unreachable for the \"" +
fmt.Sprint(s.state) + "\" state")
}
s.state = goservices.StateStopping
s.stateMutex.Unlock()

s.handlerCancel()

Expand All @@ -130,6 +180,8 @@ func (s *Server) Stop() (err error) {
}
}

s.state = goservices.StateStopped

return err
}

Expand All @@ -142,7 +194,11 @@ func (s *Server) ListeningAddress() (address netip.AddrPort, err error) {
s.startStopMutex.Lock()
defer s.startStopMutex.Unlock()

if !s.running {
s.stateMutex.RLock()
state := s.state
s.stateMutex.RUnlock()

if state != goservices.StateRunning {
return netip.AddrPort{}, fmt.Errorf("%w", ErrServerNotRunning)
}

Expand Down

0 comments on commit 86f0da6

Please sign in to comment.