Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Add remote sampling extension #432

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/extension"
"github.com/open-telemetry/opentelemetry-collector/extension/healthcheckextension"
"github.com/open-telemetry/opentelemetry-collector/extension/pprofextension"
"github.com/open-telemetry/opentelemetry-collector/extension/remotesamplingextension"
"github.com/open-telemetry/opentelemetry-collector/extension/zpagesextension"
"github.com/open-telemetry/opentelemetry-collector/oterr"
"github.com/open-telemetry/opentelemetry-collector/processor"
Expand Down Expand Up @@ -55,6 +56,7 @@ func Components() (
&healthcheckextension.Factory{},
&pprofextension.Factory{},
&zpagesextension.Factory{},
&remotesamplingextension.Factory{},
)
if err != nil {
errs = append(errs, err)
Expand Down
8 changes: 5 additions & 3 deletions defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/extension"
"github.com/open-telemetry/opentelemetry-collector/extension/healthcheckextension"
"github.com/open-telemetry/opentelemetry-collector/extension/pprofextension"
"github.com/open-telemetry/opentelemetry-collector/extension/remotesamplingextension"
"github.com/open-telemetry/opentelemetry-collector/extension/zpagesextension"
"github.com/open-telemetry/opentelemetry-collector/processor"
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
Expand All @@ -49,9 +50,10 @@ import (

func TestDefaultComponents(t *testing.T) {
expectedExtensions := map[string]extension.Factory{
"health_check": &healthcheckextension.Factory{},
"pprof": &pprofextension.Factory{},
"zpages": &zpagesextension.Factory{},
"health_check": &healthcheckextension.Factory{},
"pprof": &pprofextension.Factory{},
"zpages": &zpagesextension.Factory{},
"remotesampling": &remotesamplingextension.Factory{},
}
expectedReceivers := map[string]receiver.Factory{
"jaeger": &jaegerreceiver.Factory{},
Expand Down
35 changes: 35 additions & 0 deletions extension/remotesamplingextension/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remotesamplingextension

import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

// Config has the configuration settings for the remote sampling extension,
// used to fetch sampling configuration from the upstream Jaeger collector instance.
//
// It is an extension of configmodels.ExtensionSettings
type Config struct {
configmodels.ExtensionSettings `mapstructure:",squash"`

// Port is the port used to publish the health check status.
// Default is `5778` (https://www.jaegertracing.io/docs/1.15/deployment/#agent).
Port uint16 `mapstructure:"port"`

// Addr is the upstream Jaeger collector address that can be used to fetch
// sampling configurations. The default value is `:14250`.
Addr string `mapstructure:"addr"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this field be more descriptive to help the operator know what it's for. CollectorAddress?

}
56 changes: 56 additions & 0 deletions extension/remotesamplingextension/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remotesamplingextension

import (
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

func TestLoadConfig(t *testing.T) {
factories, err := config.ExampleComponents()
assert.Nil(t, err)

factory := &Factory{}
factories.Extensions[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

require.Nil(t, err)
require.NotNil(t, cfg)

ext0 := cfg.Extensions["remotesampling"]
assert.Equal(t, factory.CreateDefaultConfig(), ext0)

ext1 := cfg.Extensions["remotesampling/1"]
assert.Equal(t,
&Config{
ExtensionSettings: configmodels.ExtensionSettings{
TypeVal: "remotesampling",
NameVal: "remotesampling/1",
},
Port: 5779,
Addr: "0.0.0.0:14251",
},
ext1)

assert.Equal(t, 1, len(cfg.Service.Extensions))
assert.Equal(t, "remotesampling/1", cfg.Service.Extensions[0])
}
17 changes: 17 additions & 0 deletions extension/remotesamplingextension/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package remotesamplingextension implements an extension that serves as a proxy
// and routes client requests for sampling config to the Jaeger collector.
package remotesamplingextension
88 changes: 88 additions & 0 deletions extension/remotesamplingextension/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remotesamplingextension

import (
"errors"
"sync/atomic"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/extension"
)

const (
// The value of extension "type" in configuration.
typeStr = "remotesampling"
)

// Factory is the factory for the extension.
type Factory struct {
}

var _ (extension.Factory) = (*Factory)(nil)

// Type gets the type of the config created by this factory.
func (f *Factory) Type() string {
return typeStr
}

// CreateDefaultConfig creates the default configuration for the extension.
func (f *Factory) CreateDefaultConfig() configmodels.Extension {
return &Config{
ExtensionSettings: configmodels.ExtensionSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
Port: 5778,
Addr: "0.0.0.0:14250",
Copy link
Contributor

@joe-elliott joe-elliott Nov 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a meaningful default for this field? Maybe it should just default to empty string?

}
}

// CreateExtension creates the extension based on this config.
func (f *Factory) CreateExtension(
logger *zap.Logger,
cfg configmodels.Extension,
) (extension.ServiceExtension, error) {
config := cfg.(*Config)
if config.Addr == "" {
return nil, errors.New("\"Addr\" is required when using the \"remotesampling\" extension")
}
if config.Port == 0 {
return nil, errors.New("\"Port\" is required when using the \"remotesampling\" extension")
}

// The runtime settings are global to the application, so while in principle it
// is possible to have more than one instance, running multiple will mean that
// the settings of the last started instance will prevail. In order to avoid
// this issue we will allow the creation of a single instance once per process
// while keeping the private function that allow the creation of multiple
// instances for unit tests. Summary: only a single instance can be created
// via the factory.
if !atomic.CompareAndSwapInt32(&instanceState, instanceNotCreated, instanceCreated) {
return nil, errors.New("only a single instance can be created per process")
}

return newServer(*config, logger)
}

// See comment in CreateExtension how these are used.
var instanceState int32

const (
instanceNotCreated int32 = 0
instanceCreated int32 = 1
)
83 changes: 83 additions & 0 deletions extension/remotesamplingextension/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remotesamplingextension

import (
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

func TestFactory_Type(t *testing.T) {
factory := Factory{}
require.Equal(t, typeStr, factory.Type())
}

func TestFactory_CreateDefaultConfig(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
assert.Equal(t, &Config{
ExtensionSettings: configmodels.ExtensionSettings{
NameVal: typeStr,
TypeVal: typeStr,
},
Port: 5778,
Addr: "0.0.0.0:14250",
},
cfg)

assert.NoError(t, configcheck.ValidateConfig(cfg))
ext, err := factory.CreateExtension(zap.NewNop(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)

// Restore instance tracking from factory, for other tests.
atomic.StoreInt32(&instanceState, instanceNotCreated)
}

func TestFactory_CreateExtension(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig().(*Config)

ext, err := factory.CreateExtension(zap.NewNop(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)

// Restore instance tracking from factory, for other tests.
atomic.StoreInt32(&instanceState, instanceNotCreated)
}

func TestFactory_CreateExtensionOnlyOnce(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig().(*Config)

logger := zap.NewNop()
ext, err := factory.CreateExtension(logger, cfg)
require.NoError(t, err)
require.NotNil(t, ext)

ext1, err := factory.CreateExtension(logger, cfg)
require.Error(t, err)
require.Nil(t, ext1)

// Restore instance tracking from factory, for other tests.
atomic.StoreInt32(&instanceState, instanceNotCreated)
}
Loading