feat: Add ramping VUs scenario
Adjust the code to include PATCH requests, and replace the host so that requests do not go through the load balancer.
Matovidlo committed Sep 3, 2024
1 parent bceeedd commit 6d4a866
Showing 1 changed file with 88 additions and 38 deletions: scripts/k6/stream-api/common.js
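With the default values introduced below (20s hold, 2m ramp-up, 2m steady, 2m ramp-down, 10s hold), the new vus_scenario runs for roughly 6.5 minutes, which is presumably why the K6_TIMEOUT default grows from "60s" to "10m". A quick sanity check of that arithmetic, as an illustration only and not part of the committed file:

// Timeline of the ramping-vus stages with the new defaults
// (MAX_VIRTUAL_USERS = 100, RAMPING_DURATION = "2m", STABLE_RATE_DURATION = "2m").
// Durations are given in seconds here so they can be summed directly.
const stages = [
  { target: 0, duration: 20 },    // initial hold at 0 VUs
  { target: 100, duration: 120 }, // ramp up to MAX_VIRTUAL_USERS
  { target: 100, duration: 120 }, // hold the maximum
  { target: 0, duration: 120 },   // ramp back down
  { target: 0, duration: 10 },    // final hold at 0 VUs
];
const totalSeconds = stages.reduce((sum, s) => sum + s.duration, 0);
console.log(`vus_scenario length: ${totalSeconds}s`); // 390s, well inside the 10m timeout
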
@@ -8,7 +8,22 @@ const TOKEN = __ENV.API_TOKEN;
const HOST = __ENV.API_HOST || "http://localhost:8001";
const ITERATIONS = __ENV.K6_ITERATIONS || 100000;
const PARALLELISM = __ENV.K6_PARALLELISM || 1000;
const TIMEOUT = __ENV.K6_TIMEOUT || "60s";
const TIMEOUT = __ENV.K6_TIMEOUT || "10m";
// MAX_VIRTUAL_USERS is maximum number of connections.
const MAX_VIRTUAL_USERS = __ENV.K6_MAX_VIRTUAL_USERS || 100;

// PARALLEL_REQS_PER_USER is number of parallel requests per VU/connection.
const PARALLEL_REQS_PER_USER = __ENV.K6_PARALLEL_REQS_PER_USER || 10;

// RAMPING_DURATION defines the duration of initial increasing and final decreasing of the rate.
const RAMPING_DURATION = __ENV.K6_RAMPING_DURATION || "2m";

// STABLE_RATE_DURATION defines the duration of the maximum rate.
const STABLE_RATE_DURATION = __ENV.K6_STABLE_RATE_DURATION || "2m";


const SYNC_MODE = __ENV.STREAM_SYNC_MODE || "disabled"; // disabled / cache / disk
const SYNC_WAIT = __ENV.STREAM_SYNC_WAIT || "1"; // 1 = enabled, 0 = disabled

const commonHeaders = {
"Content-Type": "application/json",
@@ -18,13 +33,27 @@ const commonHeaders = {
const errors_metrics = new Counter("failed_imports");

export const options = {
  teardownTimeout: '120s',
  batch: PARALLEL_REQS_PER_USER,
  batchPerHost: PARALLEL_REQS_PER_USER,
  scenarios: {
    default: {
      executor: "shared-iterations",
      vus: PARALLELISM,
      iterations: ITERATIONS,
      maxDuration: TIMEOUT,
    },
    vus_scenario: {
      executor: 'ramping-vus',
      startVUs: 5,
      stages: [
        { target: 0, duration: '20s' },
        { target: MAX_VIRTUAL_USERS, duration: RAMPING_DURATION },
        { target: MAX_VIRTUAL_USERS, duration: STABLE_RATE_DURATION },
        { target: 0, duration: RAMPING_DURATION },
        { target: 0, duration: '10s' },
      ],
    },
  },
  // Workaround: https://k6.io/docs/using-k6/workaround-to-calculate-iteration_duration/
  thresholds: {
@@ -37,6 +66,25 @@
    },
  },
};

export function awaitTask(url) {
  const createSourceTimeoutSec = 60
  const taskUrl = stripUrlHost(url)
  for (let retries = createSourceTimeoutSec; retries > 0; retries--) {
    let res = get(taskUrl)
    if (res.status !== 200) {
      console.error(res);
      throw new Error("failed to get task");
    }
    if (res.json().status !== "processing") {
      if (res.json().error) {
        throw new Error("task failed: " + res.json().error);
      }
      break
    }
    sleep(1)
  }
}

export function setupSource() {
  if (!TOKEN) throw new Error("Please set the `API_TOKEN` env var.");

@@ -51,35 +99,43 @@ export function setupSource() {
throw new Error("failed to create source task");
}

  const createSourceTimeoutSec = 60
  const taskUrl = res.json().url
  for (let retries = createSourceTimeoutSec; retries > 0; retries--) {
    res = get(taskUrl)
    if (res.status !== 200) {
      console.error(res);
      throw new Error("failed to get source task");
    }
    if (res.json().status !== "processing") {
      if (res.json().error) {
        throw new Error("failed to create source: " + res.json().error);
      }
      break
    }
    sleep(1)
  }
  awaitTask(res.json().url)

  /*res = patch(`v1/branches/default/sources/${sourceId}/settings`, {
    settings: [
      {
        key: "storage.level.local.encoding.sync.mode",
        value: SYNC_MODE,
      },
      {
        key: "storage.level.local.encoding.sync.wait",
        value: SYNC_WAIT === "1",
      },
    ],
  });
  awaitTask(res.json().url)*/

  res = get(`v1/branches/default/sources/${sourceId}`);
  if (res.status !== 200) {
    throw new Error("failed to get source");
  }


  const sourceUrl = res.json().http.url
  if (!sourceUrl) {
    throw new Error("source url is not set");
  }
  /*const sourceUrl = stripUrlHost(res.json().http.url)
  if (!sourceUrl) {
    throw new Error("source url is not set");
  }*/

  console.log("Source url: " + sourceUrl)
  return { id: sourceId, url: sourceUrl }
  // Change source URL to point on service itself
  const replacedUrl = sourceUrl.replace("https://stream-in.eu-west-1.aws.keboola.dev", "http://stream-http-source.stream.svc.cluster.local")
  console.log("Source url after change: " + replacedUrl)
  return { id: sourceId, url: replacedUrl }
}

export function setupSink(sourceId, body) {
@@ -91,22 +147,7 @@ export function setupSink(sourceId, body) {
throw new Error("failed to create sink task");
}

  const createSinkTimeoutSec = 60
  const taskUrl = res.json().url
  for (let retries = createSinkTimeoutSec; retries > 0; retries--) {
    res = get(taskUrl)
    if (res.status !== 200) {
      console.error(res);
      throw new Error("failed to get sink task");
    }
    if (res.json().status !== "processing") {
      if (res.json().error) {
        throw new Error("failed to create sink: " + res.json().error);
      }
      break
    }
    sleep(1)
  }
  awaitTask(res.json().url)

  res = get(`v1/branches/default/sources/${sourceId}/sinks/${body.sinkId}`);
  if (res.status !== 200) {
@@ -122,12 +163,15 @@ export function setupSink(sourceId, body) {
  return { id: sinkId }
}

export function teardownSource(sourceId) {
  console.info("waiting 30s before source deletion")
  sleep(30)
export function stripUrlHost(url) {
  return (new URL(url)).pathname
}

export function teardownSource(sourceId) {
  console.info("waiting 50s before source deletion")
  sleep(50)
  const res = del(`v1/branches/default/sources/${sourceId}`);
  if (res.status !== 202) {
  if (res.status !== 200) {
    console.error(res);
    throw new Error("failed to delete source");
  }
@@ -145,6 +189,12 @@ export function post(url, data, headers = {}) {
  });
}

export function patch(url, data, headers = {}) {
  return http.patch(normalizeUrl(url), JSON.stringify(data), {
    headers: Object.assign({}, commonHeaders, headers),
  });
}

export function del(url, headers = {}) {
  return http.del(normalizeUrl(url), null, {
    headers: Object.assign({}, commonHeaders, headers),
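
For context, a minimal sketch of how a k6 test script might consume these helpers after this change. The import path, sink payload, and record shape are assumptions for illustration; they are not part of the commit.

import http from "k6/http";
import { check } from "k6";
import { options as commonOptions, setupSource, setupSink, teardownSource } from "./common.js";

// Reuse the shared-iterations and ramping-vus scenarios defined in common.js.
export const options = commonOptions;

export function setup() {
  // Create the source, wait for its task, and receive the rewritten in-cluster URL.
  const source = setupSource();
  // Placeholder sink body; real payloads depend on the Stream API sink type.
  setupSink(source.id, { sinkId: "perf-sink", name: "perf-sink", type: "table" });
  return source;
}

export default function (source) {
  // Each iteration sends one small JSON record straight to the source import URL.
  const res = http.post(source.url, JSON.stringify({ id: __ITER, value: "hello" }), {
    headers: { "Content-Type": "application/json" },
  });
  check(res, { "import accepted": (r) => r.status === 200 || r.status === 202 });
}

export function teardown(source) {
  teardownSource(source.id);
}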
