Skip to content

Commit

Permalink
Tidy up usage of channels for signaling
Browse files Browse the repository at this point in the history
This started as a small effort to simplify the usage of "ready" channel
in nfd-master. It extended into a wider simplification/unification of
the channel usage.
  • Loading branch information
marquiz committed Apr 5, 2024
1 parent 275e625 commit d7c5ce3
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 23 deletions.
10 changes: 5 additions & 5 deletions pkg/nfd-gc/nfd-gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestNRTGC(t *testing.T) {
Convey("When theres is old NRT ", t, func() {
gc := newMockGC(nil, []string{"node1"})

errChan := make(chan error, 1)
errChan := make(chan error)
go func() { errChan <- gc.Run() }()

So(waitForNRT(gc.topoClient), ShouldBeTrue)
Expand All @@ -51,7 +51,7 @@ func TestNRTGC(t *testing.T) {
Convey("When theres is one old NRT and one up to date", t, func() {
gc := newMockGC([]string{"node1"}, []string{"node1", "node2"})

errChan := make(chan error, 1)
errChan := make(chan error)
go func() { errChan <- gc.Run() }()

So(waitForNRT(gc.topoClient, "node1"), ShouldBeTrue)
Expand All @@ -62,7 +62,7 @@ func TestNRTGC(t *testing.T) {
Convey("Should react to delete event", t, func() {
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"})

errChan := make(chan error, 1)
errChan := make(chan error)
go func() { errChan <- gc.Run() }()

err := gc.k8sClient.CoreV1().Nodes().Delete(context.TODO(), "node1", metav1.DeleteOptions{})
Expand All @@ -81,7 +81,7 @@ func TestNRTGC(t *testing.T) {
},
}

errChan := make(chan error, 1)
errChan := make(chan error)
go func() { errChan <- gc.Run() }()

_, err := gc.topoClient.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrt, metav1.CreateOptions{})
Expand All @@ -98,7 +98,7 @@ func newMockGC(nodes, nrts []string) *mockGC {
factory: informers.NewSharedInformerFactory(k8sClient, 5*time.Minute),
nfdClient: fakenfdclientset.NewSimpleClientset(),
topoClient: faketopologyv1alpha2.NewSimpleClientset(createFakeNRTs(nrts...)...),
stopChan: make(chan struct{}, 1),
stopChan: make(chan struct{}),
args: &Args{
GCPeriod: 10 * time.Minute,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/nfd-master/nfd-api-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func init() {

func newNfdController(config *restclient.Config, nfdApiControllerOptions nfdApiControllerOptions) (*nfdController, error) {
c := &nfdController{
stopChan: make(chan struct{}, 1),
stopChan: make(chan struct{}),
updateAllNodesChan: make(chan struct{}, 1),
updateOneNodeChan: make(chan string),
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/nfd-master/nfd-master-internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newTestNode() *corev1.Node {

func newFakeNfdAPIController(client *fakenfdclient.Clientset) *nfdController {
c := &nfdController{
stopChan: make(chan struct{}, 1),
stopChan: make(chan struct{}),
updateAllNodesChan: make(chan struct{}, 1),
updateOneNodeChan: make(chan string),
}
Expand Down
18 changes: 7 additions & 11 deletions pkg/nfd-master/nfd-master.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type nfdMaster struct {
server *grpc.Server
healthServer *grpc.Server
stop chan struct{}
ready chan bool
ready chan struct{}
k8sClient k8sclient.Interface
nodeUpdaterPool *nodeUpdaterPool
deniedNs
Expand All @@ -161,8 +161,8 @@ func NewNfdMaster(args *Args) (NfdMaster, error) {
nfd := &nfdMaster{args: *args,
nodeName: utils.NodeName(),
namespace: utils.GetKubernetesNamespace(),
ready: make(chan bool, 1),
stop: make(chan struct{}, 1),
ready: make(chan struct{}),
stop: make(chan struct{}),
}

if args.Instance != "" {
Expand Down Expand Up @@ -272,7 +272,7 @@ func (m *nfdMaster) Run() error {
}

// Run gRPC server
grpcErr := make(chan error, 1)
grpcErr := make(chan error)
// If the NodeFeature API is enabled, don'tregister the labeler API
// server. Otherwise, register the labeler server.
if !features.NFDFeatureGate.Enabled(features.NodeFeatureAPI) {
Expand All @@ -296,7 +296,6 @@ func (m *nfdMaster) Run() error {
}

// Notify that we're ready to accept connections
m.ready <- true
close(m.ready)

// NFD-Master main event loop
Expand Down Expand Up @@ -397,7 +396,7 @@ func (m *nfdMaster) runGrpcServer(errChan chan<- error) {
klog.InfoS("gRPC server serving", "port", m.args.Port)

// Run gRPC server
grpcErr := make(chan error, 1)
grpcErr := make(chan error)
go func() {
defer lis.Close()
grpcErr <- m.server.Serve(lis)
Expand Down Expand Up @@ -475,11 +474,8 @@ func (m *nfdMaster) Stop() {
// Wait until NfdMaster is able able to accept connections.
func (m *nfdMaster) WaitForReady(timeout time.Duration) bool {
select {
case ready, ok := <-m.ready:
// Ready if the flag is true or the channel has been closed
if ready || !ok {
return true
}
case <-m.ready:
return true
case <-time.After(timeout):
return false
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/nfd-topology-updater/nfd-topology-updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (Nf
nfd := &nfdTopologyUpdater{
args: args,
resourcemonitorArgs: resourcemonitorArgs,
stop: make(chan struct{}, 1),
stop: make(chan struct{}),
nodeName: utils.NodeName(),
eventSource: eventSource,
config: &NFDConfig{},
Expand Down Expand Up @@ -207,7 +207,6 @@ func (w *nfdTopologyUpdater) Run() error {
// CAUTION: these resources are expected to change rarely - if ever.
// So we are intentionally do this once during the process lifecycle.
// TODO: Obtain node resources dynamically from the podresource API
// zonesChannel := make(chan v1alpha1.ZoneList)
var zones v1alpha2.ZoneList

excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName)
Expand All @@ -216,7 +215,7 @@ func (w *nfdTopologyUpdater) Run() error {
return fmt.Errorf("failed to obtain node resource information: %w", err)
}

grpcErr := make(chan error, 1)
grpcErr := make(chan error)

// Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/nfd-worker/nfd-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewNfdWorker(args *Args) (NfdWorker, error) {
args: *args,
config: &NFDConfig{},
kubernetesNamespace: utils.GetKubernetesNamespace(),
stop: make(chan struct{}, 1),
stop: make(chan struct{}),
}

// Check TLS related args
Expand Down Expand Up @@ -290,7 +290,7 @@ func (w *nfdWorker) Run() error {
return nil
}

grpcErr := make(chan error, 1)
grpcErr := make(chan error)

// Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 {
Expand Down

0 comments on commit d7c5ce3

Please sign in to comment.