Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

beater: CreatorParams.RunServer -> WrapRunServer #3803

Merged
merged 2 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 43 additions & 36 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"github.com/pkg/errors"
"go.elastic.co/apm"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -44,18 +45,16 @@ var (

// CreatorParams holds parameters for creating beat.Beaters.
type CreatorParams struct {
// RunServer is used to run the APM Server.
// WrapRunServer is used to wrap the RunServerFunc used to run the APM Server.
//
// This should be set to beater.RunServer, or a function which wraps it.
RunServer RunServerFunc
// WrapRunServer is optional. If provided, it must return a function that calls
// its input, possibly modifying the parameters on the way in.
WrapRunServer func(RunServerFunc) RunServerFunc
}

// NewCreator returns a new beat.Creator which creates beaters
// using the provided CreatorParams.
func NewCreator(args CreatorParams) beat.Creator {
if args.RunServer == nil {
panic("args.RunServer must be non-nil")
}
return func(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) {
logger := logp.NewLogger(logs.Beater)
if err := checkConfig(logger); err != nil {
Expand All @@ -72,10 +71,10 @@ func NewCreator(args CreatorParams) beat.Creator {
}

bt := &beater{
config: beaterConfig,
stopped: false,
logger: logger,
runServer: args.RunServer,
config: beaterConfig,
stopped: false,
logger: logger,
wrapRunServer: args.WrapRunServer,
}

// setup pipelines if explicitly directed to or setup --pipelines and config is not set at all
Expand Down Expand Up @@ -119,9 +118,9 @@ func checkConfig(logger *logp.Logger) error {
}

type beater struct {
config *config.Config
logger *logp.Logger
runServer RunServerFunc
config *config.Config
logger *logp.Logger
wrapRunServer func(RunServerFunc) RunServerFunc

mutex sync.Mutex // guards stopServer and stopped
stopServer func()
Expand All @@ -137,30 +136,14 @@ func (bt *beater) Run(b *beat.Beat) error {
}
defer tracer.Close()

runServer := bt.runServer
runServer := runServer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity, can't you move L144 up here, and wrap that same origRunServer in L166?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too sure what you mean, but I've extracted this tracer-related code to a separate function. Hopefully that will clarify things, let me know.

if tracerServer != nil {
// Self-instrumentation enabled, so running the APM Server
// should run an internal server for receiving trace data.
origRunServer := runServer
runServer = func(ctx context.Context, args ServerParams) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer tracerServer.stop()
<-ctx.Done()
// Close the tracer now to prevent the server
// from waiting for more events during graceful
// shutdown.
tracer.Close()
return nil
})
g.Go(func() error {
return tracerServer.serve(args.Reporter)
})
g.Go(func() error {
return origRunServer(ctx, args)
})
return g.Wait()
}
runServer = runServerWithTracerServer(runServer, tracerServer, tracer)
}
if bt.wrapRunServer != nil {
// Wrap runServer function, enabling injection of
// behaviour into the processing/reporting pipeline.
runServer = bt.wrapRunServer(runServer)
}

publisher, err := publish.NewPublisher(b.Publisher, tracer, &publish.PublisherConfig{
Expand Down Expand Up @@ -244,3 +227,27 @@ func (bt *beater) Stop() {
bt.stopServer()
bt.stopped = true
}

// runServerWithTracerServer wraps runServer such that it also runs
// tracerServer, stopping it and the tracer when the server shuts down.
func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServer, tracer *apm.Tracer) RunServerFunc {
return func(ctx context.Context, args ServerParams) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer tracerServer.stop()
<-ctx.Done()
// Close the tracer now to prevent the server
// from waiting for more events during graceful
// shutdown.
tracer.Close()
return nil
})
g.Go(func() error {
return tracerServer.serve(args.Reporter)
})
g.Go(func() error {
return runServer(ctx, args)
})
return g.Wait()
}
}
47 changes: 33 additions & 14 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/publish"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -42,26 +43,44 @@ type testBeater struct {
client *http.Client
}

