Skip to content

Commit

Permalink
feat(dhcp): Cancel backoff retry on stop
Browse files Browse the repository at this point in the history
Signed-off-by: Songmin Li <[email protected]>
  • Loading branch information
lisongmin authored and squeed committed Oct 14, 2024
1 parent d61e7e5 commit a4fc6f9
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 51 deletions.
29 changes: 16 additions & 13 deletions plugins/ipam/dhcp/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,21 @@ import (
var errNoMoreTries = errors.New("no more tries")

type DHCP struct {
mux sync.Mutex
leases map[string]*DHCPLease
hostNetnsPrefix string
clientTimeout time.Duration
clientResendMax time.Duration
broadcast bool
mux sync.Mutex
leases map[string]*DHCPLease
hostNetnsPrefix string
clientTimeout time.Duration
clientResendMax time.Duration
clientResendTimeout time.Duration
broadcast bool
}

func newDHCP(clientTimeout, clientResendMax time.Duration) *DHCP {
func newDHCP(clientTimeout, clientResendMax time.Duration, resendTimeout time.Duration) *DHCP {
return &DHCP{
leases: make(map[string]*DHCPLease),
clientTimeout: clientTimeout,
clientResendMax: clientResendMax,
leases: make(map[string]*DHCPLease),
clientTimeout: clientTimeout,
clientResendMax: clientResendMax,
clientResendTimeout: resendTimeout,
}
}

Expand Down Expand Up @@ -90,7 +92,7 @@ func (d *DHCP) Allocate(args *skel.CmdArgs, result *current.Result) error {
hostNetns := d.hostNetnsPrefix + args.Netns
l, err = AcquireLease(clientID, hostNetns, args.IfName,
opts,
d.clientTimeout, d.clientResendMax, d.broadcast)
d.clientTimeout, d.clientResendMax, d.clientResendTimeout, d.broadcast)
if err != nil {
return err
}
Expand Down Expand Up @@ -190,7 +192,8 @@ func getListener(socketPath string) (net.Listener, error) {

func runDaemon(
pidfilePath, hostPrefix, socketPath string,
dhcpClientTimeout time.Duration, resendMax time.Duration, broadcast bool,
dhcpClientTimeout time.Duration, resendMax time.Duration, resendTimeout time.Duration,
broadcast bool,
) error {
// since other goroutines (on separate threads) will change namespaces,
// ensure the RPC server does not get scheduled onto those
Expand Down Expand Up @@ -225,7 +228,7 @@ func runDaemon(
done <- true
}()

dhcp := newDHCP(dhcpClientTimeout, resendMax)
dhcp := newDHCP(dhcpClientTimeout, resendMax, resendTimeout)
dhcp.hostNetnsPrefix = hostPrefix
dhcp.broadcast = broadcast
rpc.Register(dhcp)
Expand Down
2 changes: 1 addition & 1 deletion plugins/ipam/dhcp/dhcp2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var _ = Describe("DHCP Multiple Lease Operations", func() {
// Start the DHCP client daemon
dhcpPluginPath, err := exec.LookPath("dhcp")
Expect(err).NotTo(HaveOccurred())
clientCmd = exec.Command(dhcpPluginPath, "daemon", "-socketpath", socketPath)
clientCmd = exec.Command(dhcpPluginPath, "daemon", "-socketpath", socketPath, "--timeout", "2s", "--resendtimeout", "8s")
err = clientCmd.Start()
Expect(err).NotTo(HaveOccurred())
Expect(clientCmd.Process).NotTo(BeNil())
Expand Down
23 changes: 16 additions & 7 deletions plugins/ipam/dhcp/dhcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,25 @@ func getTmpDir() (string, error) {
}

type DhcpServer struct {
cmd *exec.Cmd
cmd *exec.Cmd
lock sync.Mutex

startAddr net.IP
endAddr net.IP
leaseTime time.Duration
}

func (s *DhcpServer) Serve() error {
if err := s.Start(); err != nil {
return err
}
return s.cmd.Wait()
}

func (s *DhcpServer) Start() error {
s.lock.Lock()
defer s.lock.Unlock()

s.cmd = exec.Command(
"dnsmasq",
"--no-daemon",
Expand All @@ -69,11 +80,9 @@ func (s *DhcpServer) Serve() error {
}

func (s *DhcpServer) Stop() error {
if err := s.cmd.Process.Kill(); err != nil {
return err
}
_, err := s.cmd.Process.Wait()
return err
s.lock.Lock()
defer s.lock.Unlock()
return s.cmd.Process.Kill()
}

func dhcpServerStart(netns ns.NetNS, numLeases int, stopCh <-chan bool) *sync.WaitGroup {
Expand Down Expand Up @@ -535,7 +544,7 @@ var _ = Describe("DHCP Lease Unavailable Operations", func() {
// `go test` timeout with default delays. Since our DHCP server
// and client daemon are local processes anyway, we can depend on
// them to respond very quickly.
clientCmd = exec.Command(dhcpPluginPath, "daemon", "-socketpath", socketPath, "-timeout", "2s", "-resendmax", "8s")
clientCmd = exec.Command(dhcpPluginPath, "daemon", "-socketpath", socketPath, "-timeout", "2s", "-resendmax", "8s", "--resendtimeout", "10s")

// copy dhcp client's stdout/stderr to test stdout
var b bytes.Buffer
Expand Down
61 changes: 34 additions & 27 deletions plugins/ipam/dhcp/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ import (
// RFC 2131 suggests using exponential backoff, starting with 4sec
// and randomized to +/- 1sec
const (
resendDelay0 = 4 * time.Second
resendDelayMax = 62 * time.Second
defaultLeaseTime = 60 * time.Minute
resendDelay0 = 4 * time.Second
resendDelayMax = 62 * time.Second
defaultLeaseTime = 60 * time.Minute
defaultResendTimeout = 208 * time.Second // fast resend + backoff resend
)

// To speed up the retry for first few failures, we retry without
Expand Down Expand Up @@ -69,6 +70,7 @@ type DHCPLease struct {
expireTime time.Time
timeout time.Duration
resendMax time.Duration
resendTimeout time.Duration
broadcast bool
stopping uint32
stop chan struct{}
Expand Down Expand Up @@ -155,23 +157,24 @@ func prepareOptions(cniArgs string, provideOptions []ProvideOption, requestOptio
func AcquireLease(
clientID, netns, ifName string,
opts []dhcp4.Option,
timeout, resendMax time.Duration, broadcast bool,
timeout, resendMax time.Duration, resendTimeout time.Duration, broadcast bool,
) (*DHCPLease, error) {
errCh := make(chan error, 1)

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

l := &DHCPLease{
clientID: clientID,
stop: make(chan struct{}),
check: make(chan struct{}),
timeout: timeout,
resendMax: resendMax,
broadcast: broadcast,
opts: opts,
cancelFunc: cancel,
ctx: ctx,
clientID: clientID,
stop: make(chan struct{}),
check: make(chan struct{}),
timeout: timeout,
resendMax: resendMax,
resendTimeout: resendTimeout,
broadcast: broadcast,
opts: opts,
cancelFunc: cancel,
ctx: ctx,
}

log.Printf("%v: acquiring lease", clientID)
Expand Down Expand Up @@ -213,6 +216,7 @@ func AcquireLease(
func (l *DHCPLease) Stop() {
if atomic.CompareAndSwapUint32(&l.stopping, 0, 1) {
close(l.stop)
l.cancelFunc()
}
l.wg.Wait()
}
Expand Down Expand Up @@ -251,9 +255,11 @@ func (l *DHCPLease) acquire() error {
}
defer c.Close()

pkt, err := backoffRetry(l.resendMax, func() (*nclient4.Lease, error) {
timeoutCtx, cancel := context.WithTimeoutCause(l.ctx, l.resendTimeout, errNoMoreTries)
defer cancel()
pkt, err := backoffRetry(timeoutCtx, l.resendMax, func() (*nclient4.Lease, error) {
return c.Request(
l.ctx,
timeoutCtx,
withClientID(l.clientID),
withAllOptions(l),
)
Expand Down Expand Up @@ -351,9 +357,11 @@ func (l *DHCPLease) renew() error {
}
defer c.Close()

lease, err := backoffRetry(l.resendMax, func() (*nclient4.Lease, error) {
timeoutCtx, cancel := context.WithTimeoutCause(l.ctx, l.resendTimeout, errNoMoreTries)
defer cancel()
lease, err := backoffRetry(timeoutCtx, l.resendMax, func() (*nclient4.Lease, error) {
return c.Renew(
l.ctx,
timeoutCtx,
l.latestLease,
withClientID(l.clientID),
withAllOptions(l),
Expand Down Expand Up @@ -441,7 +449,7 @@ func jitter(span time.Duration) time.Duration {
return time.Duration(float64(span) * (2.0*rand.Float64() - 1.0))
}

func backoffRetry(resendMax time.Duration, f func() (*nclient4.Lease, error)) (*nclient4.Lease, error) {
func backoffRetry(ctx context.Context, resendMax time.Duration, f func() (*nclient4.Lease, error)) (*nclient4.Lease, error) {
baseDelay := resendDelay0
var sleepTime time.Duration
fastRetryLimit := resendFastMax
Expand All @@ -462,17 +470,16 @@ func backoffRetry(resendMax time.Duration, f func() (*nclient4.Lease, error)) (*

log.Printf("retrying in %f seconds", sleepTime.Seconds())

time.Sleep(sleepTime)

// only adjust delay time if we are in normal backoff stage
if baseDelay < resendMax && fastRetryLimit == 0 {
baseDelay *= 2
} else if fastRetryLimit == 0 { // only break if we are at normal delay
break
select {
case <-ctx.Done():
return nil, context.Cause(ctx)
case <-time.After(sleepTime):
// only adjust delay time if we are in normal backoff stage
if baseDelay < resendMax && fastRetryLimit == 0 {
baseDelay *= 2
}
}
}

return nil, errNoMoreTries
}

func newDHCPClient(
Expand Down
8 changes: 5 additions & 3 deletions plugins/ipam/dhcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,22 @@ func main() {
var broadcast bool
var timeout time.Duration
var resendMax time.Duration
var resendTimeout time.Duration
daemonFlags := flag.NewFlagSet("daemon", flag.ExitOnError)
daemonFlags.StringVar(&pidfilePath, "pidfile", "", "optional path to write daemon PID to")
daemonFlags.StringVar(&hostPrefix, "hostprefix", "", "optional prefix to host root")
daemonFlags.StringVar(&socketPath, "socketpath", "", "optional dhcp server socketpath")
daemonFlags.BoolVar(&broadcast, "broadcast", false, "broadcast DHCP leases")
daemonFlags.DurationVar(&timeout, "timeout", 10*time.Second, "optional dhcp client timeout duration")
daemonFlags.DurationVar(&resendMax, "resendmax", resendDelayMax, "optional dhcp client resend max duration")
daemonFlags.DurationVar(&timeout, "timeout", 10*time.Second, "optional dhcp client timeout duration for each request")
daemonFlags.DurationVar(&resendMax, "resendmax", resendDelayMax, "optional dhcp client max resend delay between requests")
daemonFlags.DurationVar(&resendTimeout, "resendtimeout", defaultResendTimeout, "optional dhcp client resend timeout, no more retries after this timeout")
daemonFlags.Parse(os.Args[2:])

if socketPath == "" {
socketPath = defaultSocketPath
}

if err := runDaemon(pidfilePath, hostPrefix, socketPath, timeout, resendMax, broadcast); err != nil {
if err := runDaemon(pidfilePath, hostPrefix, socketPath, timeout, resendMax, resendTimeout, broadcast); err != nil {
log.Print(err.Error())
os.Exit(1)
}
Expand Down

0 comments on commit a4fc6f9

Please sign in to comment.