Skip to content

Commit

Permalink
Merge jaeger feature branch into master (#3130)
Browse files Browse the repository at this point in the history
* Integrate Jaeger gRPC collector (#2976)

Add support for Jaeger gRPC Trace Intake Collector. The gRPC endpoint collects monitoring metrics and supports TLS communication, by reusing the `apm-server.ssl.*` configuration. By default the gRPC endpoint is disabled.

closes #2962 

Co-Authored-By: Andrew Wilkins <[email protected]>

* [Jaeger] Add otel consumer converting batches to Elastic APM events (#3066)

Add consumer converting incoming otel batches to Elastic APM format.
Add integration tests covering incoming gRPC requests being transformed to beat events.

partially implements #3307

* Jaeger http thrift (#3081)

Add an HTTP handler, muxer, and server, in beater/jaeger for accepting Thrift-encoded trace data over HTTP. Refactor beater/jaeger.GRPCServer into Server, which now encapsulates both gRPC and HTTP servers.

Move beater/api/jaeger code into beater/jaeger, which is the only user of GRPCCollector. If the beater/jaeger code grows significantly, we might consider having subpackages like beater/jaeger/grpc, beater/jaeger/http, etc.

* [jaeger] Convert Timeevents to errors (#3085)

* [jaeger] Convert Timeevents to errors

Parse Timeevents from Jaeger spans and convert to elastic error events
if they describe an error.

Fixes #3007

* Add experimental flag to Jaeger integration (#3121)

* tests/system: system test for Jaeger Thrift/HTTP (#3114)

* tests/system: system test for Jaeger Thrift/HTTP

* tests/system: system test for Jaeger gRPC

* processor/otel: update approvals

Co-authored-by: Silvia Mitter <[email protected]>
  • Loading branch information
axw and simitt authored Jan 9, 2020
2 parents 190da76 + 4867f08 commit 9fe0468
Show file tree
Hide file tree
Showing 573 changed files with 155,989 additions and 448 deletions.
275 changes: 257 additions & 18 deletions NOTICE.txt

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,25 @@ apm-server:
#readonly: {}



#---------------------------- APM Server - Experimental Jaeger integration ----------------------------

# When enabling Jaeger integration, APM Server acts as Jaeger collector. It supports jaeger.thrift over HTTP
# and gRPC. This is an experimental feature, use with care.
#jaeger:
#grpc:
# Set to true to enable the Jaeger gRPC collector service.
#enabled: false
# Defines the gRPC host and port the server is listening on.
# Defaults to the standard Jaeger gRPC collector port 14250.
#host: "{{ .jaeger_grpc_hostport }}"
#http:
# Set to true to enable the Jaeger HTTP collector endpoint.
#enabled: false
# Defines the HTTP host and port the server is listening on.
# Defaults to the standard Jaeger HTTP collector port 14268.
#host: "{{ .jaeger_http_hostport }}"

#================================= General =================================

# Data is buffered in a memory queue before it is published to the configured output.
Expand Down
19 changes: 19 additions & 0 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,25 @@ apm-server:
#readonly: {}



#---------------------------- APM Server - Experimental Jaeger integration ----------------------------

# When enabling Jaeger integration, APM Server acts as Jaeger collector. It supports jaeger.thrift over HTTP
# and gRPC. This is an experimental feature, use with care.
#jaeger:
#grpc:
# Set to true to enable the Jaeger gRPC collector service.
#enabled: false
# Defines the gRPC host and port the server is listening on.
# Defaults to the standard Jaeger gRPC collector port 14250.
#host: "0.0.0.0:14250"
#http:
# Set to true to enable the Jaeger HTTP collector endpoint.
#enabled: false
# Defines the HTTP host and port the server is listening on.
# Defaults to the standard Jaeger HTTP collector port 14268.
#host: "0.0.0.0:14268"

#================================= General =================================

# Data is buffered in a memory queue before it is published to the configured output.
Expand Down
19 changes: 19 additions & 0 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,25 @@ apm-server:
#readonly: {}



#---------------------------- APM Server - Experimental Jaeger integration ----------------------------

# When enabling Jaeger integration, APM Server acts as Jaeger collector. It supports jaeger.thrift over HTTP
# and gRPC. This is an experimental feature, use with care.
#jaeger:
#grpc:
# Set to true to enable the Jaeger gRPC collector service.
#enabled: false
# Defines the gRPC host and port the server is listening on.
# Defaults to the standard Jaeger gRPC collector port 14250.
#host: "localhost:14250"
#http:
# Set to true to enable the Jaeger HTTP collector endpoint.
#enabled: false
# Defines the HTTP host and port the server is listening on.
# Defaults to the standard Jaeger HTTP collector port 14268.
#host: "localhost:14268"

#================================= General =================================

# Data is buffered in a memory queue before it is published to the configured output.
Expand Down
2 changes: 1 addition & 1 deletion beater/api/asset/sourcemap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.MonitoringMapForRegistry(registry)
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.sourcemap", monitoring.PublishExpvar)
)

Expand Down
2 changes: 1 addition & 1 deletion beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.MonitoringMapForRegistry(registry)
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.acm", monitoring.PublishExpvar)

errMsgKibanaDisabled = errors.New(msgKibanaDisabled)
Expand Down
2 changes: 1 addition & 1 deletion beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.MonitoringMapForRegistry(registry)
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.server", monitoring.PublishExpvar)
)

Expand Down
4 changes: 2 additions & 2 deletions beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type route struct {

// NewMux registers apm handlers to paths building up the APM Server API.
func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMux, error) {
pool := newContextPool()
pool := request.NewContextPool()
mux := http.NewServeMux()
logger := logp.NewLogger(logs.Handler)

Expand Down Expand Up @@ -112,7 +112,7 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu
return nil, err
}
logger.Infof("Path %s added to request handler", route.path)
mux.Handle(route.path, pool.handler(h))
mux.Handle(route.path, pool.HTTPHandler(h))

}
if beaterConfig.Expvar.IsEnabled() {
Expand Down
2 changes: 1 addition & 1 deletion beater/api/profile/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.MonitoringMapForRegistry(registry)
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.profile", monitoring.PublishExpvar)
)

Expand Down
2 changes: 1 addition & 1 deletion beater/api/root/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.MonitoringMapForRegistry(registry)
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.root", monitoring.PublishExpvar)
)

Expand Down
60 changes: 6 additions & 54 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,12 @@
package beater

import (
"bufio"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"sync"
"time"

"go.elastic.co/apm"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
Expand All @@ -48,11 +43,11 @@ func init() {

type beater struct {
config *config.Config
logger *logp.Logger
mutex sync.Mutex // guards server and stopped
server *http.Server
server server
stopping chan struct{}
stopped bool
logger *logp.Logger
}

var (
Expand Down Expand Up @@ -190,56 +185,15 @@ func (bt *beater) Run(b *beat.Beat) error {
return nil
}

bt.server, err = newServer(bt.config, tracer, pub.Send)
bt.server, err = newServer(bt.logger, bt.config, tracer, pub.Send)
if err != nil {
bt.logger.Error("failed to create new server:", err)
return nil
}
bt.mutex.Unlock()

var g errgroup.Group
g.Go(func() error {
return run(bt.logger, bt.server, lis, bt.config)
})

if bt.isServerAvailable(bt.config.ShutdownTimeout) {
go notifyListening(bt.config, pub.Client().Publish)
}

if tracerServer != nil {
g.Go(func() error {
return tracerServer.serve(pub.Send)
})
}

if err := g.Wait(); err != http.ErrServerClosed {
return err
}
bt.logger.Infof("Server stopped")
return nil
}

func (bt *beater) isServerAvailable(timeout time.Duration) bool {
// following an example from https://golang.org/pkg/net/
// dial into tcp connection to ensure listener is ready, send get request and read response,
// in case tls is enabled, the server will respond with 400,
// as this only checks the server is up and reachable errors can be ignored
conn, err := net.DialTimeout("tcp", bt.config.Host, timeout)
if err != nil {
return false
}
err = conn.SetReadDeadline(time.Now().Add(timeout))
if err != nil {
return false
}
fmt.Fprintf(conn, "GET / HTTP/1.0\r\n\r\n")
_, err = bufio.NewReader(conn).ReadByte()
if err != nil {
return false
}

err = conn.Close()
return err == nil
//blocking until shutdown
return bt.server.run(lis, tracerServer, pub)
}

func isElasticsearchOutput(b *beat.Beat) bool {
Expand Down Expand Up @@ -274,9 +228,7 @@ func (bt *beater) Stop() {
}
bt.logger.Infof("stopping apm-server... waiting maximum of %v seconds for queues to drain",
bt.config.ShutdownTimeout.Seconds())
if bt.server != nil {
stop(bt.logger, bt.server)
}
bt.server.stop(bt.logger)
close(bt.stopping)
bt.stopped = true
}
4 changes: 2 additions & 2 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (bt *beater) client(insecure bool) (string, *http.Client) {

bt.mutex.Lock() // for reading bt.server
defer bt.mutex.Unlock()
if parsed, err := url.Parse(bt.server.Addr); err == nil && parsed.Scheme == "unix" {
if parsed, err := url.Parse(bt.server.httpServer.Addr); err == nil && parsed.Scheme == "unix" {
transport.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", parsed.Path)
}
Expand All @@ -81,7 +81,7 @@ func (bt *beater) wait() error {
go func() {
for {
bt.mutex.Lock()
if bt.server != nil {
if bt.server.httpServer != nil {
bt.mutex.Unlock()
break
}
Expand Down
6 changes: 3 additions & 3 deletions beater/beatertest/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func CompareMonitoringInt(
m map[request.ResultID]*monitoring.Int,
) (bool, string) {

clearRegistry(m)
ClearRegistry(m)
handler(c)

var result string
Expand Down Expand Up @@ -65,9 +65,9 @@ func AllRequestResultIDs() []request.ResultID {
return ids
}

// clearRegistry sets all counters to 0 and removes all registered counters from the registry
// ClearRegistry sets all counters to 0 and removes all registered counters from the registry
// Only use this in test environments
func clearRegistry(m map[request.ResultID]*monitoring.Int) {
func ClearRegistry(m map[request.ResultID]*monitoring.Int) {
for _, i := range m {
if i != nil {
i.Set(0)
Expand Down
11 changes: 8 additions & 3 deletions beater/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ type Config struct {
Mode Mode `config:"mode"`
Kibana *common.Config `config:"kibana"`
AgentConfig *AgentConfig `config:"agent.config"`

SecretToken string `config:"secret_token"`
APIKeyConfig *APIKeyConfig `config:"api_key"`
SecretToken string `config:"secret_token"`
APIKeyConfig *APIKeyConfig `config:"api_key"`
JaegerConfig JaegerConfig `config:"jaeger"`

Pipeline string
}
Expand Down Expand Up @@ -125,6 +125,10 @@ func NewConfig(version string, ucfg *common.Config, outputESCfg *common.Config)
return nil, err
}

if err := c.JaegerConfig.setup(c); err != nil {
return nil, err
}

return c, nil
}

Expand Down Expand Up @@ -156,5 +160,6 @@ func DefaultConfig(beatVersion string) *Config {
AgentConfig: &AgentConfig{Cache: &Cache{Expiration: 30 * time.Second}},
Pipeline: defaultAPMPipeline,
APIKeyConfig: defaultAPIKeyConfig(),
JaegerConfig: defaultJaeger(),
}
}
Loading

0 comments on commit 9fe0468

Please sign in to comment.