-
Notifications
You must be signed in to change notification settings - Fork 437
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* #890 health check implementation (fully working, but lacks tests) * #890 added tests for client health check implementation * #890 defer Unlock instead of calling manually * go.sum reverted to the original state * #890 switches in main.go reworked to ServeMux, backoff implementation replaced with existing module, health check function simplified * Update mod files * #890 client health check simplified and refactored * #890 test error handling, necessary comments * #890 health service initialization reqorked; minor fixes in tests * #890 test error handling fixed * #890 test timeout handling reworked * Update go/grpcweb/health_test.go Co-authored-by: Johan Brandhorst-Satzkorn <[email protected]> * Update go/grpcweb/health_test.go Co-authored-by: Johan Brandhorst-Satzkorn <[email protected]> * #890 docs regenerated * #890 proper repo name Co-authored-by: Evgeny Mikerin <[email protected]> Co-authored-by: Johan Brandhorst-Satzkorn <[email protected]>
- Loading branch information
1 parent
ccbe285
commit 502cb1e
Showing
6 changed files
with
312 additions
and
44 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package grpcweb | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
backoff "github.com/cenkalti/backoff/v4" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
healthpb "google.golang.org/grpc/health/grpc_health_v1" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
const healthCheckMethod = "/grpc.health.v1.Health/Watch" | ||
|
||
// Client health check function is also part of the grpc/internal package | ||
// The following code is a simplified version of client.go | ||
// For more details see: https://pkg.go.dev/google.golang.org/grpc/health | ||
func ClientHealthCheck(ctx context.Context, backendConn *grpc.ClientConn, service string, setServingStatus func(serving bool)) error { | ||
shouldBackoff := false // No need for backoff on the first connection attempt | ||
backoffSrc := backoff.NewExponentialBackOff() | ||
healthClient := healthpb.NewHealthClient(backendConn) | ||
|
||
for { | ||
// Backs off if the connection has failed in some way without receiving a message in the previous retry. | ||
if shouldBackoff { | ||
select { | ||
case <-time.After(backoffSrc.NextBackOff()): | ||
case <-ctx.Done(): | ||
return nil | ||
} | ||
} | ||
shouldBackoff = true // we should backoff next time, since we attempt connecting below | ||
|
||
req := healthpb.HealthCheckRequest{Service: service} | ||
s, err := healthClient.Watch(ctx, &req) | ||
if err != nil { | ||
continue | ||
} | ||
|
||
resp := new(healthpb.HealthCheckResponse) | ||
for { | ||
err = s.RecvMsg(resp) | ||
if err != nil { | ||
setServingStatus(false) | ||
// The health check functionality should be disabled if health check service is not implemented on the backend | ||
if status.Code(err) == codes.Unimplemented { | ||
return err | ||
} | ||
// breaking out of the loop, since we got an error from Recv, triggering reconnect | ||
break | ||
} | ||
|
||
// As a message has been received, removes the need for backoff for the next retry. | ||
shouldBackoff = false | ||
backoffSrc.Reset() | ||
|
||
if resp.Status == healthpb.HealthCheckResponse_SERVING { | ||
setServingStatus(true) | ||
} else { | ||
setServingStatus(false) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
package grpcweb_test | ||
|
||
import ( | ||
"context" | ||
"net" | ||
"testing" | ||
"time" | ||
|
||
"github.com/improbable-eng/grpc-web/go/grpcweb" | ||
testproto "github.com/improbable-eng/grpc-web/integration_test/go/_proto/improbable/grpcweb/test" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/health" | ||
healthpb "google.golang.org/grpc/health/grpc_health_v1" | ||
) | ||
|
||
func TestClientWithNoHealthServiceOnServer(t *testing.T) { | ||
// Set up and run a server with no health check handler registered | ||
grpcServer := grpc.NewServer() | ||
testproto.RegisterTestServiceServer(grpcServer, &testServiceImpl{}) | ||
listener, err := net.Listen("tcp", "127.0.0.1:0") | ||
require.NoError(t, err) | ||
|
||
go func() { | ||
_ = grpcServer.Serve(listener) | ||
}() | ||
t.Cleanup(grpcServer.Stop) | ||
|
||
grpcClientConn, err := grpc.Dial(listener.Addr().String(), | ||
grpc.WithBlock(), | ||
grpc.WithTimeout(100*time.Millisecond), | ||
grpc.WithInsecure(), | ||
) | ||
require.NoError(t, err) | ||
|
||
ctx := context.Background() | ||
|
||
servingStatus := true | ||
err = grpcweb.ClientHealthCheck(ctx, grpcClientConn, "", func(serving bool) { | ||
servingStatus = serving | ||
}) | ||
assert.Error(t, err) | ||
assert.False(t, servingStatus) | ||
} | ||
|
||
type clientHealthTestData struct { | ||
listener net.Listener | ||
serving bool | ||
healthServer *health.Server | ||
} | ||
|
||
func setupTestData(t *testing.T) clientHealthTestData { | ||
s := clientHealthTestData{} | ||
|
||
grpcServer := grpc.NewServer() | ||
s.healthServer = health.NewServer() | ||
healthpb.RegisterHealthServer(grpcServer, s.healthServer) | ||
|
||
var err error | ||
s.listener, err = net.Listen("tcp", "127.0.0.1:0") | ||
require.NoError(t, err) | ||
|
||
go func() { | ||
grpcServer.Serve(s.listener) | ||
}() | ||
t.Cleanup(grpcServer.Stop) | ||
|
||
return s | ||
} | ||
|
||
func (s *clientHealthTestData) dialBackend(t *testing.T) *grpc.ClientConn { | ||
grpcClientConn, err := grpc.Dial(s.listener.Addr().String(), | ||
grpc.WithBlock(), | ||
grpc.WithTimeout(100*time.Millisecond), | ||
grpc.WithInsecure(), | ||
) | ||
require.NoError(t, err) | ||
return grpcClientConn | ||
} | ||
|
||
func (s *clientHealthTestData) checkServingStatus(t *testing.T, expStatus bool) { | ||
for start := time.Now(); time.Since(start) < 100*time.Millisecond; { | ||
if s.serving == expStatus { | ||
break | ||
} | ||
} | ||
assert.Equal(t, expStatus, s.serving) | ||
} | ||
|
||
func (s *clientHealthTestData) startClientHealthCheck(ctx context.Context, conn *grpc.ClientConn) { | ||
go func() { | ||
_ = grpcweb.ClientHealthCheck(ctx, conn, "", func(status bool) { | ||
s.serving = status | ||
}) | ||
}() | ||
} | ||
|
||
func TestClientHealthCheck_FailsIfNotServing(t *testing.T) { | ||
s := setupTestData(t) | ||
|
||
s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) | ||
|
||
backendConn := s.dialBackend(t) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
s.startClientHealthCheck(ctx, backendConn) | ||
s.checkServingStatus(t, false) | ||
} | ||
|
||
func TestClientHealthCheck_SucceedsIfServing(t *testing.T) { | ||
s := setupTestData(t) | ||
|
||
s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) | ||
|
||
backendConn := s.dialBackend(t) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
s.startClientHealthCheck(ctx, backendConn) | ||
s.checkServingStatus(t, true) | ||
} | ||
|
||
func TestClientHealthCheck_ReactsToStatusChange(t *testing.T) { | ||
s := setupTestData(t) | ||
|
||
s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) | ||
|
||
backendConn := s.dialBackend(t) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
s.startClientHealthCheck(ctx, backendConn) | ||
s.checkServingStatus(t, false) | ||
|
||
s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) | ||
s.checkServingStatus(t, true) | ||
|
||
s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) | ||
s.checkServingStatus(t, false) | ||
} |
Oops, something went wrong.