Skip to content

Commit

Permalink
[receiver/skywalking] Create skywalking component folder/structure (#…
Browse files Browse the repository at this point in the history
…8107)

* remove usage of Deprecated LogRecord.Name field.

* first PR for skywaling/tracing receiver

Signed-off-by: jian.tan <[email protected]>

* add change log

Signed-off-by: jian.tan <[email protected]>

* fix go mod

* fix go mod

Signed-off-by: jian.tan <[email protected]>

* fix goimports order

Signed-off-by: jian.tan <[email protected]>

* fix build

* fix build

* add sum

Signed-off-by: jian.tan <[email protected]>

* polish

Signed-off-by: jian.tan <[email protected]>

* update go sum

Signed-off-by: jian.tan <[email protected]>

* polish go mod.

Signed-off-by: jian.tan <[email protected]>

* make goporto

Signed-off-by: jian.tan <[email protected]>

* polish

Signed-off-by: jian.tan <[email protected]>

* polish

Signed-off-by: jian.tan <[email protected]>

* fix lint

Signed-off-by: jian.tan <[email protected]>

* Update factory.go

* fix lint

Signed-off-by: jian.tan <[email protected]>

* fix fmt

Signed-off-by: jian.tan <[email protected]>

* fix unit test

Signed-off-by: jian.tan <[email protected]>

* fix typo

Signed-off-by: jian.tan <[email protected]>

* Update receiver/skywalkingreceiver/config.go

Co-authored-by: Przemek Maciolek <[email protected]>

* Update receiver/skywalkingreceiver/trace_receiver.go

Co-authored-by: Przemek Maciolek <[email protected]>

* Update receiver/skywalkingreceiver/trace_receiver.go

Co-authored-by: Przemek Maciolek <[email protected]>

* Update receiver/skywalkingreceiver/factory_test.go

Co-authored-by: Przemek Maciolek <[email protected]>

* Update config.yaml

* Update receiver/skywalkingreceiver/trace_receiver.go

Co-authored-by: Przemek Maciolek <[email protected]>

* add more UT for trace receiver.

Signed-off-by: jian.tan <[email protected]>

* fix fmt and lint

Signed-off-by: jian.tan <[email protected]>

* add version.

Signed-off-by: jian.tan <[email protected]>

* add com

Signed-off-by: jian.tan <[email protected]>

* fix com

Signed-off-by: jian.tan <[email protected]>

Co-authored-by: Przemek Maciolek <[email protected]>
  • Loading branch information
JaredTan95 and pmm-sumo authored Mar 2, 2022
1 parent 66f91ea commit 91a96cd
Show file tree
Hide file tree
Showing 20 changed files with 1,866 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- `resourcedetectionprocessor`: Add confighttp.HTTPClientSettings To Resource Detection Config Fixes (#7397)
- `honeycombexporter`: Add validation for `sending_queue` setting (#8113)
- `routingprocessor`: Expand error handling on failure to build exporters (#8125)
- `skywalkingreceiver`: Add new skywalking receiver component folder and structure (#8107)

### 🛑 Breaking changes 🛑

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signa

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver => ./receiver/simpleprometheusreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver => ./receiver/skywalkingreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver => ./receiver/splunkhecreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver => ./receiver/statsdreceiver
Expand Down
1 change: 1 addition & 0 deletions receiver/skywalkingreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
36 changes: 36 additions & 0 deletions receiver/skywalkingreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Skywalking Receiver

Receives trace data in [Skywalking](https://skywalking.apache.org/) format.

Supported pipeline types: traces

## ⚠️ Warning

Note: This component is currently work in progress, and traces receiver is not yet fully functional.

## Getting Started

By default, the Skywalking receiver will not serve any protocol. A protocol must be
named under the `protocols` object for the Skywalking receiver to start. The
below protocols are supported, each supports an optional `endpoint`
object configuration parameter.

- `grpc` (default `endpoint` = 0.0.0.0:11800)
- `http` (default `endpoint` = 0.0.0.0:12800)

Examples:

```yaml
receivers:
skywalking:
protocols:
grpc:
endpoint: 0.0.0.0:11800
http:
endpoint: 0.0.0.0:12800

service:
pipelines:
traces:
receivers: [skywalking]
```
94 changes: 94 additions & 0 deletions receiver/skywalkingreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"

import (
"fmt"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
)

const (
// The config field id to load the protocol map from
protocolsFieldName = "protocols"
)

// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"`
HTTP *confighttp.HTTPServerSettings `mapstructure:"http"`
}

// Config defines configuration for skywalking receiver.
type Config struct {
config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
Protocols `mapstructure:"protocols"`
}

var _ config.Receiver = (*Config)(nil)
var _ config.Unmarshallable = (*Config)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
if cfg.GRPC == nil && cfg.HTTP == nil {
return fmt.Errorf("must specify at least one protocol when using the Skywalking receiver")
}

if cfg.GRPC != nil {
var err error
if _, err = extractPortFromEndpoint(cfg.GRPC.NetAddr.Endpoint); err != nil {
return fmt.Errorf("unable to extract port for the gRPC endpoint: %w", err)
}
}

if cfg.HTTP != nil {
if _, err := extractPortFromEndpoint(cfg.HTTP.Endpoint); err != nil {
return fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err)
}
}

return nil
}

// Unmarshal a config.Parser into the config struct.
func (cfg *Config) Unmarshal(componentParser *config.Map) error {
if componentParser == nil || len(componentParser.AllKeys()) == 0 {
return fmt.Errorf("empty config for Skywalking receiver")
}

// UnmarshalExact will not set struct properties to nil even if no key is provided,
// so set the protocol structs to nil where the keys were omitted.
err := componentParser.UnmarshalExact(cfg)
if err != nil {
return err
}

protocols, err := componentParser.Sub(protocolsFieldName)
if err != nil {
return err
}

if !protocols.IsSet(protoGRPC) {
cfg.GRPC = nil
}

if !protocols.IsSet(protoHTTP) {
cfg.HTTP = nil
}

return nil
}
68 changes: 68 additions & 0 deletions receiver/skywalkingreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/service/servicetest"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 3)

r1 := cfg.Receivers[config.NewComponentIDWithName(typeStr, "customname")].(*Config)
assert.Equal(t, r1,
&Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentIDWithName(typeStr, "customname")),
Protocols: Protocols{
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:12801",
},
},
})

