Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor opencensus exporter to make it easily extensible #212

Merged
merged 1 commit into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

"google.golang.org/grpc/encoding/gzip"

"github.com/open-telemetry/opentelemetry-service/internal/compression"
"github.com/open-telemetry/opentelemetry-service/compression"
)

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package grpc
import (
"testing"

"github.com/open-telemetry/opentelemetry-service/internal/compression"
"github.com/open-telemetry/opentelemetry-service/compression"
)

func TestGetGRPCCompressionKey(t *testing.T) {
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion exporter/opencensusexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ type Config struct {
CertPemFile string `mapstructure:"cert-pem-file"`
UseSecure bool `mapstructure:"secure,omitempty"`
ReconnectionDelay time.Duration `mapstructure:"reconnection-delay,omitempty"`
KeepaliveParameters *keepaliveConfig `mapstructure:"keepalive,omitempty"`
KeepaliveParameters *KeepaliveConfig `mapstructure:"keepalive,omitempty"`
}
2 changes: 1 addition & 1 deletion exporter/opencensusexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestLoadConfig(t *testing.T) {
CertPemFile: "/var/lib/mycert.pem",
UseSecure: true,
ReconnectionDelay: 15,
KeepaliveParameters: &keepaliveConfig{
KeepaliveParameters: &KeepaliveConfig{
Time: 20,
PermitWithoutStream: true,
Timeout: 30,
Expand Down
79 changes: 45 additions & 34 deletions exporter/opencensusexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/open-telemetry/opentelemetry-service/compression"
compressiongrpc "github.com/open-telemetry/opentelemetry-service/compression/grpc"
"github.com/open-telemetry/opentelemetry-service/config/configerror"
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/exporter"
"github.com/open-telemetry/opentelemetry-service/exporter/exporterhelper"
"github.com/open-telemetry/opentelemetry-service/internal/compression"
compressiongrpc "github.com/open-telemetry/opentelemetry-service/internal/compression/grpc"
)

const (
Expand Down Expand Up @@ -61,20 +61,57 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
// CreateTraceExporter creates a trace exporter based on this config.
func (f *Factory) CreateTraceExporter(logger *zap.Logger, config configmodels.Exporter) (consumer.TraceConsumer, exporter.StopFunc, error) {
ocac := config.(*Config)
opts, err := f.OCAgentOptions(logger, ocac)
if err != nil {
return nil, nil, err
}
return f.CreateOCAgent(logger, ocac, opts)
}

// CreateOCAgent takes ocagent exporter options and create an OC exporter
func (f *Factory) CreateOCAgent(logger *zap.Logger, ocac *Config, opts []ocagent.ExporterOption) (consumer.TraceConsumer, exporter.StopFunc, error) {
numWorkers := defaultNumWorkers
if ocac.NumWorkers > 0 {
numWorkers = ocac.NumWorkers
}

exportersChan := make(chan *ocagent.Exporter, numWorkers)
for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ {
exporter, serr := ocagent.NewExporter(opts...)
if serr != nil {
return nil, nil, fmt.Errorf("cannot configure OpenCensus Trace exporter: %v", serr)
}
exportersChan <- exporter
}

oce := &ocagentExporter{exporters: exportersChan}
oexp, err := exporterhelper.NewTraceExporter(
"oc_trace",
oce.PushTraceData,
exporterhelper.WithSpanName("ocservice.exporter.OpenCensus.ConsumeTraceData"),
exporterhelper.WithRecordMetrics(true))

if err != nil {
return nil, nil, err
}

return oexp, oce.stop, nil
}

// OCAgentOptions takes the oc exporter Config and generates ocagent Options
func (f *Factory) OCAgentOptions(logger *zap.Logger, ocac *Config) ([]ocagent.ExporterOption, error) {
if ocac.Endpoint == "" {
return nil, nil, &ocTraceExporterError{
return nil, &ocTraceExporterError{
code: errEndpointRequired,
msg: "OpenCensus exporter config requires an Endpoint",
}
}

opts := []ocagent.ExporterOption{ocagent.WithAddress(ocac.Endpoint)}
if ocac.Compression != "" {
if compressionKey := compressiongrpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != compression.Unsupported {
opts = append(opts, ocagent.UseCompressor(compressionKey))
} else {
return nil, nil, &ocTraceExporterError{
return nil, &ocTraceExporterError{
code: errUnsupportedCompressionType,
msg: fmt.Sprintf("OpenCensus exporter unsupported compression type %q", ocac.Compression),
}
Expand All @@ -83,7 +120,7 @@ func (f *Factory) CreateTraceExporter(logger *zap.Logger, config configmodels.Ex
if ocac.CertPemFile != "" {
creds, err := credentials.NewClientTLSFromFile(ocac.CertPemFile, "")
if err != nil {
return nil, nil, &ocTraceExporterError{
return nil, &ocTraceExporterError{
code: errUnableToGetTLSCreds,
msg: fmt.Sprintf("OpenCensus exporter unable to read TLS credentials from pem file %q: %v", ocac.CertPemFile, err),
}
Expand All @@ -92,7 +129,7 @@ func (f *Factory) CreateTraceExporter(logger *zap.Logger, config configmodels.Ex
} else if ocac.UseSecure {
certPool, err := x509.SystemCertPool()
if err != nil {
return nil, nil, &ocTraceExporterError{
return nil, &ocTraceExporterError{
code: errUnableToGetTLSCreds,
msg: fmt.Sprintf(
"OpenCensus exporter unable to read certificates from system pool: %v", err),
Expand All @@ -116,33 +153,7 @@ func (f *Factory) CreateTraceExporter(logger *zap.Logger, config configmodels.Ex
PermitWithoutStream: ocac.KeepaliveParameters.PermitWithoutStream,
})))
}

numWorkers := defaultNumWorkers
if ocac.NumWorkers > 0 {
numWorkers = ocac.NumWorkers
}

exportersChan := make(chan *ocagent.Exporter, numWorkers)
for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ {
exporter, serr := ocagent.NewExporter(opts...)
if serr != nil {
return nil, nil, fmt.Errorf("cannot configure OpenCensus Trace exporter: %v", serr)
}
exportersChan <- exporter
}

oce := &ocagentExporter{exporters: exportersChan}
oexp, err := exporterhelper.NewTraceExporter(
"oc_trace",
oce.PushTraceData,
exporterhelper.WithSpanName("ocservice.exporter.OpenCensus.ConsumeTraceData"),
exporterhelper.WithRecordMetrics(true))

if err != nil {
return nil, nil, err
}

return oexp, oce.stop, nil
return opts, nil
}

// CreateMetricsExporter creates a metrics exporter based on this config.
Expand Down
4 changes: 2 additions & 2 deletions exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/compression"
"github.com/open-telemetry/opentelemetry-service/config/configerror"
"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-service/internal/compression"
"github.com/open-telemetry/opentelemetry-service/internal/testutils"
"github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver"
"github.com/open-telemetry/opentelemetry-service/receiver/receivertest"
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestCreateTraceExporter(t *testing.T) {
name: "KeepaliveParameters",
config: Config{
Endpoint: rcvCfg.Endpoint,
KeepaliveParameters: &keepaliveConfig{
KeepaliveParameters: &KeepaliveConfig{
Time: 30 * time.Second,
Timeout: 25 * time.Second,
PermitWithoutStream: true,
Expand Down
10 changes: 5 additions & 5 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/open-telemetry/opentelemetry-service/compression"
compressiongrpc "github.com/open-telemetry/opentelemetry-service/compression/grpc"
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/exporter/exporterhelper"
"github.com/open-telemetry/opentelemetry-service/internal/compression"
compressiongrpc "github.com/open-telemetry/opentelemetry-service/internal/compression/grpc"
"github.com/open-telemetry/opentelemetry-service/oterr"
)

// keepaliveConfig exposes the keepalive.ClientParameters to be used by the exporter.
// KeepaliveConfig exposes the keepalive.ClientParameters to be used by the exporter.
// Refer to the original data-structure for the meaning of each parameter.
type keepaliveConfig struct {
type KeepaliveConfig struct {
Time time.Duration `mapstructure:"time,omitempty"`
Timeout time.Duration `mapstructure:"timeout,omitempty"`
PermitWithoutStream bool `mapstructure:"permit-without-stream,omitempty"`
Expand All @@ -52,7 +52,7 @@ type opencensusConfig struct {
CertPemFile string `mapstructure:"cert-pem-file,omitempty"`
UseSecure bool `mapstructure:"secure,omitempty"`
ReconnectionDelay time.Duration `mapstructure:"reconnection-delay,omitempty"`
KeepaliveParameters *keepaliveConfig `mapstructure:"keepalive,omitempty"`
KeepaliveParameters *KeepaliveConfig `mapstructure:"keepalive,omitempty"`
// TODO: service name options.
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/opencensusexporter/opencensusexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestOpenCensusTraceExporterConfigsViaViper(t *testing.T) {
},
want: opencensusConfig{
Endpoint: defaultTestEndPoint,
KeepaliveParameters: &keepaliveConfig{
KeepaliveParameters: &KeepaliveConfig{
Time: 30 * time.Second,
Timeout: 25 * time.Second,
PermitWithoutStream: true,
Expand Down
2 changes: 1 addition & 1 deletion receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package receiver
import (
"context"

_ "github.com/open-telemetry/opentelemetry-service/internal/compression/grpc" // load in supported grpc compression encodings
_ "github.com/open-telemetry/opentelemetry-service/compression/grpc" // load in supported grpc compression encodings
)

// Host represents the entity where the receiver is being hosted. It is used to
Expand Down
3 changes: 3 additions & 0 deletions testbed/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948 h1
contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948/go.mod h1:ukdzwIYYHgZ7QYtwVFQUjiT28BJHiMhTERo32s6qVgM=
contrib.go.opencensus.io/exporter/ocagent v0.5.0 h1:TKXjQSRS0/cCDrP7KvkgU6SmILtF/yV2TOs/02K/WZQ=
contrib.go.opencensus.io/exporter/ocagent v0.5.0/go.mod h1:ImxhfLRpxoYiSq891pBrLVhN+qmP8BTVvdH2YLs7Gl0=
contrib.go.opencensus.io/exporter/ocagent v0.5.1/go.mod h1:oGSyf701BHqn69lMacwJJuyGzrk5eiCj86DxXhG2gfk=
contrib.go.opencensus.io/exporter/prometheus v0.1.0/go.mod h1:cGFniUXGZlKRjzOyuZJ6mgB+PgBcCIa79kEKR8YCW+A=
contrib.go.opencensus.io/exporter/zipkin v0.1.1/go.mod h1:GMvdSl3eJ2gapOaLKzTKE3qDgUkJ86k9k3yY2eqwkzc=
contrib.go.opencensus.io/resource v0.1.1/go.mod h1:F361eGI91LCmW1I/Saf+rX0+OFcigGlFvXwEGEnkRLA=
Expand Down Expand Up @@ -37,6 +38,7 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D
github.com/cenk/backoff v2.0.0+incompatible/go.mod h1:7FtoeaSnHoZnmZzz47cM35Y9nSW7tNyaidugnHTaFDE=
github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20180905225744-ee1a9a0726d2/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
Expand Down Expand Up @@ -95,6 +97,7 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20160529050041-d9eb7a3d35ec/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/addlicense v0.0.0-20190510175307-22550fa7c1b0/go.mod h1:QtPG26W17m+OIQgE6gQ24gC1M6pUaMBAbFrTIDtwG/E=
github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
Expand Down