Merge pull request #2175 from keboola/feat-add-toxiproxy-stream-test
feat: Add toxiproxy stream test
Matovidlo authored Jan 6, 2025
2 parents b8421f8 + 810bf12 commit 8d1500e
Showing 5 changed files with 296 additions and 22 deletions.
@@ -151,6 +151,7 @@ func (c *ClientConnection) Close(ctx context.Context) error {

func (c *ClientConnection) dialLoop(ctx context.Context, initDone chan error) {
b := newClientConnBackoff()
b.InitialInterval = 100 * time.Millisecond
var closeErr error
for {
if c.isClosed() || c.client.isClosed() || errors.Is(closeErr, yamux.ErrSessionShutdown) || errors.Is(closeErr, io.EOF) {
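
The added line drops the first reconnect delay to 100 ms so that connections cut during the network-failure test below are retried almost immediately. A minimal sketch of the resulting retry cadence, assuming newClientConnBackoff wraps cenkalti/backoff's ExponentialBackOff (an assumption — only the InitialInterval field is visible in this diff):

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 100 * time.Millisecond // first retry after ~100ms instead of the default 500ms
	b.RandomizationFactor = 0                  // no jitter, so the printed delays are deterministic
	b.Reset()

	// First few reconnect delays: 100ms, 150ms, 225ms, 337.5ms (default multiplier 1.5).
	for i := 0; i < 4; i++ {
		fmt.Println(b.NextBackOff())
	}
}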
3 changes: 3 additions & 0 deletions provisioning/dev/docker/Dockerfile
@@ -11,6 +11,9 @@ ENV PATH="$PATH:$GOBIN"
# Install packages
RUN apt-get update && apt-get install --no-install-recommends --yes nano protobuf-compiler graphviz build-essential
ENV EDITOR=nano
# Download toxiproxy
RUN curl -L https://github.com/Shopify/toxiproxy/releases/download/v2.11.0/toxiproxy-server-linux-amd64 -o /usr/local/bin/toxiproxy-server
RUN chmod +x /usr/local/bin/toxiproxy-server

# Install tools
RUN mkdir -p /tmp/build
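
For reference, a minimal Go sketch of how the binary installed above can be exercised: launch toxiproxy-server, then create a proxy and a latency toxic through its REST API (default port 8474) using the official client — the same client the new test uses. The addresses here are illustrative, not taken from the repository.

package main

import (
	"log"
	"os/exec"
	"time"

	toxiproxy "github.com/Shopify/toxiproxy/v2/client"
)

func main() {
	// Start the server binary downloaded in the Dockerfile; its control API listens on :8474 by default.
	srv := exec.Command("/usr/local/bin/toxiproxy-server")
	if err := srv.Start(); err != nil {
		log.Fatal(err)
	}
	defer srv.Process.Kill()
	time.Sleep(200 * time.Millisecond) // crude wait for the API to come up

	// Proxy traffic from an illustrative listen address to an illustrative upstream.
	client := toxiproxy.NewClient("localhost:8474")
	proxy, err := client.CreateProxy("example", "localhost:21212", "localhost:1212")
	if err != nil {
		log.Fatal(err)
	}
	defer proxy.Delete()

	// Inject one second of downstream latency, mirroring what the new test does.
	if _, err := proxy.AddToxic("latency_down", "latency", "downstream", 1.0, toxiproxy.Attributes{"latency": 1000}); err != nil {
		log.Fatal(err)
	}
}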
3 changes: 1 addition & 2 deletions test/stream/bridge/keboola/guest_test.go
@@ -35,10 +35,9 @@ func TestGuestUserWorkflow(t *testing.T) {
}
ts := setup(
t,
ctx,
modifyConfig,
utilsproject.WithIsGuest(),
)
ts.startNodes(t, ctx, modifyConfig)
ts.setupSourceThroughAPI(t, ctx, http.StatusForbidden)
defer ts.teardown(t, ctx)
recreateStreamAPI(t, &ts, ctx, modifyConfig)
297 changes: 283 additions & 14 deletions test/stream/bridge/keboola/keboola_test.go
@@ -6,12 +6,16 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"testing"
"time"

"github.com/Shopify/toxiproxy/v2"
toxiproxyClient "github.com/Shopify/toxiproxy/v2/client"
"github.com/c2h5oh/datasize"
"github.com/keboola/go-client/pkg/keboola"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -25,9 +29,7 @@ import (
)

// To see details run: TEST_VERBOSE=true go test ./test/stream/bridge/... -v.
func TestKeboolaBridgeWorkflow(t *testing.T) {
t.Parallel()

func TestKeboolaBridgeWorkflow(t *testing.T) { // nolint: paralleltest
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()

@@ -71,7 +73,8 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {
cfg.Storage.MetadataCleanup.Interval = 10 * time.Second
}

ts := setup(t, ctx, configFn)
ts := setup(t)
ts.startNodes(t, ctx, configFn)
ts.setupSink(t, ctx)
defer ts.teardown(t, ctx)

@@ -302,6 +305,242 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {
ts.checkKeboolaTable(t, ctx, 1, 129)
}

func TestNetworkIssuesKeboolaBridgeWorkflow(t *testing.T) { // nolint: paralleltest
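// Run an in-process toxiproxy server; its HTTP control API listens on
// localhost:8474 and is driven by the client created below.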
metrics := toxiproxy.NewMetricsContainer(nil)
server := toxiproxy.NewServer(metrics, zerolog.New(os.Stderr))
go server.Listen("localhost:8474")

ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()

// Update configuration to make the cluster testable
configFn := func(cfg *config.Config) {
// Enable metadata cleanup for removing storage jobs
cfg.Storage.MetadataCleanup.Enabled = true
// Disable unrelated workers
cfg.Storage.DiskCleanup.Enabled = false
cfg.API.Task.CleanupEnabled = false

// Use deterministic load balancer
cfg.Storage.Level.Local.Writer.Network.PipelineBalancer = network.RoundRobinBalancerType

// In the test, we trigger the slice upload via the records count; the other values are intentionally high.
cfg.Storage.Level.Staging.Upload = stagingConfig.UploadConfig{
MinInterval: duration.From(1 * time.Second), // minimum
Trigger: stagingConfig.UploadTrigger{
Count: 10,
Size: 1000 * datasize.MB,
Interval: duration.From(30 * time.Minute),
},
}

// In the test, we trigger the file import only when the sink limit is not reached.
cfg.Sink.Table.Keboola.JobLimit = 1

// In the test, we trigger the file import via the records count; the other values are intentionally high.
cfg.Storage.Level.Target.Import = targetConfig.ImportConfig{
MinInterval: duration.From(30 * time.Second), // minimum
Trigger: targetConfig.ImportTrigger{
Count: 30,
Size: 1000 * datasize.MB,
Interval: duration.From(30 * time.Minute),
SlicesCount: 100,
Expiration: duration.From(30 * time.Minute),
},
}

// Cleanup should be performed more frequently to remove already finished storage jobs
cfg.Storage.MetadataCleanup.Interval = 10 * time.Second
}

ts := setup(t)
client := toxiproxyClient.NewClient("localhost:8474")
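// The proxy listens on the source URL with the last digit of the port
// dropped (e.g. :30010 -> :3001) and forwards upstream to the real source
// listener; the [7:] slicing strips the "http://" scheme prefix from both addresses.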
proxy, err := client.CreateProxy("source1", ts.sourceURL1[7:len(ts.sourceURL1)-1], ts.sourceURL1[7:])
require.NoError(t, err)
proxy.AddToxic("latency_down", "latency", "downstream", 1.0, map[string]interface{}{
"latency": 1000,
})
t.Cleanup(func() {
proxy.Delete()
server.Shutdown()
})

ts.proxy = proxy
ts.startNodes(t, ctx, configFn)
defer ts.teardown(t, ctx)

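// Re-point the test at the proxy listener: drop the last digit of the
// original source port, matching the listen address created above.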
ts.sourceURL1 = ts.sourceURL1[:len(ts.sourceURL1)-1]
ts.sourcePort1 /= 10
ts.logger.Infof(ctx, "proxyurl:%s, port:%d", ts.sourceURL1, ts.sourcePort1)
ts.setupSink(t, ctx)
// Check initial state
ts.checkState(t, ctx, []file{
{
state: model.FileWriting,
volumes: []volume{
{
slices: []model.SliceState{
model.SliceWriting,
},
},
{
slices: []model.SliceState{
model.SliceWriting,
},
},
},
},
})

// First upload
ts.logSection(t, "testing first upload")
ts.sendRecords(t, ctx, 1, 20)
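// Briefly take the proxy down while records are in flight to force the
// client connections through the reconnect path (see the dialLoop backoff change above).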
if ts.proxy != nil {
ts.proxy.Disable()
time.Sleep(100 * time.Millisecond)
ts.proxy.Enable()
}

// First import
ts.logSection(t, "testing first import")
ts.testFileImport(t, ctx, fileImport{
sendRecords: records{
startID: 21,
count: 10,
},
expectedFileRecords: records{
startID: 1,
count: 30,
},
expectedTableRecords: records{
startID: 1,
count: 30,
},
expectedFiles: []file{
{
state: model.FileImported, // <<<<<
volumes: []volume{
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
},
},
{
state: model.FileWriting,
volumes: []volume{
{
slices: []model.SliceState{
model.SliceWriting,
},
},
{
slices: []model.SliceState{
model.SliceWriting,
},
},
},
},
},
})

ts.logSection(t, "testing second upload")
ts.sendRecords(t, ctx, 31, 20)
if ts.proxy != nil {
ts.proxy.Disable()
time.Sleep(100 * time.Millisecond)
ts.proxy.Enable()
}

ts.logSection(t, "testing second import")
ts.testFileImport(t, ctx, fileImport{
sendRecords: records{
startID: 51,
count: 10,
},
expectedFileRecords: records{
startID: 31,
count: 30,
},
expectedTableRecords: records{
startID: 1,
count: 60,
},
expectedFiles: []file{
{
state: model.FileImported,
volumes: []volume{
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
},
},
{
state: model.FileImported, // <<<<<
volumes: []volume{
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
},
},
{
state: model.FileWriting,
volumes: []volume{
{
slices: []model.SliceState{
model.SliceWriting,
},
},
{
slices: []model.SliceState{
model.SliceWriting,
},
},
},
},
},
})

// Test simultaneous slice and file rotations
ts.logSection(t, "testing simultaneous file and slice rotations, both conditions are met")
ts.logger.Truncate()
ts.sendRecords(t, ctx, 61, 69)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
ts.logger.AssertJSONMessages(c, `
{"level":"info","message":"closed file","component":"storage.node.operator.file.rotation"}
{"level":"info","message":"importing file","component":"storage.node.operator.file.import"}
{"level":"info","message":"imported file","component":"storage.node.operator.file.import"}
`)
}, 60*time.Second, 100*time.Millisecond)
ts.checkKeboolaTable(t, ctx, 1, 129)
}

func (ts *testState) testSlicesUpload(t *testing.T, ctx context.Context, expectations sliceUpload) {
t.Helper()
ts.logger.Truncate()
@@ -326,7 +565,7 @@ func (ts *testState) testSlicesUpload(t *testing.T, ctx context.Context, expectations sliceUpload) {
{"level":"info","message":"closed slice","component":"storage.node.operator.slice.rotation"}
{"level":"info","message":"closed slice","component":"storage.node.operator.slice.rotation"}
`)
}, 10*time.Second, 10*time.Millisecond)
}, 20*time.Second, 10*time.Millisecond)

// Expect slices upload
ts.logSection(t, "expecting slices upload")
@@ -339,7 +578,7 @@ func (ts *testState) testSlicesUpload(t *testing.T, ctx context.Context, expectations sliceUpload) {
{"level":"info","message":"uploaded slice","component":"storage.node.operator.slice.upload"}
{"level":"info","message":"uploaded slice","component":"storage.node.operator.slice.upload"}
`)
}, 15*time.Second, 10*time.Millisecond)
}, 20*time.Second, 10*time.Millisecond)

// Check file/slices state after the upload
files := ts.checkState(t, ctx, expectations.expectedFiles)
@@ -409,6 +648,16 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectations fileImport) {
{"level":"info","message":"rotating file, import conditions met: count threshold met, records count: 30, threshold: 30","component":"storage.node.operator.file.rotation"}
{"level":"info","message":"rotated file","component":"storage.node.operator.file.rotation"}
`)

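// Besides the log messages, wait until local file statistics confirm that
// every record sent in this round reached a file in the writing state;
// logs alone can race with the upload pipeline on a degraded network.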
var files []model.File
files, err := ts.coordinatorScp1.StorageRepository().File().ListInState(ts.sinkKey, model.FileWriting).Do(ctx).All()
require.NoError(t, err)
for _, file := range files {
stats, err := ts.coordinatorScp1.StatisticsRepository().FileStats(ctx, file.FileKey)
if err == nil && stats.Local.RecordsCount == uint64(expectations.sendRecords.count) {
return
}
}
}, 60*time.Second, 100*time.Millisecond)

// Expect slices closing, upload and file closing
Expand All @@ -432,7 +681,7 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectati
ts.logger.AssertJSONMessages(c, `
{"level":"info","message":"closed file","component":"storage.node.operator.file.rotation"}
`)
}, 15*time.Second, 100*time.Millisecond)
}, 25*time.Second, 100*time.Millisecond)

// Expect file import
ts.logSection(t, "expecting file import")
@@ -487,13 +736,33 @@ func (ts *testState) sendRecords(t *testing.T, ctx context.Context, start, n int) {
sourceURL = ts.sourceURL2
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, sourceURL, strings.NewReader(fmt.Sprintf("foo%d", start+i)))
require.NoError(t, err)
resp, err := ts.httpClient.Do(req)
if assert.NoError(t, err) {
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.NoError(t, resp.Body.Close())
}
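// Each record is now sent from a goroutine that retries on transient errors
// (the toxiproxy tests deliberately break connections) and stops early once
// file statistics show that all records in this batch have been received.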
go func() {
var err error
for {
var files []model.File
files, err = ts.coordinatorScp1.StorageRepository().File().ListInState(ts.sinkKey, model.FileWriting).Do(ctx).All()
require.NoError(t, err)
for _, file := range files {
stats, err := ts.coordinatorScp1.StatisticsRepository().FileStats(ctx, file.FileKey)
if err == nil && stats.Local.RecordsCount == uint64(start+n-1) {
return
}
}

var req *http.Request
req, err = http.NewRequestWithContext(ctx, http.MethodPost, sourceURL, strings.NewReader(fmt.Sprintf("foo%d", start+i)))
require.NoError(t, err)
var resp *http.Response
resp, err = ts.httpClient.Do(req)
if err == nil {
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.NoError(t, resp.Body.Close())
return
}

time.Sleep(10 * time.Millisecond)
}
}()
}
}
