Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy up usage of channels for signaling #1656

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/nfd-gc/nfd-gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestNRTGC(t *testing.T) {
Convey("When theres is old NRT ", t, func() {
gc := newMockGC(nil, []string{"node1"})

errChan := make(chan error, 1)
errChan := make(chan error)
go func() { errChan <- gc.Run() }()

So(waitForNRT(gc.topoClient), ShouldBeTrue)
Expand All @@ -51,7 +51,7 @@ func TestNRTGC(t *testing.T) {
Convey("When theres is one old NRT and one up to date", t, func() {
gc := newMockGC([]string{"node1"}, []string{"node1", "node2"})

errChan := make(chan error, 1)
errChan := make(chan error)
go func() { errChan <- gc.Run() }()

So(waitForNRT(gc.topoClient, "node1"), ShouldBeTrue)
Expand All @@ -62,7 +62,7 @@ func TestNRTGC(t *testing.T) {
Convey("Should react to delete event", t, func() {
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"})

errChan := make(chan error, 1)
errChan := make(chan error)
go func() { errChan <- gc.Run() }()

err := gc.k8sClient.CoreV1().Nodes().Delete(context.TODO(), "node1", metav1.DeleteOptions{})
Expand All @@ -81,7 +81,7 @@ func TestNRTGC(t *testing.T) {
},
}

errChan := make(chan error, 1)
errChan := make(chan error)
go func() { errChan <- gc.Run() }()

_, err := gc.topoClient.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrt, metav1.CreateOptions{})
Expand All @@ -98,7 +98,7 @@ func newMockGC(nodes, nrts []string) *mockGC {
factory: informers.NewSharedInformerFactory(k8sClient, 5*time.Minute),
nfdClient: fakenfdclientset.NewSimpleClientset(),
topoClient: faketopologyv1alpha2.NewSimpleClientset(createFakeNRTs(nrts...)...),
stopChan: make(chan struct{}, 1),
stopChan: make(chan struct{}),
args: &Args{
GCPeriod: 10 * time.Minute,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/nfd-master/nfd-api-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@

func newNfdController(config *restclient.Config, nfdApiControllerOptions nfdApiControllerOptions) (*nfdController, error) {
c := &nfdController{
stopChan: make(chan struct{}, 1),
stopChan: make(chan struct{}),

Check warning on line 58 in pkg/nfd-master/nfd-api-controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-api-controller.go#L58

Added line #L58 was not covered by tests
updateAllNodesChan: make(chan struct{}, 1),
updateOneNodeChan: make(chan string),
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/nfd-master/nfd-master-internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newTestNode() *corev1.Node {

func newFakeNfdAPIController(client *fakenfdclient.Clientset) *nfdController {
c := &nfdController{
stopChan: make(chan struct{}, 1),
stopChan: make(chan struct{}),
updateAllNodesChan: make(chan struct{}, 1),
updateOneNodeChan: make(chan string),
}
Expand Down
20 changes: 7 additions & 13 deletions pkg/nfd-master/nfd-master.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
server *grpc.Server
healthServer *grpc.Server
stop chan struct{}
ready chan bool
ready chan struct{}
k8sClient k8sclient.Interface
nodeUpdaterPool *nodeUpdaterPool
deniedNs
Expand All @@ -161,8 +161,8 @@
nfd := &nfdMaster{args: *args,
nodeName: utils.NodeName(),
namespace: utils.GetKubernetesNamespace(),
ready: make(chan bool, 1),
stop: make(chan struct{}, 1),
ready: make(chan struct{}),
stop: make(chan struct{}),
}

if args.Instance != "" {
Expand Down Expand Up @@ -272,7 +272,7 @@
}

// Run gRPC server
grpcErr := make(chan error, 1)
grpcErr := make(chan error)
// If the NodeFeature API is enabled, don'tregister the labeler API
// server. Otherwise, register the labeler server.
if !features.NFDFeatureGate.Enabled(features.NodeFeatureAPI) {
Expand All @@ -296,7 +296,6 @@
}

// Notify that we're ready to accept connections
m.ready <- true
close(m.ready)

// NFD-Master main event loop
Expand Down Expand Up @@ -397,7 +396,7 @@
klog.InfoS("gRPC server serving", "port", m.args.Port)

// Run gRPC server
grpcErr := make(chan error, 1)
grpcErr := make(chan error)
go func() {
defer lis.Close()
grpcErr <- m.server.Serve(lis)
Expand Down Expand Up @@ -475,15 +474,10 @@
// Wait until NfdMaster is able able to accept connections.
func (m *nfdMaster) WaitForReady(timeout time.Duration) bool {
select {
case ready, ok := <-m.ready:
// Ready if the flag is true or the channel has been closed
if ready || !ok {
return true
}
case <-m.ready:
return true

Check warning on line 478 in pkg/nfd-master/nfd-master.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-master/nfd-master.go#L477-L478

Added lines #L477 - L478 were not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to re write the entire func, the last return will never be used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to re write the entire func, the last return will never be used

True, I modified the func to eliminate unreachable parts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pardon my ignorance, should this select be inside an empty for ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the select blocks until the channel is closed or the timer expoires

case <-time.After(timeout):
return false
}
// We should never end-up here
return false
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/nfd-topology-updater/nfd-topology-updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
nfd := &nfdTopologyUpdater{
args: args,
resourcemonitorArgs: resourcemonitorArgs,
stop: make(chan struct{}, 1),
stop: make(chan struct{}),

Check warning on line 114 in pkg/nfd-topology-updater/nfd-topology-updater.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-topology-updater/nfd-topology-updater.go#L114

Added line #L114 was not covered by tests
nodeName: utils.NodeName(),
eventSource: eventSource,
config: &NFDConfig{},
Expand Down Expand Up @@ -207,7 +207,6 @@
// CAUTION: these resources are expected to change rarely - if ever.
// So we are intentionally do this once during the process lifecycle.
// TODO: Obtain node resources dynamically from the podresource API
// zonesChannel := make(chan v1alpha1.ZoneList)
var zones v1alpha2.ZoneList

excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName)
Expand All @@ -216,7 +215,7 @@
return fmt.Errorf("failed to obtain node resource information: %w", err)
}

grpcErr := make(chan error, 1)
grpcErr := make(chan error)

Check warning on line 218 in pkg/nfd-topology-updater/nfd-topology-updater.go

View check run for this annotation

Codecov / codecov/patch

pkg/nfd-topology-updater/nfd-topology-updater.go#L218

Added line #L218 was not covered by tests

// Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/nfd-worker/nfd-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewNfdWorker(args *Args) (NfdWorker, error) {
args: *args,
config: &NFDConfig{},
kubernetesNamespace: utils.GetKubernetesNamespace(),
stop: make(chan struct{}, 1),
stop: make(chan struct{}),
}

// Check TLS related args
Expand Down Expand Up @@ -290,7 +290,7 @@ func (w *nfdWorker) Run() error {
return nil
}

grpcErr := make(chan error, 1)
grpcErr := make(chan error)

// Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 {
Expand Down