Skip to content

Commit

Permalink
Fix from code review pt. 1
Browse files Browse the repository at this point in the history
Signed-off-by: ItalyPaleAle <[email protected]>
  • Loading branch information
ItalyPaleAle committed Oct 14, 2023
1 parent 571b24e commit 0da78fe
Showing 1 changed file with 28 additions and 12 deletions.
40 changes: 28 additions & 12 deletions nameresolution/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -43,6 +44,7 @@ type resolver struct {
registrationID string
closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
}

// NewResolver creates a name resolver that is based on a SQLite DB.
Expand All @@ -55,6 +57,10 @@ func NewResolver(logger logger.Logger) nameresolution.Resolver {

// Init initializes the name resolver.
func (s *resolver) Init(ctx context.Context, md nameresolution.Metadata) error {
if s.closed.Load() {
return errors.New("component is closed")
}

err := s.metadata.InitWithMetadata(md)
if err != nil {
return err
Expand Down Expand Up @@ -99,6 +105,7 @@ func (s *resolver) Init(ctx context.Context, md nameresolution.Metadata) error {
return err
}

s.wg.Add(1)
go s.renewRegistration()

return nil
Expand Down Expand Up @@ -155,13 +162,18 @@ func (s *resolver) registerHost(ctx context.Context) error {
// In backgrounds, periodically renews the host's registration
// Should be invoked in a background goroutine
func (s *resolver) renewRegistration() {
defer s.wg.Done()

addr := s.metadata.GetAddress()

d := s.metadata.UpdateInterval - s.metadata.Timeout
s.logger.Debugf("Started renewing host registration in background with interval %v", s.metadata.UpdateInterval)
t := time.NewTicker(d)
defer t.Stop()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
select {
case <-s.closeCh:
Expand All @@ -171,18 +183,20 @@ func (s *resolver) renewRegistration() {

case <-t.C:
// Renew on the ticker
err := s.doRenewRegistration(context.Background(), addr)
if err != nil {
// Log errors
s.logger.Errorf("Failed to update host registration: %v", err)

if errors.Is(err, errRegistrationLost) {
// This means that our registration has been taken over by another host
// It should never happen unless there's something really bad going on
// Panicking here to force a restart of Dapr
s.logger.Fatalf("Host registration lost")
go func() {
err := s.doRenewRegistration(ctx, addr)
if err != nil {
// Log errors
s.logger.Errorf("Failed to update host registration: %v", err)

if errors.Is(err, errRegistrationLost) {
// This means that our registration has been taken over by another host
// It should never happen unless there's something really bad going on
// Panicking here to force a restart of Dapr
s.logger.Fatalf("Host registration lost")
}
}
}
}()
}
}
}
Expand Down Expand Up @@ -271,10 +285,12 @@ func (s *resolver) deregisterHost(ctx context.Context) error {
// Close implements io.Closer.
func (s *resolver) Close() (err error) {
if !s.closed.CompareAndSwap(false, true) {
return errors.New("component was already closed")
s.wg.Wait()
return nil
}

close(s.closeCh)
s.wg.Wait()

errs := make([]error, 0)

Expand Down

0 comments on commit 0da78fe

Please sign in to comment.