diff --git a/xray/config.go b/xray/config.go index a2b4d695..bc466ab9 100644 --- a/xray/config.go +++ b/xray/config.go @@ -64,6 +64,12 @@ func newGlobalConfig() *globalConfig { } ret.streamingStrategy = sts + emt, err := NewDefaultEmitter(ret.daemonAddr) + if err != nil { + panic(err) + } + ret.emitter = emt + cm := ctxmissing.NewDefaultRuntimeErrorStrategy() ret.contextMissingStrategy = cm @@ -75,6 +81,7 @@ type globalConfig struct { sync.RWMutex daemonAddr *net.UDPAddr + emitter Emitter serviceVersion string samplingStrategy sampling.Strategy streamingStrategy StreamingStrategy @@ -88,6 +95,7 @@ type globalConfig struct { type Config struct { DaemonAddr string ServiceVersion string + Emitter Emitter SamplingStrategy sampling.Strategy StreamingStrategy StreamingStrategy ExceptionFormattingStrategy exception.FormattingStrategy @@ -103,8 +111,12 @@ func ContextWithConfig(ctx context.Context, c Config) (context.Context, error) { daemonEndpoints, er := daemoncfg.GetDaemonEndpointsFromString(c.DaemonAddr) if daemonEndpoints != nil { - go refreshEmitterWithAddress(daemonEndpoints.UDPAddr) - configureStrategy(c.SamplingStrategy, daemonEndpoints) + if c.Emitter != nil { + c.Emitter.RefreshEmitterWithAddress(daemonEndpoints.UDPAddr) + } + if c.SamplingStrategy != nil { + configureStrategy(c.SamplingStrategy, daemonEndpoints) + } } else if er != nil { errors = append(errors, er) } @@ -156,10 +168,14 @@ func Configure(c Config) error { globalCfg.samplingStrategy = c.SamplingStrategy } + if c.Emitter != nil { + globalCfg.emitter = c.Emitter + } + daemonEndpoints, er := daemoncfg.GetDaemonEndpointsFromString(c.DaemonAddr) if daemonEndpoints != nil { globalCfg.daemonAddr = daemonEndpoints.UDPAddr - go refreshEmitter() + globalCfg.emitter.RefreshEmitterWithAddress(globalCfg.daemonAddr) configureStrategy(globalCfg.samplingStrategy, daemonEndpoints) } else if er != nil { errors = append(errors, er) diff --git a/xray/config_test.go b/xray/config_test.go index 3a78475d..a3a924d6 100644 --- a/xray/config_test.go +++ b/xray/config_test.go @@ -30,6 +30,8 @@ type TestStreamingStrategy struct{} type TestContextMissingStrategy struct{} +type TestEmitter struct{} + func (tss *TestSamplingStrategy) ShouldTrace(request *sampling.Request) *sampling.Decision { return &sampling.Decision{ Sample: true, @@ -65,6 +67,10 @@ func (sms *TestStreamingStrategy) StreamCompletedSubsegments(seg *Segment) [][]b return test } +func (te *TestEmitter) Emit(seg *Segment) {} + +func (te *TestEmitter) RefreshEmitterWithAddress(raddr *net.UDPAddr) {} + func (cms *TestContextMissingStrategy) ContextMissing(v interface{}) { fmt.Sprintf("Test ContextMissing Strategy %v", v) } @@ -89,12 +95,19 @@ func ResetConfig() { sms, _ := NewDefaultStreamingStrategy() cms := ctxmissing.NewDefaultRuntimeErrorStrategy() + udpAddr := &net.UDPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 2000, + } + e, _ := NewDefaultEmitter(udpAddr) + Configure(Config{ DaemonAddr: "127.0.0.1:2000", LogLevel: "info", LogFormat: "%Date(2006-01-02T15:04:05Z07:00) [%Level] %Msg%n", SamplingStrategy: ss, StreamingStrategy: sms, + Emitter: e, ExceptionFormattingStrategy: efs, ContextMissingStrategy: cms, }) @@ -203,6 +216,7 @@ func TestConfigureWithContext(t *testing.T) { efs := &TestExceptionFormattingStrategy{} sms := &TestStreamingStrategy{} cms := &TestContextMissingStrategy{} + de := &TestEmitter{} ctx, err := ContextWithConfig(context.Background(), Config{ DaemonAddr: daemonAddr, @@ -210,6 +224,7 @@ func TestConfigureWithContext(t *testing.T) { SamplingStrategy: ss, ExceptionFormattingStrategy: efs, StreamingStrategy: sms, + Emitter: de, ContextMissingStrategy: cms, LogLevel: logLevel, LogFormat: logFormat, @@ -223,6 +238,27 @@ func TestConfigureWithContext(t *testing.T) { assert.Equal(t, ss, cfg.SamplingStrategy) assert.Equal(t, efs, cfg.ExceptionFormattingStrategy) assert.Equal(t, sms, cfg.StreamingStrategy) + assert.Equal(t, de, cfg.Emitter) + assert.Equal(t, cms, cfg.ContextMissingStrategy) + assert.Equal(t, serviceVersion, cfg.ServiceVersion) + + ResetConfig() +} + +func TestSelectiveConfigWithContext(t *testing.T) { + daemonAddr := "127.0.0.1:3000" + serviceVersion := "TestVersion" + cms := &TestContextMissingStrategy{} + + ctx, err := ContextWithConfig(context.Background(), Config{ + DaemonAddr: daemonAddr, + ServiceVersion: serviceVersion, + ContextMissingStrategy: cms, + }) + + cfg := GetRecorder(ctx) + assert.Nil(t, err) + assert.Equal(t, daemonAddr, cfg.DaemonAddr) assert.Equal(t, cms, cfg.ContextMissingStrategy) assert.Equal(t, serviceVersion, cfg.ServiceVersion) diff --git a/xray/default_emitter.go b/xray/default_emitter.go new file mode 100644 index 00000000..4211efc2 --- /dev/null +++ b/xray/default_emitter.go @@ -0,0 +1,103 @@ +// Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +package xray + +import ( + "bytes" + "encoding/json" + "net" + "sync" + + log "github.com/cihub/seelog" +) + +// Header is added before sending segments to daemon. +var Header = []byte(`{"format": "json", "version": 1}` + "\n") + +// DefaultEmitter provides the naive implementation of emitting trace entities. +type DefaultEmitter struct { + sync.Mutex + conn *net.UDPConn +} + +// NewDefaultEmitter initializes and returns a +// pointer to an instance of DefaultEmitter. +func NewDefaultEmitter(raddr *net.UDPAddr) (*DefaultEmitter, error) { + initLambda() + d := &DefaultEmitter{} + d.RefreshEmitterWithAddress(raddr) + return d, nil +} + +// RefreshEmitterWithAddress dials UDP based on the input UDP address. +func (de *DefaultEmitter) RefreshEmitterWithAddress(raddr *net.UDPAddr) { + de.Lock() + de.conn, _ = net.DialUDP("udp", nil, raddr) + log.Infof("Emitter using address: %v", raddr) + de.Unlock() +} + +// Emit segment or subsegment if root segment is sampled. +func (de *DefaultEmitter) Emit(seg *Segment) { + if seg == nil || !seg.ParentSegment.Sampled { + return + } + + var logLevel string + if seg.Configuration != nil && seg.Configuration.LogLevel == "trace" { + logLevel = "trace" + } else if globalCfg.logLevel <= log.TraceLvl { + logLevel = "trace" + } + + for _, p := range packSegments(seg, nil) { + if logLevel == "trace" { + b := &bytes.Buffer{} + json.Indent(b, p, "", " ") + log.Trace(b.String()) + } + de.Lock() + _, err := de.conn.Write(append(Header, p...)) + if err != nil { + log.Error(err) + } + de.Unlock() + } +} + +func packSegments(seg *Segment, outSegments [][]byte) [][]byte { + trimSubsegment := func(s *Segment) []byte { + ss := globalCfg.StreamingStrategy() + if seg.ParentSegment.Configuration != nil && seg.ParentSegment.Configuration.StreamingStrategy != nil { + ss = seg.ParentSegment.Configuration.StreamingStrategy + } + for ss.RequiresStreaming(s) { + if len(s.rawSubsegments) == 0 { + break + } + cb := ss.StreamCompletedSubsegments(s) + outSegments = append(outSegments, cb...) + } + b, _ := json.Marshal(s) + return b + } + + for _, s := range seg.rawSubsegments { + outSegments = packSegments(s, outSegments) + if b := trimSubsegment(s); b != nil { + seg.Subsegments = append(seg.Subsegments, b) + } + } + if seg.parent == nil { + if b := trimSubsegment(seg); b != nil { + outSegments = append(outSegments, b) + } + } + return outSegments +} diff --git a/xray/emitter_test.go b/xray/default_emitter_test.go similarity index 100% rename from xray/emitter_test.go rename to xray/default_emitter_test.go diff --git a/xray/emitter.go b/xray/emitter.go index b1f9a23e..a0e28171 100644 --- a/xray/emitter.go +++ b/xray/emitter.go @@ -8,99 +8,10 @@ package xray -import ( - "bytes" - "encoding/json" - "net" - "sync" +import "net" - log "github.com/cihub/seelog" -) - -// Header is added before sending segments to daemon. -var Header = []byte(`{"format": "json", "version": 1}` + "\n") - -type emitter struct { - sync.Mutex - conn *net.UDPConn -} - -var e = &emitter{} - -func init() { - initLambda() - refreshEmitter() -} - -func refreshEmitter() { - e.Lock() - e.conn, _ = net.DialUDP("udp", nil, globalCfg.DaemonAddr()) - log.Infof("Emitter using address from global config: %v", globalCfg.DaemonAddr()) - e.Unlock() -} - -func refreshEmitterWithAddress(raddr *net.UDPAddr) { - e.Lock() - e.conn, _ = net.DialUDP("udp", nil, raddr) - log.Infof("Emitter using address: %v", raddr) - e.Unlock() -} - -// Emit segment or subsegment if root segment is sampled. -func Emit(seg *Segment) { - if seg == nil || !seg.ParentSegment.Sampled { - return - } - - var logLevel string - if seg.Configuration != nil && seg.Configuration.LogLevel == "trace" { - logLevel = "trace" - } else if globalCfg.logLevel <= log.TraceLvl { - logLevel = "trace" - } - - for _, p := range packSegments(seg, nil) { - if logLevel == "trace" { - b := &bytes.Buffer{} - json.Indent(b, p, "", " ") - log.Trace(b.String()) - } - e.Lock() - _, err := e.conn.Write(append(Header, p...)) - if err != nil { - log.Error(err) - } - e.Unlock() - } -} - -func packSegments(seg *Segment, outSegments [][]byte) [][]byte { - trimSubsegment := func(s *Segment) []byte { - ss := globalCfg.StreamingStrategy() - if seg.ParentSegment.Configuration != nil && seg.ParentSegment.Configuration.StreamingStrategy != nil { - ss = seg.ParentSegment.Configuration.StreamingStrategy - } - for ss.RequiresStreaming(s) { - if len(s.rawSubsegments) == 0 { - break - } - cb := ss.StreamCompletedSubsegments(s) - outSegments = append(outSegments, cb...) - } - b, _ := json.Marshal(s) - return b - } - - for _, s := range seg.rawSubsegments { - outSegments = packSegments(s, outSegments) - if b := trimSubsegment(s); b != nil { - seg.Subsegments = append(seg.Subsegments, b) - } - } - if seg.parent == nil { - if b := trimSubsegment(seg); b != nil { - outSegments = append(outSegments, b) - } - } - return outSegments +// Emitter provides an interface for implementing emitting trace entities. +type Emitter interface { + Emit(seg *Segment) + RefreshEmitterWithAddress(raddr *net.UDPAddr) } diff --git a/xray/lambda.go b/xray/lambda.go index aa402c74..4a66cebc 100644 --- a/xray/lambda.go +++ b/xray/lambda.go @@ -10,11 +10,12 @@ package xray import ( "context" - "github.com/aws/aws-xray-sdk-go/header" - log "github.com/cihub/seelog" "os" "path/filepath" "time" + + "github.com/aws/aws-xray-sdk-go/header" + log "github.com/cihub/seelog" ) // LambdaTraceHeaderKey is key to get trace header from context. @@ -75,14 +76,13 @@ func createFile(dir string, name string) (string, error) { e := os.MkdirAll(dir, os.ModePerm) if e != nil { return filePath, e - } else { - var file, err = os.Create(filePath) - if err != nil { - return filePath, err - } - file.Close() - return filePath, nil } + var file, err = os.Create(filePath) + if err != nil { + return filePath, err + } + file.Close() + return filePath, nil } else if err != nil { return filePath, err } diff --git a/xray/segment.go b/xray/segment.go index 689aea90..c1ac2c2a 100644 --- a/xray/segment.go +++ b/xray/segment.go @@ -114,6 +114,7 @@ func (seg *Segment) assignConfiguration(cfg *Config) { seg.GetConfiguration().ExceptionFormattingStrategy = globalCfg.exceptionFormattingStrategy seg.GetConfiguration().SamplingStrategy = globalCfg.samplingStrategy seg.GetConfiguration().StreamingStrategy = globalCfg.streamingStrategy + seg.GetConfiguration().Emitter = globalCfg.emitter seg.GetConfiguration().ServiceVersion = globalCfg.serviceVersion } else { if cfg.ContextMissingStrategy != nil { @@ -140,6 +141,12 @@ func (seg *Segment) assignConfiguration(cfg *Config) { seg.GetConfiguration().StreamingStrategy = globalCfg.streamingStrategy } + if cfg.Emitter != nil { + seg.GetConfiguration().Emitter = cfg.Emitter + } else { + seg.GetConfiguration().Emitter = globalCfg.emitter + } + if cfg.ServiceVersion != "" { seg.GetConfiguration().ServiceVersion = cfg.ServiceVersion } else { @@ -264,7 +271,7 @@ func (subseg *Segment) CloseAndStream(err error) { subseg.beforeEmitSubsegment(subseg.parent) subseg.Unlock() - Emit(subseg) + subseg.emit() } // RemoveSubsegment removes a subsegment child from a segment or subsegment. @@ -286,6 +293,10 @@ func (seg *Segment) RemoveSubsegment(remove *Segment) bool { return false } +func (seg *Segment) emit() { + seg.ParentSegment.GetConfiguration().Emitter.Emit(seg) +} + func (seg *Segment) handleContextDone() { seg.Lock() defer seg.Unlock() @@ -300,12 +311,12 @@ func (seg *Segment) flush() { if (seg.openSegments == 0 && seg.EndTime > 0) || seg.ContextDone { if seg.parent == nil { seg.Emitted = true - Emit(seg) + seg.emit() } else if seg.parent != nil && seg.parent.Facade { seg.Emitted = true seg.beforeEmitSubsegment(seg.parent) log.Tracef("emit lambda subsegment named: %v", seg.Name) - Emit(seg) + seg.emit() } else { seg.parent.safeFlush() } diff --git a/xray/segment_test.go b/xray/segment_test.go index 5a4cddcd..9df79623 100644 --- a/xray/segment_test.go +++ b/xray/segment_test.go @@ -17,7 +17,7 @@ import ( func TestSegmentDataRace(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - for i := 0; i < 10; i += 1 { // flaky data race test, so we run it multiple times + for i := 0; i < 10; i++ { // flaky data race test, so we run it multiple times _, seg := BeginSegment(ctx, "TestSegment") go seg.Close(nil)