Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k6 Insights (2/2): Integrate request metadata output to cloud output v1 #3202

Merged
merged 5 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions cloudapi/insights/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.k6.io/k6/lib/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -37,7 +36,7 @@ var (
// ClientConfig is the configuration for the client.
type ClientConfig struct {
IngesterHost string
Timeout types.NullDuration
Timeout time.Duration
ConnectConfig ClientConnectConfig
AuthConfig ClientAuthConfig
TLSConfig ClientTLSConfig
Expand All @@ -48,6 +47,7 @@ type ClientConfig struct {
type ClientConnectConfig struct {
Block bool
FailOnNonTempDialError bool
Timeout time.Duration
Dialer func(context.Context, string) (net.Conn, error)
}

Expand Down Expand Up @@ -88,6 +88,39 @@ type Client struct {
connMu *sync.RWMutex
}

// NewDefaultClientConfigForTestRun creates a new default client config for a test run.
func NewDefaultClientConfigForTestRun(ingesterHost, authToken string, testRunID int64) ClientConfig {
return ClientConfig{
IngesterHost: ingesterHost,
Timeout: 90 * time.Second,
ConnectConfig: ClientConnectConfig{
Block: false,
FailOnNonTempDialError: false,
Timeout: 10 * time.Second,
Dialer: nil,
},
AuthConfig: ClientAuthConfig{
Enabled: true,
TestRunID: testRunID,
Token: authToken,
RequireTransportSecurity: true,
},
TLSConfig: ClientTLSConfig{
Insecure: false,
},
RetryConfig: ClientRetryConfig{
RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`,
MaxAttempts: 3,
PerRetryTimeout: 30 * time.Second,
BackoffConfig: ClientBackoffConfig{
Enabled: true,
JitterFraction: 0.1,
WaitBetween: 1 * time.Second,
},
},
}
}

// NewClient creates a new client.
func NewClient(cfg ClientConfig) *Client {
return &Client{
Expand All @@ -112,6 +145,8 @@ func (c *Client) Dial(ctx context.Context) error {
return fmt.Errorf("failed to create dial options: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, c.cfg.ConnectConfig.Timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, c.cfg.IngesterHost, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
Expand All @@ -132,9 +167,6 @@ func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadat
return ErrClientClosed
}

ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout.TimeDuration())
defer cancel()

if len(requestMetadatas) < 1 {
return nil
}
Expand All @@ -144,6 +176,8 @@ func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadat
return fmt.Errorf("failed to create request from request metadatas: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()
_, err = c.client.BatchCreateRequestMetadatas(ctx, req)
if err != nil {
st := status.Convert(err)
Expand Down
44 changes: 22 additions & 22 deletions cloudapi/insights/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.k6.io/k6/lib/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -92,8 +91,8 @@ func TestClient_Dial_ReturnsNoErrorWithWorkingDialer(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand All @@ -114,8 +113,8 @@ func TestClient_Dial_ReturnsErrorWhenCalledTwice(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand All @@ -138,6 +137,7 @@ func TestClient_Dial_ReturnsNoErrorWithFailingDialer(t *testing.T) {
ConnectConfig: ClientConnectConfig{
Block: true,
FailOnNonTempDialError: true,
Timeout: 1 * time.Second,
Dialer: func(ctx context.Context, s string) (net.Conn, error) {
return nil, &fatalError{}
},
Expand All @@ -163,7 +163,7 @@ func TestClient_Dial_ReturnsErrorWithoutRetryableStatusCodes(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
}
Expand All @@ -184,7 +184,7 @@ func TestClient_Dial_ReturnsErrorWithInvalidRetryableStatusCodes(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: "RANDOM,INTERNAL"},
Expand All @@ -206,8 +206,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsNoErrorWithWorkingServerAndNo
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand All @@ -231,8 +231,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsNoErrorWithWorkingServerAndNo
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand Down Expand Up @@ -272,8 +272,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorWithWorkingServerAndCanc
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorWithUninitializedClient(
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
}
Expand Down Expand Up @@ -341,8 +341,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorWithFailingServerAndNonC
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand Down Expand Up @@ -373,8 +373,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsNoErrorAfterRetrySeveralTimes
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{
MaxAttempts: 20,
Expand Down Expand Up @@ -422,8 +422,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorAfterExhaustingMaxRetryA
lis := newMockListener(t, ser)

cfg := ClientConfig{
Timeout: types.NullDurationFrom(1 * time.Second),
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
Timeout: 1 * time.Second,
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{
BackoffConfig: ClientBackoffConfig{
Expand Down Expand Up @@ -469,7 +469,7 @@ func TestClient_Close_ReturnsNoErrorWhenClosedOnce(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand All @@ -491,7 +491,7 @@ func TestClient_Close_ReturnsNoErrorWhenClosedTwice(t *testing.T) {
lis := newMockListener(t, ser)

cfg := ClientConfig{
ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)},
ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)},
TLSConfig: ClientTLSConfig{Insecure: true},
RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`},
}
Expand Down
35 changes: 8 additions & 27 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"go.k6.io/k6/errext"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
insightsOutput "go.k6.io/k6/output/cloud/insights"
Expand Down Expand Up @@ -125,34 +124,14 @@ func (o *Output) Start() error {
}
o.requestMetadatasCollector = insightsOutput.NewCollector(testRunID)

insightsClientConfig := insights.ClientConfig{
IngesterHost: o.config.TracesHost.String,
Timeout: types.NewNullDuration(90*time.Second, false),
AuthConfig: insights.ClientAuthConfig{
Enabled: true,
TestRunID: testRunID,
Token: o.config.Token.String,
RequireTransportSecurity: true,
},
TLSConfig: insights.ClientTLSConfig{
Insecure: false,
},
RetryConfig: insights.ClientRetryConfig{
RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`,
MaxAttempts: 3,
PerRetryTimeout: 30 * time.Second,
BackoffConfig: insights.ClientBackoffConfig{
Enabled: true,
JitterFraction: 0.1,
WaitBetween: 1 * time.Second,
},
},
}
insightsClientConfig := insights.NewDefaultClientConfigForTestRun(
o.config.TracesHost.String,
o.config.Token.String,
testRunID,
)
insightsClient := insights.NewClient(insightsClientConfig)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := insightsClient.Dial(ctx); err != nil {
if err := insightsClient.Dial(context.Background()); err != nil {
return err
}

Expand Down Expand Up @@ -319,6 +298,8 @@ func (o *Output) flushRequestMetadatas() {
err := o.requestMetadatasFlusher.Flush()
if err != nil {
o.logger.WithError(err).WithField("t", time.Since(start)).Error("Failed to push trace samples to the cloud")

return
}

o.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered trace samples to the cloud")
Expand Down
Loading