From 6d4a8665db1cb436e80b11eaaf1667d7104b7069 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Va=C5=A1ko?= Date: Tue, 3 Sep 2024 08:05:38 +0200 Subject: [PATCH] feat: Add ramping VUs scenario adjust code to include patch requests and replace host to not go through loadbalancer --- scripts/k6/stream-api/common.js | 126 ++++++++++++++++++++++---------- 1 file changed, 88 insertions(+), 38 deletions(-) diff --git a/scripts/k6/stream-api/common.js b/scripts/k6/stream-api/common.js index 168aeb01d4..0e87048603 100644 --- a/scripts/k6/stream-api/common.js +++ b/scripts/k6/stream-api/common.js @@ -8,7 +8,22 @@ const TOKEN = __ENV.API_TOKEN; const HOST = __ENV.API_HOST || "http://localhost:8001"; const ITERATIONS = __ENV.K6_ITERATIONS || 100000; const PARALLELISM = __ENV.K6_PARALLELISM || 1000; -const TIMEOUT = __ENV.K6_TIMEOUT || "60s"; +const TIMEOUT = __ENV.K6_TIMEOUT || "10m"; +// MAX_VIRTUAL_USERS is maximum number of connections. +const MAX_VIRTUAL_USERS = __ENV.K6_MAX_VIRTUAL_USERS || 100; + +// PARALLEL_REQS_PER_USER is number of parallel requests per VU/connection. +const PARALLEL_REQS_PER_USER = __ENV.K6_PARALLEL_REQS_PER_USER || 10; + +// RAMPING_DURATION defines the duration of initial increasing and final decreasing of the rate. +const RAMPING_DURATION = __ENV.K6_RAMPING_DURATION || "2m"; + +// STABLE_RATE_DURATION defines the duration of the maximum rate. +const STABLE_RATE_DURATION = __ENV.K6_STABLE_RATE_DURATION || "2m"; + + +const SYNC_MODE = __ENV.STREAM_SYNC_MODE || "disabled"; // disabled / cache / disk +const SYNC_WAIT = __ENV.STREAM_SYNC_WAIT || "1"; // 1 = enabled, 0 = disabled const commonHeaders = { "Content-Type": "application/json", @@ -18,6 +33,9 @@ const commonHeaders = { const errors_metrics = new Counter("failed_imports"); export const options = { + teardownTimeout: '120s', + batch: PARALLEL_REQS_PER_USER, + batchPerHost: PARALLEL_REQS_PER_USER, scenarios: { default: { executor: "shared-iterations", @@ -25,6 +43,17 @@ export const options = { iterations: ITERATIONS, maxDuration: TIMEOUT, }, + vus_scenario: { + executor: 'ramping-vus', + startVUs: 5, + stages: [ + { target: 0, duration: '20s' }, + { target: MAX_VIRTUAL_USERS, duration: RAMPING_DURATION }, + { target: MAX_VIRTUAL_USERS, duration: STABLE_RATE_DURATION }, + { target: 0, duration: RAMPING_DURATION }, + { target: 0, duration: '10s' }, + ], + }, }, // Workaround: https://k6.io/docs/using-k6/workaround-to-calculate-iteration_duration/ thresholds: { @@ -37,6 +66,25 @@ export const options = { }, }; +export function awaitTask(url) { + const createSourceTimeoutSec = 60 + const taskUrl = stripUrlHost(url) + for (let retries = createSourceTimeoutSec; retries > 0; retries--) { + let res = get(taskUrl) + if (res.status !== 200) { + console.error(res); + throw new Error("failed to get task"); + } + if (res.json().status !== "processing") { + if (res.json().error) { + throw new Error("task failed: " + res.json().error); + } + break + } + sleep(1) + } +} + export function setupSource() { if (!TOKEN) throw new Error("Please set the `API_TOKEN` env var."); @@ -51,35 +99,43 @@ export function setupSource() { throw new Error("failed to create source task"); } - const createSourceTimeoutSec = 60 - const taskUrl = res.json().url - for (let retries = createSourceTimeoutSec; retries > 0; retries--) { - res = get(taskUrl) - if (res.status !== 200) { - console.error(res); - throw new Error("failed to get source task"); - } - if (res.json().status !== "processing") { - if (res.json().error) { - throw new Error("failed to create source: " + res.json().error); - } - break - } - sleep(1) - } + awaitTask(res.json().url) + + /*res = patch(`v1/branches/default/sources/${sourceId}/settings`, { + settings: [ + { + key: "storage.level.local.encoding.sync.mode", + value: SYNC_MODE, + }, + { + key: "storage.level.local.encoding.sync.wait", + value: SYNC_WAIT === "1", + }, + ], + }); + + awaitTask(res.json().url)*/ res = get(`v1/branches/default/sources/${sourceId}`); if (res.status !== 200) { throw new Error("failed to get source"); } + const sourceUrl = res.json().http.url if (!sourceUrl) { throw new Error("source url is not set"); } + /*const sourceUrl = stripUrlHost(res.json().http.url) + if (!sourceUrl) { + throw new Error("source url is not set"); + }*/ console.log("Source url: " + sourceUrl) - return { id: sourceId, url: sourceUrl } + // Change source URL to point on service itself + const replacedUrl = sourceUrl.replace("https://stream-in.eu-west-1.aws.keboola.dev", "http://stream-http-source.stream.svc.cluster.local") + console.log("Source url after change: " + replacedUrl) + return { id: sourceId, url: replacedUrl } } export function setupSink(sourceId, body) { @@ -91,22 +147,7 @@ export function setupSink(sourceId, body) { throw new Error("failed to create sink task"); } - const createSinkTimeoutSec = 60 - const taskUrl = res.json().url - for (let retries = createSinkTimeoutSec; retries > 0; retries--) { - res = get(taskUrl) - if (res.status !== 200) { - console.error(res); - throw new Error("failed to get sink task"); - } - if (res.json().status !== "processing") { - if (res.json().error) { - throw new Error("failed to create sink: " + res.json().error); - } - break - } - sleep(1) - } + awaitTask(res.json().url) res = get(`v1/branches/default/sources/${sourceId}/sinks/${body.sinkId}`); if (res.status !== 200) { @@ -122,12 +163,15 @@ export function setupSink(sourceId, body) { return { id: sinkId } } -export function teardownSource(sourceId) { - console.info("waiting 30s before source deletion") - sleep(30) +export function stripUrlHost(url) { + return (new URL(url)).pathname +} +export function teardownSource(sourceId) { + console.info("waiting 50s before source deletion") + sleep(50) const res = del(`v1/branches/default/sources/${sourceId}`); - if (res.status !== 202) { + if (res.status !== 200) { console.error(res); throw new Error("failed to delete source"); } @@ -145,6 +189,12 @@ export function post(url, data, headers = {}) { }); } +export function patch(url, data, headers = {}) { + return http.patch(normalizeUrl(url), JSON.stringify(data), { + headers: Object.assign({}, commonHeaders, headers), + }); +} + export function del(url, headers = {}) { return http.del(normalizeUrl(url), null, { headers: Object.assign({}, commonHeaders, headers),