func setupBeater(t *testing.T, apmBeat *beat.Beat, ucfg *common.Config, beatConfig *beat.BeatConfig) (*testBeater, error) {
func setupBeater(
t *testing.T,
apmBeat *beat.Beat,
ucfg *common.Config,
beatConfig *beat.BeatConfig,
) (*testBeater, error) {

onboardingDocs := make(chan onboardingDoc, 1)
createBeater := NewCreator(CreatorParams{
RunServer: func(ctx context.Context, args ServerParams) error {
// Wrap the reporter so we can intercept the
// onboarding doc, to extract the listen address.
origReporter := args.Reporter
args.Reporter = func(ctx context.Context, req publish.PendingReq) error {
for _, tf := range req.Transformables {
if o, ok := tf.(onboardingDoc); ok {
select {
case <-ctx.Done():
return ctx.Err()
case onboardingDocs <- o:
WrapRunServer: func(runServer RunServerFunc) RunServerFunc {
return func(ctx context.Context, args ServerParams) error {
// Wrap the reporter so we can intercept the
// onboarding doc, to extract the listen address.
origReporter := args.Reporter
args.Reporter = func(ctx context.Context, req publish.PendingReq) error {
for _, tf := range req.Transformables {
switch tf := tf.(type) {
case onboardingDoc:
select {
case <-ctx.Done():
return ctx.Err()
case onboardingDocs <- tf:
}

case *model.Transaction:
// Add a label to test that everything
// goes through the wrapped reporter.
if tf.Labels == nil {
labels := make(model.Labels)
tf.Labels = &labels
}
(*tf.Labels)["wrapped_reporter"] = true
}
}
return origReporter(ctx, req)
}
return origReporter(ctx, req)
return runServer(ctx, args)
}
return RunServer(ctx, args)
},
})

Expand Down
4 changes: 2 additions & 2 deletions beater/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type ServerParams struct {
Reporter publish.Reporter
}

// RunServer runs the APM Server until a fatal error occurs, or ctx is cancelled.
func RunServer(ctx context.Context, args ServerParams) error {
// runServer runs the APM Server until a fatal error occurs, or ctx is cancelled.
func runServer(ctx context.Context, args ServerParams) error {
srv, err := newServer(args.Logger, args.Config, args.Tracer, args.Reporter)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@
"ab_testing": true,
"group": "experimental",
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"segment": 5
"segment": 5,
"wrapped_reporter": true
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
"host": {
"ip": "127.0.0.1"
},
"labels": {
"wrapped_reporter": true
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
"hostname": "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
},
"labels": {
"tag1": "one",
"tag2": 2
"tag2": 2,
"wrapped_reporter": true
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -253,7 +254,8 @@
"tag1": "one",
"tag2": 12,
"tag3": 12.45,
"tag4": false
"tag4": false,
"wrapped_reporter": true
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -419,7 +421,8 @@
},
"labels": {
"tag1": "one",
"tag2": 2
"tag2": 2,
"wrapped_reporter": true
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -555,7 +558,8 @@
},
"labels": {
"tag1": "one",
"tag2": 2
"tag2": 2,
"wrapped_reporter": true
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down
6 changes: 6 additions & 0 deletions beater/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func TestServerTracingEnabled(t *testing.T) {
if testTransactionIds.Contains(eventTransactionId(e)) {
continue
}

// Check that self-instrumentation goes through the
// reporter wrapped by setupBeater.
wrapped, _ := e.GetValue("labels.wrapped_reporter")
assert.Equal(t, true, wrapped)

selfTransactions = append(selfTransactions, eventTransactionName(e))
case <-time.After(5 * time.Second):
assert.FailNow(t, "timed out waiting for transaction")
Expand Down
4 changes: 1 addition & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
"github.com/elastic/apm-server/cmd"
)

var rootCmd = cmd.NewRootCommand(beater.NewCreator(beater.CreatorParams{
RunServer: beater.RunServer,
}))
var rootCmd = cmd.NewRootCommand(beater.NewCreator(beater.CreatorParams{}))

func main() {
if err := rootCmd.Execute(); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions x-pack/apm-server/cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ func TestSubCommands(t *testing.T) {
"version": {},
}

rootCmd := NewXPackRootCommand(beater.NewCreator(beater.CreatorParams{
RunServer: beater.RunServer,
}))
rootCmd := NewXPackRootCommand(beater.NewCreator(beater.CreatorParams{}))
for _, cmd := range rootCmd.Commands() {
name := cmd.Name()
if _, ok := validCommands[name]; !ok {
Expand Down
12 changes: 7 additions & 5 deletions x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
// and the publish.Reporter will be wrapped such that all
// transactions pass through the aggregator before being
// published to libbeat.
func runServerWithAggregator(ctx context.Context, args beater.ServerParams) error {
func runServerWithAggregator(ctx context.Context, runServer beater.RunServerFunc, args beater.ServerParams) error {
if !args.Config.Aggregation.Enabled {
return beater.RunServer(ctx, args)
return runServer(ctx, args)
}

agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{
Expand Down Expand Up @@ -58,14 +58,16 @@ func runServerWithAggregator(ctx context.Context, args beater.ServerParams) erro
}
})
g.Go(func() error {
return beater.RunServer(ctx, args)
return runServer(ctx, args)
})
return g.Wait()
}

var rootCmd = cmd.NewXPackRootCommand(beater.NewCreator(beater.CreatorParams{
RunServer: func(ctx context.Context, args beater.ServerParams) error {
return runServerWithAggregator(ctx, args)
WrapRunServer: func(runServer beater.RunServerFunc) beater.RunServerFunc {
return func(ctx context.Context, args beater.ServerParams) error {
return runServerWithAggregator(ctx, runServer, args)
}
},
}))

Expand Down