Skip to content

Commit

Permalink
Merge pull request #144 from k1LoW/runbook-capturer
Browse files Browse the repository at this point in the history
Introduce built-in capturer `capture.Runbook`
  • Loading branch information
k1LoW authored Sep 16, 2022
2 parents a1f3e39 + dea44c2 commit 1761f34
Show file tree
Hide file tree
Showing 27 changed files with 1,123 additions and 84 deletions.
567 changes: 567 additions & 0 deletions capture/runbook.go

Large diffs are not rendered by default.

108 changes: 108 additions & 0 deletions capture/runbook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package capture

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"

"github.com/k1LoW/runn"
"github.com/k1LoW/runn/testutil"
"github.com/tenntenn/golden"
)

func TestRunbook(t *testing.T) {
tests := []struct {
book string
}{
{filepath.Join(testutil.Testdata(), "book", "http.yml")},
{filepath.Join(testutil.Testdata(), "book", "grpc.yml")},
{filepath.Join(testutil.Testdata(), "book", "db.yml")},
{filepath.Join(testutil.Testdata(), "book", "exec.yml")},
}
ctx := context.Background()
for _, tt := range tests {
t.Run(filepath.Base(tt.book), func(t *testing.T) {
dir := t.TempDir()
hs := testutil.HTTPServer(t)
gs := testutil.GRPCServer(t, false)
db, _ := testutil.SQLite(t)
opts := []runn.Option{
runn.Book(tt.book),
runn.HTTPRunner("req", hs.URL, hs.Client()),
runn.GrpcRunner("greq", gs.Conn()),
runn.DBRunner("db", db),
runn.Capture(Runbook(dir)),
}
o, err := runn.New(opts...)
if err != nil {
t.Fatal(err)
}
if err := o.Run(ctx); err != nil {
t.Error(err)
}

got := golden.Txtar(t, dir)
f := fmt.Sprintf("%s.runbook", filepath.Base(tt.book))
if os.Getenv("UPDATE_GOLDEN") != "" {
golden.Update(t, testutil.Testdata(), f, got)
return
}

if diff := golden.Diff(t, testutil.Testdata(), f, got); diff != "" {
t.Error(diff)
}
})
}
}

func TestRunnable(t *testing.T) {
tests := []struct {
book string
}{
{filepath.Join(testutil.Testdata(), "book", "http.yml")},
{filepath.Join(testutil.Testdata(), "book", "grpc.yml")},
{filepath.Join(testutil.Testdata(), "book", "db.yml")},
{filepath.Join(testutil.Testdata(), "book", "exec.yml")},
}
ctx := context.Background()
for _, tt := range tests {
t.Run(filepath.Base(tt.book), func(t *testing.T) {
dir := t.TempDir()
hs := testutil.HTTPServer(t)
gs := testutil.GRPCServer(t, false)
db, _ := testutil.SQLite(t)
opts := []runn.Option{
runn.Book(tt.book),
runn.HTTPRunner("req", hs.URL, hs.Client()),
runn.GrpcRunner("greq", gs.Conn()),
runn.DBRunner("db", db),
runn.Capture(Runbook(dir)),
}
o, err := runn.New(opts...)
if err != nil {
t.Fatal(err)
}
if err := o.Run(ctx); err != nil {
t.Error(err)
}

{
opts := []runn.Option{
runn.Book(filepath.Join(dir, capturedFilename(tt.book))),
runn.HTTPRunner("req", hs.URL, hs.Client()),
runn.GrpcRunner("greq", gs.Conn()),
runn.DBRunner("db", db),
}
o, err := runn.New(opts...)
if err != nil {
t.Fatal(err)
}
if err := o.Run(ctx); err != nil {
t.Error(err)
}
}
})
}
}
43 changes: 25 additions & 18 deletions capturer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ type Capturer interface {
CaptureStart(ids []string, bookPath string)
CaptureEnd(ids []string, bookPath string)

CaptureHTTPRequest(req *http.Request)
CaptureHTTPResponse(res *http.Response)
CaptureHTTPRequest(name string, req *http.Request)
CaptureHTTPResponse(name string, res *http.Response)

CaptureGRPCStart(service, method string)
CaptureGRPCStart(name string, typ GRPCType, service, method string)
CaptureGRPCRequestHeaders(h map[string][]string)
CaptureGRPCRequestMessage(m map[string]interface{})
CaptureGRPCResponseStatus(status int)
CaptureGRPCResponseHeaders(h map[string][]string)
CaptureGRPCResponseMessage(m map[string]interface{})
CaptureGRPCResponseTrailers(t map[string][]string)
CaptureGRPCEnd(service, method string)
CaptureGRPCClientClose()
CaptureGRPCEnd(name string, typ GRPCType, service, method string)

CaptureDBStatement(stmt string)
CaptureDBResponse(res *DBResponse)
CaptureDBStatement(name string, stmt string)
CaptureDBResponse(name string, res *DBResponse)

CaptureExecCommand(command string)
CaptureExecStdin(stdin string)
Expand All @@ -48,21 +49,21 @@ func (cs capturers) captureEnd(ids []string, bookPath string) {
}
}

