Skip to content

Commit

Permalink
Added /healthz support (#359)
Browse files Browse the repository at this point in the history
* Added /healthz support. Fixed IsHealthy logic. Added tests for both. Added healthz http handler.
  • Loading branch information
banaag authored Oct 23, 2019
1 parent 670d87c commit 2e5f864
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 36 deletions.
8 changes: 7 additions & 1 deletion cmd/amppkg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"

"github.com/ampproject/amppackager/packager/certloader"
"github.com/ampproject/amppackager/packager/healthz"
"github.com/ampproject/amppackager/packager/mux"
"github.com/ampproject/amppackager/packager/rtv"
"github.com/ampproject/amppackager/packager/signer"
Expand Down Expand Up @@ -95,6 +96,11 @@ func main() {
}
}

healthz, err := healthz.New(certCache)
if err != nil {
die(errors.Wrap(err, "building healthz"))
}

rtvCache, err := rtv.New()
if err != nil {
die(errors.Wrap(err, "initializing rtv cache"))
Expand Down Expand Up @@ -127,7 +133,7 @@ func main() {
Addr: addr,
// Don't use DefaultServeMux, per
// https://blog.cloudflare.com/exposing-go-on-the-internet/.
Handler: logIntercept{mux.New(certCache, signer, validityMap)},
Handler: logIntercept{mux.New(certCache, signer, validityMap, healthz)},
ReadTimeout: 10 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
// If needing to stream the response, disable WriteTimeout and
Expand Down
4 changes: 2 additions & 2 deletions cmd/gateway_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type gatewayServer struct {
rtvCache *rtv.RTVCache
}

func shouldPackage() bool {
return true
func shouldPackage() error {
return nil
}

func errorToSXGResponse(err error) *pb.SXGResponse {
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,12 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873 h1:nfPFGzJkUDX6uBmpN/pSw7MbOAWegH5QDQuoXFHedLg=
google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
41 changes: 23 additions & 18 deletions packager/certcache/certcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ const maxOCSPResponseBytes = 1024 * 1024
const ocspCheckInterval = 1 * time.Hour

type CertHandler interface {
GetLatestCert() (*x509.Certificate)
GetLatestCert() *x509.Certificate
IsHealthy() error
}

type CertCache struct {
Expand Down Expand Up @@ -128,7 +129,7 @@ func (this *CertCache) Init(stop chan struct{}) error {
// For now this just returns the first entry in the certs field in the cache.
// For follow-on changes, we will transform this to a lambda so that anything
// that needs a cert can do the cert refresh logic (if needed) on demand.
func (this *CertCache) GetLatestCert() (*x509.Certificate) {
func (this *CertCache) GetLatestCert() *x509.Certificate {
// TODO(banaag): check if cert is still valid, refresh if not.
return this.certs[0]
}
Expand Down Expand Up @@ -201,31 +202,34 @@ func (this *CertCache) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// 8. Some idea of what to do when "things go bad".
// What happens when it's been 7 days, no new OCSP response can be obtained,
// and the current response is about to expire?
func (this *CertCache) IsHealthy() bool {
ocsp, _, err := this.readOCSP()
return err != nil || this.isHealthy(ocsp)
func (this *CertCache) IsHealthy() error {
ocsp, _, errorOcsp := this.readOCSP()
if errorOcsp != nil {
return errorOcsp
}
errorHealth := this.isHealthy(ocsp)
if errorHealth != nil {
return errorHealth
}
return nil
}

func (this *CertCache) isHealthy(ocspResp []byte) bool {
func (this *CertCache) isHealthy(ocspResp []byte) error {
if ocspResp == nil {
log.Println("OCSP response not yet fetched.")
return false
return errors.New("OCSP response not yet fetched.")
}
issuer := this.findIssuer()
if issuer == nil {
log.Println("Cannot find issuer certificate in CertFile.")
return false
return errors.New("Cannot find issuer certificate in CertFile.")
}
resp, err := ocsp.ParseResponseForCert(ocspResp, this.certs[0], issuer)
if err != nil {
log.Println("Error parsing OCSP response:", err)
return false
return errors.Wrap(err, "Error parsing OCSP response.")
}
if resp.NextUpdate.Before(time.Now()) {
log.Println("Cached OCSP is stale, NextUpdate:", resp.NextUpdate)
return false
return errors.Errorf("Cached OCSP is stale, NextUpdate: %v", resp.NextUpdate)
}
return true
return nil
}

// Returns the OCSP response and expiry, refreshing if necessary.
Expand All @@ -240,8 +244,9 @@ func (this *CertCache) readOCSP() ([]byte, time.Time, error) {
if len(ocsp) == 0 {
return nil, time.Time{}, errors.New("Missing OCSP response.")
}
if !this.isHealthy(ocsp) {
return nil, time.Time{}, errors.New("OCSP failed health check.")
err = this.isHealthy(ocsp)
if err != nil {
return nil, time.Time{}, errors.Wrap(err, "OCSP failed health check.")
}
this.ocspUpdateAfterMu.Lock()
defer this.ocspUpdateAfterMu.Unlock()
Expand Down Expand Up @@ -434,7 +439,7 @@ func (this *CertCache) fetchOCSP(orig []byte, ocspUpdateAfter *time.Time) []byte
// OCSP duration must be <=7 days, per
// https://wicg.github.io/webpackage/draft-yasskin-httpbis-origin-signed-exchanges-impl.html#cross-origin-trust.
// Serving these responses may cause UAs to reject the SXG.
if resp.NextUpdate.Sub(resp.ThisUpdate) > time.Hour * 24 * 7 {
if resp.NextUpdate.Sub(resp.ThisUpdate) > time.Hour*24*7 {
log.Printf("OCSP nextUpdate %+v too far ahead of thisUpdate %+v\n", resp.NextUpdate, resp.ThisUpdate)
return orig
}
Expand Down
31 changes: 30 additions & 1 deletion packager/certcache/certcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (this *CertCacheSuite) TearDownTest() {
}

func (this *CertCacheSuite) mux() http.Handler {
return mux.New(this.handler, nil, nil)
return mux.New(this.handler, nil, nil, nil)
}

func (this *CertCacheSuite) ocspServerCalled(f func()) bool {
Expand Down Expand Up @@ -181,6 +181,35 @@ func (this *CertCacheSuite) TestServesCertificate() {
this.Assert().NotContains(cbor, "sct")
}

func (this *CertCacheSuite) TestCertCacheIsHealthy() {
this.Assert().NoError(this.handler.IsHealthy())
}

func (this *CertCacheSuite) TestCertCacheIsNotHealthy() {
// Prime memory cache with a past-midpoint OCSP:
err := os.Remove(filepath.Join(this.tempDir, "ocsp"))
this.Require().NoError(err, "deleting OCSP tempfile")
this.fakeOCSP, err = FakeOCSPResponse(time.Now().Add(-4 * 24 * time.Hour))
this.Require().NoError(err, "creating stale OCSP response")
this.Require().True(this.ocspServerCalled(func() {
this.handler, err = this.New()
this.Require().NoError(err, "reinstantiating CertCache")
}))

// Prime disk cache with a bad OCSP:
freshOCSP := []byte("0xdeadbeef")
this.fakeOCSP = freshOCSP
err = ioutil.WriteFile(filepath.Join(this.tempDir, "ocsp"), freshOCSP, 0644)
this.Require().NoError(err, "writing fresh OCSP response to disk")

// On update, verify network is not called (fresh OCSP from disk is used):
this.Assert().True(this.ocspServerCalled(func() {
this.handler.readOCSP()
}))

this.Assert().Error(this.handler.IsHealthy())
}

func (this *CertCacheSuite) TestServes404OnMissingCertificate() {
resp := pkgt.Get(this.T(), this.mux(), "/amppkg/cert/lalala")
this.Assert().Equal(http.StatusNotFound, resp.StatusCode, "incorrect status: %#v", resp)
Expand Down
2 changes: 1 addition & 1 deletion packager/certcache/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"runtime"
"sync"

"github.com/pkg/errors"
"github.com/gofrs/flock"
"github.com/pkg/errors"
)

// This is an abstraction over a single file on a remote storage mechanism. It
Expand Down
41 changes: 41 additions & 0 deletions packager/healthz/healthz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package healthz

import (
"fmt"
"github.com/ampproject/amppackager/packager/certcache"
"net/http"
)

type Healthz struct {
certHandler certcache.CertHandler
}

func New(certHandler certcache.CertHandler) (*Healthz, error) {
return &Healthz{certHandler}, nil
}

func (this *Healthz) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Follow https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
err := this.certHandler.IsHealthy()
if err != nil {
resp.WriteHeader(500)
resp.Write([]byte(fmt.Sprintf("not healthy: %v", err)))
} else {
resp.WriteHeader(200)
resp.Write([]byte("ok"))
}
}
64 changes: 64 additions & 0 deletions packager/healthz/healthz_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package healthz

import (
"crypto/x509"
"net/http"
"testing"

"github.com/ampproject/amppackager/packager/mux"
"github.com/pkg/errors"

pkgt "github.com/ampproject/amppackager/packager/testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type fakeHealthyCertHandler struct {
}

func (this fakeHealthyCertHandler) GetLatestCert() *x509.Certificate {
return pkgt.Certs[0]
}

func (this fakeHealthyCertHandler) IsHealthy() error {
return nil
}

type fakeNotHealthyCertHandler struct {
}

func (this fakeNotHealthyCertHandler) GetLatestCert() *x509.Certificate {
return pkgt.Certs[0]
}

func (this fakeNotHealthyCertHandler) IsHealthy() error {
return errors.New("random error")
}

func TestHealthzOk(t *testing.T) {
handler, err := New(fakeHealthyCertHandler{})
require.NoError(t, err)
resp := pkgt.Get(t, mux.New(nil, nil, nil, handler), "/healthz")
assert.Equal(t, http.StatusOK, resp.StatusCode, "ok", resp)
}

func TestHealthzFail(t *testing.T) {
handler, err := New(fakeNotHealthyCertHandler{})
require.NoError(t, err)
resp := pkgt.Get(t, mux.New(nil, nil, nil, handler), "/healthz")
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode, "error", resp)
}
7 changes: 5 additions & 2 deletions packager/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ type mux struct {
certCache http.Handler
signer http.Handler
validityMap http.Handler
healthz http.Handler
}

// The main entry point. Use the return value for http.Server.Handler.
func New(certCache http.Handler, signer http.Handler, validityMap http.Handler) http.Handler {
return &mux{certCache, signer, validityMap}
func New(certCache http.Handler, signer http.Handler, validityMap http.Handler, healthz http.Handler) http.Handler {
return &mux{certCache, signer, validityMap, healthz}
}

func tryTrimPrefix(s, prefix string) (string, bool) {
Expand Down Expand Up @@ -97,6 +98,8 @@ func (this *mux) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
params["certName"] = unescaped
this.certCache.ServeHTTP(resp, req)
}
} else if path == util.HealthzPath {
this.healthz.ServeHTTP(resp, req)
} else if path == util.ValidityMapPath {
this.validityMap.ServeHTTP(resp, req)
} else {
Expand Down
10 changes: 5 additions & 5 deletions packager/signer/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type Signer struct {
client *http.Client
urlSets []util.URLSet
rtvCache *rtv.RTVCache
shouldPackage func() bool
shouldPackage func() error
overrideBaseURL *url.URL
requireHeaders bool
forwardedRequestHeaders []string
Expand All @@ -139,7 +139,7 @@ func noRedirects(req *http.Request, via []*http.Request) error {
}

func New(certHandler certcache.CertHandler, key crypto.PrivateKey, urlSets []util.URLSet,
rtvCache *rtv.RTVCache, shouldPackage func() bool, overrideBaseURL *url.URL,
rtvCache *rtv.RTVCache, shouldPackage func() error, overrideBaseURL *url.URL,
requireHeaders bool, forwardedRequestHeaders []string) (*Signer, error) {
client := http.Client{
CheckRedirect: noRedirects,
Expand Down Expand Up @@ -307,8 +307,8 @@ func (this *Signer) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
}
}()

if !this.shouldPackage() {
log.Println("Not packaging because server is unhealthy; see above log statements.")
if err := this.shouldPackage(); err != nil {
log.Println("Not packaging because server is unhealthy; see above log statements.", err)
proxy(resp, fetchResp, nil)
return
}
Expand Down Expand Up @@ -460,7 +460,7 @@ func (this *Signer) serveSignedExchange(resp http.ResponseWriter, fetchResp *htt
fetchResp.Header.Get("Content-Security-Policy")))

exchange := signedexchange.NewExchange(
accept.SxgVersion, /*uri=*/signURL.String(), /*method=*/"GET",
accept.SxgVersion /*uri=*/, signURL.String() /*method=*/, "GET",
http.Header{}, fetchResp.StatusCode, fetchResp.Header, []byte(transformed))
if err := exchange.MiEncodePayload(miRecordSize); err != nil {
util.NewHTTPError(http.StatusInternalServerError, "Error MI-encoding: ", err).LogAndRespond(resp)
Expand Down
Loading

0 comments on commit 2e5f864

Please sign in to comment.