Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/skywalking] Create skywalking component folder/structure #8107

Merged
merged 40 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
3621312
remove usage of Deprecated LogRecord.Name field.
JaredTan95 Feb 11, 2022
e262fd7
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
JaredTan95 Feb 13, 2022
50eed02
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
JaredTan95 Feb 16, 2022
5d9b46d
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
JaredTan95 Feb 17, 2022
4146721
first PR for skywaling/tracing receiver
JaredTan95 Feb 23, 2022
3f41a22
Merge branch 'main' into sw_reciever
JaredTan95 Feb 23, 2022
000d0ef
add change log
JaredTan95 Feb 23, 2022
b6416ef
fix go mod
JaredTan95 Feb 23, 2022
c29e2df
fix go mod
JaredTan95 Feb 23, 2022
7e171cf
fix goimports order
JaredTan95 Feb 23, 2022
722be02
fix build
JaredTan95 Feb 23, 2022
f5b6cc8
fix build
JaredTan95 Feb 23, 2022
2beb6c4
add sum
JaredTan95 Feb 23, 2022
c1f948c
polish
JaredTan95 Feb 23, 2022
f275ee4
update go sum
JaredTan95 Feb 23, 2022
137b3ab
polish go mod.
JaredTan95 Feb 23, 2022
05adcd1
make goporto
JaredTan95 Feb 23, 2022
3908cb3
polish
JaredTan95 Feb 23, 2022
2973c4a
polish
JaredTan95 Feb 24, 2022
208d01c
Merge branch 'main' into sw_reciever
JaredTan95 Feb 24, 2022
ff8a324
fix lint
JaredTan95 Feb 24, 2022
1147f04
Merge branch 'sw_reciever' of https://github.com/JaredTan95/opentelem…
JaredTan95 Feb 24, 2022
ce48c7a
Update factory.go
JaredTan95 Feb 24, 2022
22f779c
fix lint
JaredTan95 Feb 24, 2022
a3086fd
fix fmt
JaredTan95 Feb 24, 2022
1b78e25
fix unit test
JaredTan95 Feb 24, 2022
d802117
fix typo
JaredTan95 Feb 24, 2022
75602c8
Update receiver/skywalkingreceiver/config.go
JaredTan95 Feb 24, 2022
e1851a8
Update receiver/skywalkingreceiver/trace_receiver.go
JaredTan95 Feb 24, 2022
46999e6
Update receiver/skywalkingreceiver/trace_receiver.go
JaredTan95 Feb 24, 2022
3bbac8d
Update receiver/skywalkingreceiver/factory_test.go
JaredTan95 Feb 24, 2022
0e64519
Update config.yaml
JaredTan95 Feb 24, 2022
7250215
Update receiver/skywalkingreceiver/trace_receiver.go
JaredTan95 Feb 24, 2022
6302a13
Merge branch 'main' into sw_reciever
JaredTan95 Feb 26, 2022
0389cd1
Merge branch 'main' into sw_reciever
JaredTan95 Mar 1, 2022
b66a00c
add more UT for trace receiver.
JaredTan95 Mar 2, 2022
7390a69
fix fmt and lint
JaredTan95 Mar 2, 2022
6e52c83
add version.
JaredTan95 Mar 2, 2022
fe680d1
add com
JaredTan95 Mar 2, 2022
255bf24
fix com
JaredTan95 Mar 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- `resourcedetectionprocessor`: Add confighttp.HTTPClientSettings To Resource Detection Config Fixes (#7397)
- `honeycombexporter`: Add validation for `sending_queue` setting (#8113)
- `routingprocessor`: Expand error handling on failure to build exporters (#8125)
- `skywalkingreceiver`: Add new skywalking receiver component folder and structure (#8107)

### 🛑 Breaking changes 🛑

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signa

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver => ./receiver/simpleprometheusreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver => ./receiver/skywalkingreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver => ./receiver/splunkhecreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver => ./receiver/statsdreceiver
Expand Down
1 change: 1 addition & 0 deletions receiver/skywalkingreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
36 changes: 36 additions & 0 deletions receiver/skywalkingreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Skywalking Receiver

Receives trace data in [Skywalking](https://skywalking.apache.org/) format.

Supported pipeline types: traces
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved

## ⚠️ Warning

Note: This component is currently work in progress, and traces receiver is not yet fully functional.

## Getting Started
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved

By default, the Skywalking receiver will not serve any protocol. A protocol must be
named under the `protocols` object for the Skywalking receiver to start. The
below protocols are supported, each supports an optional `endpoint`
object configuration parameter.

- `grpc` (default `endpoint` = 0.0.0.0:11800)
- `http` (default `endpoint` = 0.0.0.0:12800)

Examples:

```yaml
receivers:
skywalking:
protocols:
grpc:
endpoint: 0.0.0.0:11800
http:
endpoint: 0.0.0.0:12800

service:
pipelines:
traces:
receivers: [skywalking]
```
94 changes: 94 additions & 0 deletions receiver/skywalkingreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"

import (
"fmt"

"go.opentelemetry.io/collector/config"
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
)

const (
// The config field id to load the protocol map from
protocolsFieldName = "protocols"
)

// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"`
HTTP *confighttp.HTTPServerSettings `mapstructure:"http"`
}

// Config defines configuration for skywalking receiver.
type Config struct {
config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
Protocols `mapstructure:"protocols"`
}

var _ config.Receiver = (*Config)(nil)
var _ config.Unmarshallable = (*Config)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
if cfg.GRPC == nil && cfg.HTTP == nil {
return fmt.Errorf("must specify at least one protocol when using the Skywalking receiver")
}

JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
if cfg.GRPC != nil {
var err error
if _, err = extractPortFromEndpoint(cfg.GRPC.NetAddr.Endpoint); err != nil {
return fmt.Errorf("unable to extract port for the gRPC endpoint: %w", err)
}
}

if cfg.HTTP != nil {
if _, err := extractPortFromEndpoint(cfg.HTTP.Endpoint); err != nil {
return fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err)
}
}

return nil
}

// Unmarshal a config.Parser into the config struct.
func (cfg *Config) Unmarshal(componentParser *config.Map) error {
if componentParser == nil || len(componentParser.AllKeys()) == 0 {
return fmt.Errorf("empty config for Skywalking receiver")
}

// UnmarshalExact will not set struct properties to nil even if no key is provided,
// so set the protocol structs to nil where the keys were omitted.
err := componentParser.UnmarshalExact(cfg)
if err != nil {
return err
}

protocols, err := componentParser.Sub(protocolsFieldName)
if err != nil {
return err
}

if !protocols.IsSet(protoGRPC) {
cfg.GRPC = nil
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
}

if !protocols.IsSet(protoHTTP) {
cfg.HTTP = nil
}

return nil
}
68 changes: 68 additions & 0 deletions receiver/skywalkingreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/service/servicetest"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 3)

r1 := cfg.Receivers[config.NewComponentIDWithName(typeStr, "customname")].(*Config)
assert.Equal(t, r1,
&Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentIDWithName(typeStr, "customname")),
Protocols: Protocols{
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:12801",
},
},
})

