diff --git a/pkg/registry/etcd/ns_server.go b/pkg/registry/etcd/ns_server.go index 0f215b3f..43e176a8 100644 --- a/pkg/registry/etcd/ns_server.go +++ b/pkg/registry/etcd/ns_server.go @@ -162,7 +162,13 @@ func (n *etcdNSRegistryServer) Find(query *registry.NetworkServiceQuery, s regis } } if query.Watch { - if err := n.watch(query, s); err != nil && !errors.Is(err, io.EOF) { + if err := n.watch(query, s); err != nil { + // If we have timed out then return nil to close the stream + // cleanly + if errors.Is(err, io.EOF) || err.Error() == "context canceled" { + logger.Debug("watch timed out, closing stream") + return nil + } return err } } diff --git a/pkg/registry/etcd/nse_server.go b/pkg/registry/etcd/nse_server.go index 52758c90..00d42f53 100644 --- a/pkg/registry/etcd/nse_server.go +++ b/pkg/registry/etcd/nse_server.go @@ -113,7 +113,13 @@ func (n *etcdNSERegistryServer) Find(query *registry.NetworkServiceEndpointQuery } } if query.Watch { - if err := n.watch(query, s); err != nil && !errors.Is(err, io.EOF) { + if err := n.watch(query, s); err != nil { + // If we have timed out then return nil to close the stream + // cleanly + if errors.Is(err, io.EOF) || err.Error() == "context canceled" { + logger.Debug("watch timed out, closing stream") + return nil + } return err } }