Skip to content

Commit

Permalink
feat: add tracing (#536)
Browse files Browse the repository at this point in the history
  • Loading branch information
zepatrik authored Apr 7, 2021
1 parent 7bc9412 commit b57a144
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 35 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ require (
github.com/go-swagger/go-swagger v0.26.1
github.com/gobuffalo/pop/v5 v5.3.3
github.com/golang/mock v1.5.0
github.com/golang/protobuf v1.5.1
github.com/gorilla/websocket v1.4.2
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/julienschmidt/httprouter v1.3.0
github.com/luna-duclos/instrumentedsql v1.1.3
github.com/luna-duclos/instrumentedsql/opentracing v0.0.0-20201103091713-40d03108b6f4
github.com/mattn/goveralls v0.0.8
github.com/ory/cli v0.0.48
github.com/ory/go-acc v0.2.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/api v1.5.0/go.mod h1:LqwrLNW876eYSuUOo4ZLHBcdKc038txr/IMfbLPATa4=
Expand Down
79 changes: 53 additions & 26 deletions internal/driver/registry_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
acl "github.com/ory/keto/proto/ory/keto/acl/v1alpha1"

grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/ory/x/reqlog"
"github.com/urfave/negroni"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -53,6 +54,7 @@ type (
healthH *healthx.Handler
healthServer *health.Server
handlers []Handler
tracer *tracing.Tracer
}
Handler interface {
RegisterReadRoutes(r *x.ReadRouter)
Expand Down Expand Up @@ -99,7 +101,16 @@ func (r *RegistryDefault) GetVersion(_ context.Context, _ *acl.GetVersionRequest
}

func (r *RegistryDefault) Tracer() *tracing.Tracer {
panic("implement me")
if r.tracer == nil {
// Tracing is initialized only once so it can not be hot reloaded or context-aware.
t, err := tracing.New(r.Logger(), r.Config().TracingConfig())
if err != nil {
r.Logger().WithError(err).Fatalf("Unable to initialize Tracer.")
}
r.tracer = t
}

return r.tracer
}

func (r *RegistryDefault) Logger() *logrusx.Logger {
Expand Down Expand Up @@ -152,7 +163,7 @@ func (r *RegistryDefault) Init(ctx context.Context) error {
return err
}

r.p, err = sql.NewPersister(r.c.DSN(), r.Logger(), nm)
r.p, err = sql.NewPersister(r.c.DSN(), r.Logger(), nm, r.Tracer())
if err != nil {
return err
}
Expand Down Expand Up @@ -219,6 +230,11 @@ func (r *RegistryDefault) ReadRouter() http.Handler {
}

n.UseHandler(br)

if t := r.Tracer(); t.IsLoaded() {
n.Use(t)
}

return n
}

Expand All @@ -234,23 +250,44 @@ func (r *RegistryDefault) WriteRouter() http.Handler {
}

n.UseHandler(pr)

if t := r.Tracer(); t.IsLoaded() {
n.Use(t)
}

return n
}

func (r *RegistryDefault) ReadGRPCServer() *grpc.Server {
s := grpc.NewServer(
grpc.ChainStreamInterceptor(
herodot.StreamErrorUnwrapInterceptor,
grpcMiddleware.ChainStreamServer(
grpc_logrus.StreamServerInterceptor(r.l.Entry),
),
func (r *RegistryDefault) unaryInterceptors() []grpc.UnaryServerInterceptor {
is := []grpc.UnaryServerInterceptor{
herodot.UnaryErrorUnwrapInterceptor,
grpcMiddleware.ChainUnaryServer(
grpc_logrus.UnaryServerInterceptor(r.l.Entry),
),
grpc.ChainUnaryInterceptor(
herodot.UnaryErrorUnwrapInterceptor,
grpcMiddleware.ChainUnaryServer(
grpc_logrus.UnaryServerInterceptor(r.l.Entry),
),
}
if r.Tracer().IsLoaded() {
is = append(is, otgrpc.OpenTracingServerInterceptor(r.Tracer().Tracer()))
}
return is
}

func (r *RegistryDefault) streamInterceptors() []grpc.StreamServerInterceptor {
is := []grpc.StreamServerInterceptor{
herodot.StreamErrorUnwrapInterceptor,
grpcMiddleware.ChainStreamServer(
grpc_logrus.StreamServerInterceptor(r.l.Entry),
),
}
if r.Tracer().IsLoaded() {
is = append(is, otgrpc.OpenTracingStreamServerInterceptor(r.Tracer().Tracer()))
}
return is
}

func (r *RegistryDefault) ReadGRPCServer() *grpc.Server {
s := grpc.NewServer(
grpc.ChainStreamInterceptor(r.streamInterceptors()...),
grpc.ChainUnaryInterceptor(r.unaryInterceptors()...),
)

grpcHealthV1.RegisterHealthServer(s, r.HealthServer())
Expand All @@ -266,18 +303,8 @@ func (r *RegistryDefault) ReadGRPCServer() *grpc.Server {

func (r *RegistryDefault) WriteGRPCServer() *grpc.Server {
s := grpc.NewServer(
grpc.ChainStreamInterceptor(
herodot.StreamErrorUnwrapInterceptor,
grpcMiddleware.ChainStreamServer(
grpc_logrus.StreamServerInterceptor(r.l.Entry),
),
),
grpc.ChainUnaryInterceptor(
herodot.UnaryErrorUnwrapInterceptor,
grpcMiddleware.ChainUnaryServer(
grpc_logrus.UnaryServerInterceptor(r.l.Entry),
),
),
grpc.ChainStreamInterceptor(r.streamInterceptors()...),
grpc.ChainUnaryInterceptor(r.unaryInterceptors()...),
)

grpcHealthV1.RegisterHealthServer(s, r.HealthServer())
Expand Down
2 changes: 1 addition & 1 deletion internal/persistence/sql/full_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestPersister(t *testing.T) {
hook = &test.Hook{}
lx := logrusx.New("", "", logrusx.WithHook(hook), logrusx.ForceLevel(logrus.TraceLevel))

p, err := NewPersister(dsn.Conn, lx, config.NewMemoryNamespaceManager())
p, err := NewPersister(dsn.Conn, lx, config.NewMemoryNamespaceManager(), nil)
require.NoError(t, err)

mb, err := p.MigrationBox(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion internal/persistence/sql/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestNamespaceMigrations(t *testing.T) {
l := logrusx.New("", "", logrusx.ForceLevel(logrus.DebugLevel), logrusx.WithHook(&hook))
nm := config.NewMemoryNamespaceManager(nn...)

p, err := NewPersister(dsn.Conn, l, nm)
p, err := NewPersister(dsn.Conn, l, nm, nil)
require.NoError(t, err)
return p, &hook
}
Expand Down
27 changes: 21 additions & 6 deletions internal/persistence/sql/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"strconv"
"time"

"github.com/luna-duclos/instrumentedsql"
"github.com/luna-duclos/instrumentedsql/opentracing"
"github.com/ory/x/tracing"

"github.com/ory/x/popx"

"github.com/cenkalti/backoff/v3"
Expand All @@ -29,6 +33,7 @@ type (
namespaces namespace.Manager
l *logrusx.Logger
dsn string
tracer *tracing.Tracer
}
internalPagination struct {
Page, PerPage int
Expand All @@ -51,13 +56,14 @@ var (
_ persistence.Persister = &Persister{}
)

func NewPersister(dsn string, l *logrusx.Logger, namespaces namespace.Manager) (*Persister, error) {
func NewPersister(dsn string, l *logrusx.Logger, namespaces namespace.Manager, tracer *tracing.Tracer) (*Persister, error) {
pop.SetLogger(l.PopLogger)

p := &Persister{
namespaces: namespaces,
l: l,
dsn: dsn,
tracer: tracer,
}

var err error
Expand All @@ -70,13 +76,22 @@ func NewPersister(dsn string, l *logrusx.Logger, namespaces namespace.Manager) (
}

func (p *Persister) newConnection(options map[string]string) (c *pop.Connection, err error) {
var opts []instrumentedsql.Opt
if p.tracer.IsLoaded() {
opts = []instrumentedsql.Opt{
instrumentedsql.WithTracer(opentracing.NewTracer(true)),
instrumentedsql.WithOmitArgs(),
}
}
pool, idlePool, connMaxLifetime, cleanedDSN := sqlcon.ParseConnectionOptions(p.l, p.dsn)
connDetails := &pop.ConnectionDetails{
URL: sqlcon.FinalizeDSN(p.l, cleanedDSN),
IdlePool: idlePool,
ConnMaxLifetime: connMaxLifetime,
Pool: pool,
Options: options,
URL: sqlcon.FinalizeDSN(p.l, cleanedDSN),
IdlePool: idlePool,
ConnMaxLifetime: connMaxLifetime,
Pool: pool,
Options: options,
UseInstrumentedDriver: p.tracer != nil && p.tracer.IsLoaded(),
InstrumentedDriverOptions: opts,
}

bc := backoff.NewExponentialBackOff()
Expand Down

0 comments on commit b57a144

Please sign in to comment.