rDefaults := cfg.Receivers[config.NewComponentIDWithName(typeStr, "defaults")].(*Config)
assert.Equal(t, rDefaults,
&Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentIDWithName(typeStr, "defaults")),
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: defaultGRPCBindEndpoint,
Transport: "tcp",
},
},
},
})
}
120 changes: 120 additions & 0 deletions receiver/skywalkingreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"

// This file implements factory for skywalking receiver.

import (
"context"
"fmt"
"net"
"strconv"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

const (
typeStr = "skywalking"

// Protocol values.
protoGRPC = "grpc"
protoHTTP = "http"

// Default endpoints to bind to.
defaultGRPCBindEndpoint = "0.0.0.0:11800"
defaultHTTPBindEndpoint = "0.0.0.0:12800"
)

// NewFactory creates a new Skywalking receiver factory.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithTraces(createTracesReceiver))
}

// CreateDefaultConfig creates the default configuration for Skywalking receiver.
func createDefaultConfig() config.Receiver {
return &Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: defaultGRPCBindEndpoint,
Transport: "tcp",
},
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPBindEndpoint,
},
},
}
}

// createTracesReceiver creates a trace receiver based on provided config.
func createTracesReceiver(
_ context.Context,
set component.ReceiverCreateSettings,
cfg config.Receiver,
nextConsumer consumer.Traces,
) (component.TracesReceiver, error) {

// Convert settings in the source c to configuration struct
// that Skywalking receiver understands.
rCfg := cfg.(*Config)

var err error
var c configuration
// Set ports
if rCfg.Protocols.GRPC != nil {
c.CollectorGRPCServerSettings = *rCfg.Protocols.GRPC
if c.CollectorGRPCPort, err = extractPortFromEndpoint(rCfg.Protocols.GRPC.NetAddr.Endpoint); err != nil {
return nil, fmt.Errorf("unable to extract port for the gRPC endpoint: %w", err)
}
}

if rCfg.Protocols.HTTP != nil {
c.CollectorHTTPSettings = *rCfg.Protocols.HTTP
if c.CollectorHTTPPort, err = extractPortFromEndpoint(rCfg.Protocols.HTTP.Endpoint); err != nil {
return nil, fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err)
}
}

// Create the receiver.
return newSkywalkingReceiver(rCfg.ID(), &c, nextConsumer, set), nil
}

// extract the port number from string in "address:port" format. If the
// port number cannot be extracted returns an error.
func extractPortFromEndpoint(endpoint string) (int, error) {
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
_, portStr, err := net.SplitHostPort(endpoint)
if err != nil {
return 0, fmt.Errorf("endpoint is not formatted correctly: %s", err.Error())
}
port, err := strconv.ParseInt(portStr, 10, 0)
if err != nil {
return 0, fmt.Errorf("endpoint port is not a number: %s", err.Error())
}
if port < 1 || port > 65535 {
return 0, fmt.Errorf("port number must be between 1 and 65535")
}
return int(port), nil
}
Loading