Skip to content

Commit

Permalink
output/cloudv2: Implement RequestMetadata output (#3100)
Browse files Browse the repository at this point in the history
It adds support for flushing the tracing metadata to the related Cloud's remote gRPC service. 

---------

Co-authored-by: Ivan <[email protected]>
  • Loading branch information
Blinkuu and codebien authored Jun 23, 2023
1 parent 9fdd569 commit 97d40e9
Show file tree
Hide file tree
Showing 29 changed files with 4,051 additions and 9 deletions.
34 changes: 29 additions & 5 deletions cloudapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/mstoykov/envconfig"

"go.k6.io/k6/lib/types"
)

Expand Down Expand Up @@ -40,6 +41,15 @@ type Config struct {
// This is how many concurrent pushes will be done at the same time to the cloud
MetricPushConcurrency null.Int `json:"metricPushConcurrency" envconfig:"K6_CLOUD_METRIC_PUSH_CONCURRENCY"`

// Indicates whether to send traces to the k6 Insights backend service.
TracesEnabled null.Bool `json:"tracesEnabled" envconfig:"K6_CLOUD_TRACES_ENABLED"`

// The host of the k6 Insights backend service.
TracesHost null.String `json:"traceHost" envconfig:"K6_CLOUD_TRACES_HOST"`

// The time interval between periodic API calls for sending samples to the cloud ingest service.
TracesPushInterval types.NullDuration `json:"tracesPushInterval" envconfig:"K6_CLOUD_TRACES_PUSH_INTERVAL"`

// Aggregation docs:
//
// If AggregationPeriod is specified and if it is greater than 0, HTTP metric aggregation
Expand Down Expand Up @@ -145,11 +155,16 @@ type Config struct {
// NewConfig creates a new Config instance with default values for some fields.
func NewConfig() Config {
return Config{
Host: null.NewString("https://ingest.k6.io", false),
LogsTailURL: null.NewString("wss://cloudlogs.k6.io/api/v1/tail", false),
WebAppURL: null.NewString("https://app.k6.io", false),
MetricPushInterval: types.NewNullDuration(1*time.Second, false),
MetricPushConcurrency: null.NewInt(1, false),
Host: null.NewString("https://ingest.k6.io", false),
LogsTailURL: null.NewString("wss://cloudlogs.k6.io/api/v1/tail", false),
WebAppURL: null.NewString("https://app.k6.io", false),
MetricPushInterval: types.NewNullDuration(1*time.Second, false),
MetricPushConcurrency: null.NewInt(1, false),

TracesEnabled: null.NewBool(false, false),
TracesHost: null.NewString("insights.k6.io:4443", false),
TracesPushInterval: types.NewNullDuration(1*time.Second, false),

MaxMetricSamplesPerPackage: null.NewInt(100000, false),
Timeout: types.NewNullDuration(1*time.Minute, false),
APIVersion: null.NewInt(1, false),
Expand Down Expand Up @@ -216,6 +231,15 @@ func (c Config) Apply(cfg Config) Config {
if cfg.MetricPushConcurrency.Valid {
c.MetricPushConcurrency = cfg.MetricPushConcurrency
}
if cfg.TracesEnabled.Valid {
c.TracesEnabled = cfg.TracesEnabled
}
if cfg.TracesHost.Valid {
c.TracesHost = cfg.TracesHost
}
if cfg.TracesPushInterval.Valid {
c.TracesPushInterval = cfg.TracesPushInterval
}
if cfg.AggregationPeriod.Valid {
c.AggregationPeriod = cfg.AggregationPeriod
}
Expand Down
5 changes: 4 additions & 1 deletion cloudapi/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ func TestConfigApply(t *testing.T) {
ProjectID: null.NewInt(1, true),
Name: null.NewString("Name", true),
Host: null.NewString("Host", true),
Timeout: types.NewNullDuration(5*time.Second, true),
LogsTailURL: null.NewString("LogsTailURL", true),
PushRefID: null.NewString("PushRefID", true),
WebAppURL: null.NewString("foo", true),
NoCompress: null.NewBool(true, true),
StopOnError: null.NewBool(true, true),
Timeout: types.NewNullDuration(5*time.Second, true),
APIVersion: null.NewInt(2, true),
MaxMetricSamplesPerPackage: null.NewInt(2, true),
MetricPushInterval: types.NewNullDuration(1*time.Second, true),
MetricPushConcurrency: null.NewInt(3, true),
TracesEnabled: null.NewBool(true, true),
TracesHost: null.NewString("TracesHost", true),
TracesPushInterval: types.NewNullDuration(1*time.Second, true),
AggregationPeriod: types.NewNullDuration(2*time.Second, true),
AggregationCalcInterval: types.NewNullDuration(3*time.Second, true),
AggregationWaitPeriod: types.NewNullDuration(4*time.Second, true),
Expand Down
207 changes: 207 additions & 0 deletions cloudapi/insights/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package insights

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"go.k6.io/k6/cloudapi/insights/proto/v1/ingester"
)

const (
testRunIDHeader = "X-K6TestRun-Id"
authorizationHeader = "Authorization"
)

var (
// ErrClientAlreadyInitialized is returned when the client is already initialized.
ErrClientAlreadyInitialized = errors.New("insights client already initialized")

// ErrClientClosed is returned when the client is closed.
ErrClientClosed = errors.New("insights client closed")
)

// ClientConfig is the configuration for the client.
type ClientConfig struct {
IngesterHost string
ConnectConfig ClientConnectConfig
AuthConfig ClientAuthConfig
TLSConfig ClientTLSConfig
}

// ClientConnectConfig is the configuration for the client connection.
type ClientConnectConfig struct {
Block bool
FailOnNonTempDialError bool
Dialer func(context.Context, string) (net.Conn, error)
}

// ClientAuthConfig is the configuration for the client authentication.
type ClientAuthConfig struct {
Enabled bool
TestRunID int64
Token string
RequireTransportSecurity bool
}

// ClientTLSConfig is the configuration for the client TLS.
type ClientTLSConfig struct {
Insecure bool
CertFile string
}

// Client is the client for the k6 Insights ingester service.
type Client struct {
cfg ClientConfig
client ingester.IngesterServiceClient
conn *grpc.ClientConn
connMu *sync.RWMutex
}

// NewClient creates a new client.
func NewClient(cfg ClientConfig) *Client {
return &Client{
cfg: cfg,
client: nil,
conn: nil,
connMu: &sync.RWMutex{},
}
}

// Dial creates a client connection using ClientConfig.
func (c *Client) Dial(ctx context.Context) error {
c.connMu.Lock()
defer c.connMu.Unlock()

if c.conn != nil {
return ErrClientAlreadyInitialized
}

opts, err := dialOptionsFromClientConfig(c.cfg)
if err != nil {
return fmt.Errorf("failed to create dial options: %w", err)
}

conn, err := grpc.DialContext(ctx, c.cfg.IngesterHost, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}

c.client = ingester.NewIngesterServiceClient(conn)
c.conn = conn

return nil
}

// IngestRequestMetadatasBatch ingests a batch of request metadatas.
func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadatas RequestMetadatas) error {
c.connMu.RLock()
closed := c.conn == nil
c.connMu.RUnlock()
if closed {
return ErrClientClosed
}

if len(requestMetadatas) < 1 {
return nil
}

req, err := newBatchCreateRequestMetadatasRequest(requestMetadatas)
if err != nil {
return fmt.Errorf("failed to create request from request metadatas: %w", err)
}

// TODO(lukasz, retry-support): Retry request with returned metadatas.
//
// Note: There is currently no backend support backing up this retry mechanism.
_, err = c.client.BatchCreateRequestMetadatas(ctx, req)
if err != nil {
st := status.Convert(err)
return fmt.Errorf("failed to ingest request metadatas batch: code=%s, msg=%s", st.Code().String(), st.Message())
}

return nil
}

// Close closes the client.
func (c *Client) Close() error {
c.connMu.Lock()
defer c.connMu.Unlock()

if c.conn == nil {
return ErrClientClosed
}

conn := c.conn
c.client = nil
c.conn = nil

return conn.Close()
}

func dialOptionsFromClientConfig(cfg ClientConfig) ([]grpc.DialOption, error) {
var opts []grpc.DialOption

if cfg.ConnectConfig.Block {
opts = append(opts, grpc.WithBlock())
}

if cfg.ConnectConfig.FailOnNonTempDialError {
opts = append(opts, grpc.FailOnNonTempDialError(true))
}

if cfg.ConnectConfig.Dialer != nil {
opts = append(opts, grpc.WithContextDialer(cfg.ConnectConfig.Dialer))
}

if cfg.TLSConfig.Insecure { //nolint: nestif
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
if cfg.TLSConfig.CertFile != "" {
creds, err := credentials.NewClientTLSFromFile(cfg.TLSConfig.CertFile, "")
if err != nil {
return nil, fmt.Errorf("failed to load TLS credentials from file: %w", err)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS13})))
}
}

if cfg.AuthConfig.Enabled {
opts = append(opts, grpc.WithPerRPCCredentials(newPerRPCCredentials(cfg.AuthConfig)))
}

return opts, nil
}

type perRPCCredentials struct {
metadata map[string]string
requireTransportSecurity bool
}

func newPerRPCCredentials(cfg ClientAuthConfig) perRPCCredentials {
return perRPCCredentials{
metadata: map[string]string{
testRunIDHeader: fmt.Sprintf("%d", cfg.TestRunID),
authorizationHeader: fmt.Sprintf("Token %s", cfg.Token),
},
requireTransportSecurity: cfg.RequireTransportSecurity,
}
}

func (c perRPCCredentials) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
return c.metadata, nil
}

func (c perRPCCredentials) RequireTransportSecurity() bool {
return c.requireTransportSecurity
}
Loading

0 comments on commit 97d40e9

Please sign in to comment.