func (cs capturers) captureHTTPRequest(req *http.Request) {
func (cs capturers) captureHTTPRequest(name string, req *http.Request) {
for _, c := range cs {
c.CaptureHTTPRequest(req)
c.CaptureHTTPRequest(name, req)
}
}

func (cs capturers) captureHTTPResponse(res *http.Response) {
func (cs capturers) captureHTTPResponse(name string, res *http.Response) {
for _, c := range cs {
c.CaptureHTTPResponse(res)
c.CaptureHTTPResponse(name, res)
}
}

func (cs capturers) captureGRPCStart(service, method string) {
func (cs capturers) captureGRPCStart(name string, typ GRPCType, service, method string) {
for _, c := range cs {
c.CaptureGRPCStart(service, method)
c.CaptureGRPCStart(name, typ, service, method)
}
}
func (cs capturers) captureGRPCRequestHeaders(h metadata.MD) {
Expand Down Expand Up @@ -101,21 +102,27 @@ func (cs capturers) captureGRPCResponseTrailers(t metadata.MD) {
}
}

func (cs capturers) captureGRPCEnd(service, method string) {
func (cs capturers) captureGRPCClientClose() {
for _, c := range cs {
c.CaptureGRPCEnd(service, method)
c.CaptureGRPCClientClose()
}
}

func (cs capturers) captureDBStatement(stmt string) {
func (cs capturers) captureGRPCEnd(name string, typ GRPCType, service, method string) {
for _, c := range cs {
c.CaptureDBStatement(stmt)
c.CaptureGRPCEnd(name, typ, service, method)
}
}

func (cs capturers) captureDBResponse(res *DBResponse) {
func (cs capturers) captureDBStatement(name string, stmt string) {
for _, c := range cs {
c.CaptureDBResponse(res)
c.CaptureDBStatement(name, stmt)
}
}

func (cs capturers) captureDBResponse(name string, res *DBResponse) {
for _, c := range cs {
c.CaptureDBResponse(name, res)
}
}

Expand Down
6 changes: 3 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (rnr *dbRunner) Run(ctx context.Context, q *dbQuery) error {
return err
}
for _, stmt := range stmts {
rnr.operator.capturers.captureDBStatement(stmt)
rnr.operator.capturers.captureDBStatement(rnr.name, stmt)
err := func() error {
if !strings.HasPrefix(strings.ToUpper(stmt), "SELECT") {
// exec
Expand All @@ -62,7 +62,7 @@ func (rnr *dbRunner) Run(ctx context.Context, q *dbQuery) error {
"rows_affected": a,
}

rnr.operator.capturers.captureDBResponse(&DBResponse{
rnr.operator.capturers.captureDBResponse(rnr.name, &DBResponse{
LastInsertID: id,
RowsAffected: a,
})
Expand Down Expand Up @@ -128,7 +128,7 @@ func (rnr *dbRunner) Run(ctx context.Context, q *dbQuery) error {
return err
}

rnr.operator.capturers.captureDBResponse(&DBResponse{
rnr.operator.capturers.captureDBResponse(rnr.name, &DBResponse{
Columns: columns,
Rows: rows,
})
Expand Down
15 changes: 9 additions & 6 deletions debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ func NewDebugger(out io.Writer) *debugger {
func (d *debugger) CaptureStart(ids []string, bookPath string) {}
func (d *debugger) CaptureEnd(ids []string, bookPath string) {}

func (d *debugger) CaptureHTTPRequest(req *http.Request) {
func (d *debugger) CaptureHTTPRequest(name string, req *http.Request) {
b, _ := httputil.DumpRequest(req, true)
_, _ = fmt.Fprintf(d.out, "-----START HTTP REQUEST-----\n%s\n-----END HTTP REQUEST-----\n", string(b))
}

func (d *debugger) CaptureHTTPResponse(res *http.Response) {
func (d *debugger) CaptureHTTPResponse(name string, res *http.Response) {
b, _ := httputil.DumpResponse(res, true)
_, _ = fmt.Fprintf(d.out, "-----START HTTP RESPONSE-----\n%s\n-----END HTTP RESPONSE-----\n", string(b))
}

func (d *debugger) CaptureGRPCStart(service, method string) {
func (d *debugger) CaptureGRPCStart(name string, typ GRPCType, service, method string) {
_, _ = fmt.Fprintf(d.out, ">>>>>START gRPC (%s/%s)>>>>>\n", service, method)
}

Expand Down Expand Up @@ -69,15 +69,18 @@ func (d *debugger) CaptureGRPCResponseMessage(m map[string]interface{}) {
func (d *debugger) CaptureGRPCResponseTrailers(t map[string][]string) {
_, _ = fmt.Fprintf(d.out, "-----START gRPC RESPONSE TRAILERS-----\n%s\n-----END gRPC RESPONSE TRAILERS-----\n", dumpGRPCMetadata(t))
}
func (d *debugger) CaptureGRPCEnd(service, method string) {

func (d *debugger) CaptureGRPCClientClose() {}

func (d *debugger) CaptureGRPCEnd(name string, typ GRPCType, service, method string) {
_, _ = fmt.Fprintf(d.out, "<<<<<END gRPC (%s/%s)<<<<<\n", service, method)
}

func (d *debugger) CaptureDBStatement(stmt string) {
func (d *debugger) CaptureDBStatement(name string, stmt string) {
_, _ = fmt.Fprintf(d.out, "-----START QUERY-----\n%s\n-----END QUERY-----\n", stmt)
}

func (d *debugger) CaptureDBResponse(res *DBResponse) {
func (d *debugger) CaptureDBResponse(name string, res *DBResponse) {
_, _ = fmt.Fprint(d.out, "-----START QUERY RESULT-----\n")
defer fmt.Fprint(d.out, "-----END QUERY RESULT-----\n")
if len(res.Rows) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/xo/dburl v0.11.0
go.uber.org/multierr v1.8.0
google.golang.org/grpc v1.48.0
gopkg.in/yaml.v2 v2.4.0
)

require (
Expand Down Expand Up @@ -84,6 +85,5 @@ require (
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
38 changes: 27 additions & 11 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,21 @@ import (
"google.golang.org/grpc/status"
)

type grpcOp string
type GRPCType string

const (
grpcOpMessage grpcOp = "message"
grpcOpReceive grpcOp = "receive"
grpcOpClose grpcOp = "close"
GRPCUnary GRPCType = "unary"
GRPCServerStreaming GRPCType = "server"
GRPCClientStreaming GRPCType = "client"
GRPCBidiStreaming GRPCType = "bidi"
)

type GRPCOp string

const (
GRPCOpMessage GRPCOp = "message"
GRPCOpReceive GRPCOp = "receive"
GRPCOpClose GRPCOp = "close"
)

type grpcRunner struct {
Expand All @@ -51,7 +60,7 @@ type grpcRunner struct {
}

type grpcMessage struct {
op grpcOp
op GRPCOp
params map[string]interface{}
}

Expand Down Expand Up @@ -79,8 +88,6 @@ func (rnr *grpcRunner) Close() error {
}

func (rnr *grpcRunner) Run(ctx context.Context, r *grpcRequest) error {
rnr.operator.capturers.captureGRPCStart(r.service, r.method)
defer rnr.operator.capturers.captureGRPCEnd(r.service, r.method)
if rnr.cc == nil {
opts := []grpc.DialOption{
grpc.WithBlock(),
Expand Down Expand Up @@ -147,12 +154,20 @@ func (rnr *grpcRunner) Run(ctx context.Context, r *grpcRequest) error {
req := mf.NewMessage(md.GetInputType())
switch {
case !md.IsServerStreaming() && !md.IsClientStreaming():
rnr.operator.capturers.captureGRPCStart(rnr.name, GRPCUnary, r.service, r.method)
defer rnr.operator.capturers.captureGRPCEnd(rnr.name, GRPCUnary, r.service, r.method)
return rnr.invokeUnary(ctx, stub, md, req, r)
case md.IsServerStreaming() && !md.IsClientStreaming():
rnr.operator.capturers.captureGRPCStart(rnr.name, GRPCServerStreaming, r.service, r.method)
defer rnr.operator.capturers.captureGRPCEnd(rnr.name, GRPCServerStreaming, r.service, r.method)
return rnr.invokeServerStreaming(ctx, stub, md, req, r)
case !md.IsServerStreaming() && md.IsClientStreaming():
rnr.operator.capturers.captureGRPCStart(rnr.name, GRPCClientStreaming, r.service, r.method)
defer rnr.operator.capturers.captureGRPCEnd(rnr.name, GRPCClientStreaming, r.service, r.method)
return rnr.invokeClientStreaming(ctx, stub, md, req, r)
case md.IsServerStreaming() && md.IsClientStreaming():
rnr.operator.capturers.captureGRPCStart(rnr.name, GRPCBidiStreaming, r.service, r.method)
defer rnr.operator.capturers.captureGRPCEnd(rnr.name, GRPCBidiStreaming, r.service, r.method)
return rnr.invokeBidiStreaming(ctx, stub, md, req, r)
default:
return errors.New("something strange happened")
Expand Down Expand Up @@ -315,7 +330,7 @@ func (rnr *grpcRunner) invokeClientStreaming(ctx context.Context, stub grpcdynam
messages := []map[string]interface{}{}
for _, m := range r.messages {
switch m.op {
case grpcOpMessage:
case GRPCOpMessage:
if err := rnr.setMessage(req, m.params); err != nil {
return err
}
Expand Down Expand Up @@ -394,7 +409,7 @@ func (rnr *grpcRunner) invokeBidiStreaming(ctx context.Context, stub grpcdynamic
L:
for _, m := range r.messages {
switch m.op {
case grpcOpMessage:
case GRPCOpMessage:
if err := rnr.setMessage(req, m.params); err != nil {
return err
}
Expand All @@ -403,7 +418,7 @@ L:
rnr.operator.capturers.captureGRPCRequestMessage(m.params)

req.Reset()
case grpcOpReceive:
case GRPCOpReceive:
res, err := stream.RecvMsg()
stat, ok := status.FromError(err)
if !ok {
Expand Down Expand Up @@ -436,9 +451,10 @@ L:

messages = append(messages, msg)
}
case grpcOpClose:
case GRPCOpClose:
clientClose = true
err = stream.CloseSend()
rnr.operator.capturers.captureGRPCClientClose()
break L
default:
return fmt.Errorf("invalid op: %v", m.op)
Expand Down
Loading

0 comments on commit 1761f34

Please sign in to comment.