rDefaults := cfg.Receivers[config.NewComponentIDWithName(typeStr, "defaults")].(*Config)
assert.Equal(t, rDefaults,
&Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentIDWithName(typeStr, "defaults")),
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: defaultGRPCBindEndpoint,
Transport: "tcp",
},
},
},
})
}
120 changes: 120 additions & 0 deletions receiver/skywalkingreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"

// This file implements factory for skywalking receiver.

import (
"context"
"fmt"
"net"
"strconv"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

const (
typeStr = "skywalking"

// Protocol values.
protoGRPC = "grpc"
protoHTTP = "http"

// Default endpoints to bind to.
defaultGRPCBindEndpoint = "0.0.0.0:11800"
defaultHTTPBindEndpoint = "0.0.0.0:12800"
)

// NewFactory creates a new Skywalking receiver factory.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithTraces(createTracesReceiver))
}

// CreateDefaultConfig creates the default configuration for Skywalking receiver.
func createDefaultConfig() config.Receiver {
return &Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: defaultGRPCBindEndpoint,
Transport: "tcp",
},
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPBindEndpoint,
},
},
}
}

// createTracesReceiver creates a trace receiver based on provided config.
func createTracesReceiver(
_ context.Context,
set component.ReceiverCreateSettings,
cfg config.Receiver,
nextConsumer consumer.Traces,
) (component.TracesReceiver, error) {

// Convert settings in the source c to configuration struct
// that Skywalking receiver understands.
rCfg := cfg.(*Config)

var err error
var c configuration
// Set ports
if rCfg.Protocols.GRPC != nil {
c.CollectorGRPCServerSettings = *rCfg.Protocols.GRPC
if c.CollectorGRPCPort, err = extractPortFromEndpoint(rCfg.Protocols.GRPC.NetAddr.Endpoint); err != nil {
return nil, fmt.Errorf("unable to extract port for the gRPC endpoint: %w", err)
}
}

if rCfg.Protocols.HTTP != nil {
c.CollectorHTTPSettings = *rCfg.Protocols.HTTP
if c.CollectorHTTPPort, err = extractPortFromEndpoint(rCfg.Protocols.HTTP.Endpoint); err != nil {
return nil, fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err)
}
}

// Create the receiver.
return newSkywalkingReceiver(rCfg.ID(), &c, nextConsumer, set), nil
}

// extract the port number from string in "address:port" format. If the
// port number cannot be extracted returns an error.
func extractPortFromEndpoint(endpoint string) (int, error) {
_, portStr, err := net.SplitHostPort(endpoint)
if err != nil {
return 0, fmt.Errorf("endpoint is not formatted correctly: %s", err.Error())
}
port, err := strconv.ParseInt(portStr, 10, 0)
if err != nil {
return 0, fmt.Errorf("endpoint port is not a number: %s", err.Error())
}
if port < 1 || port > 65535 {
return 0, fmt.Errorf("port number must be between 1 and 65535")
}
return int(port), nil
}
Loading

0 comments on commit 91a96cd

Please sign in to comment.