Skip to content

Commit

Permalink
MINOR: Add RandomComponentPayloadGenerator and update Trogdor documen…
Browse files Browse the repository at this point in the history
…tation (apache#7103)

Add a new RandomComponentPayloadGenerator that gives a payload based on random selection of another PayloadGenerator.  Additionally, add an example that uses a non-default PayloadGenerator configuration to TROGDOR.md.

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
jolshan authored and cmccabe committed Jul 31, 2019
1 parent 6fbac3c commit 2c2b30d
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 3 deletions.
30 changes: 30 additions & 0 deletions TROGDOR.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,36 @@ The task specification is usually written as JSON. For example, this task speci
"durationMs": 30000,
"partitions": [["node1", "node2"], ["node3"]]
}

This task runs a simple ProduceBench test on a cluster with one producer node, 5 topics, and 10,000 messages per second.
The keys are generated sequentially and the configured partitioner (DefaultPartitioner) is used.

{
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"durationMs": 10000000,
"producerNode": "node0",
"bootstrapServers": "localhost:9092",
"targetMessagesPerSec": 10000,
"maxMessages": 50000,
"activeTopics": {
"foo[1-3]": {
"numPartitions": 10,
"replicationFactor": 1
}
},
"inactiveTopics": {
"foo[4-5]": {
"numPartitions": 10,
"replicationFactor": 1
}
},
"keyGenerator": {
"type": "sequential",
"size": 8,
"offset": 1
},
"useConfiguredPartitioner": true
}

Tasks are submitted to the coordinator. Once the coordinator determines that it is time for the task to start, it creates workers on agent processes. The workers run until the task is done.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
@JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = "constant"),
@JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = "sequential"),
@JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = "uniformRandom"),
@JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null")
@JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
@JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = "randomComponent")
})
public interface PayloadGenerator {
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Contains a percent value represented as an integer between 1 and 100 and a PayloadGenerator to specify
* how often that PayloadGenerator should be used.
*/
public class RandomComponent {
private final int percent;
private final PayloadGenerator component;


@JsonCreator
public RandomComponent(@JsonProperty("percent") int percent,
@JsonProperty("component") PayloadGenerator component) {
this.percent = percent;
this.component = component;
}

@JsonProperty
public int percent() {
return percent;
}

@JsonProperty
public PayloadGenerator component() {
return component;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;


/**
* A PayloadGenerator which generates pseudo-random payloads based on other PayloadGenerators.
*
* Given a seed and non-null list of RandomComponents, RandomComponentPayloadGenerator
* will use any given generator in its list of components a percentage of the time based on the
* percent field in the RandomComponent. These percent fields must be integers greater than 0
* and together add up to 100. The payloads generated can be reproduced from run to run.
*
* An example of how to include this generator in a Trogdor taskSpec is shown below.
* #{@code
* "keyGenerator": {
* "type": "randomComponent",
* "seed": 456,
* "components": [
* {
* "percent": 50,
* "component": {
* "type": "null"
* }
* },
* {
* "percent": 50,
* "component": {
* "type": "uniformRandom",
* "size": 4,
* "seed": 123,
* "padding": 0
* }
* }
* ]
* }
* }
*/
public class RandomComponentPayloadGenerator implements PayloadGenerator {
private final long seed;
private final List<RandomComponent> components;
private final Random random = new Random();

@JsonCreator
public RandomComponentPayloadGenerator(@JsonProperty("seed") long seed,
@JsonProperty("components") List<RandomComponent> components) {
this.seed = seed;
if (components == null || components.isEmpty()) {
throw new IllegalArgumentException("Components must be a specified, non-empty list of RandomComponents.");
}
int sum = 0;
for (RandomComponent component : components) {
if (component.percent() < 1) {
throw new IllegalArgumentException("Percent value must be greater than zero.");
}
sum += component.percent();
}
if (sum != 100) {
throw new IllegalArgumentException("Components must be a list of RandomComponents such that the percent fields sum to 100");
}
this.components = new ArrayList<>(components);
}

@JsonProperty
public long seed() {
return seed;
}

@JsonProperty
public List<RandomComponent> components() {
return components;
}

@Override
public byte[] generate(long position) {
int randPercent;
synchronized (random) {
random.setSeed(seed + position);
randPercent = random.nextInt(100);
}
int curPercent = 0;
RandomComponent com = components.get(0);
for (RandomComponent component : components) {
curPercent += component.percent();
if (curPercent > randPercent) {
com = component;
break;
}
}
return com.component().generate(position);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;

public class PayloadGeneratorTest {
@Rule
Expand Down Expand Up @@ -104,7 +107,11 @@ private static void testReproducible(PayloadGenerator generator) {
byte[] val = generator.generate(123);
generator.generate(456);
byte[] val2 = generator.generate(123);
assertArrayEquals(val, val2);
if (val == null) {
assertNull(val2);
} else {
assertArrayEquals(val, val2);
}
}

@Test
Expand All @@ -123,6 +130,92 @@ public void testUniformRandomPayloadGeneratorPaddingBytes() {
assertArrayEquals(val1End, val2End);
assertArrayEquals(val1End, val3End);
}

@Test
public void testRandomComponentPayloadGenerator() {
NullPayloadGenerator nullGenerator = new NullPayloadGenerator();
RandomComponent nullConfig = new RandomComponent(50, nullGenerator);

UniformRandomPayloadGenerator uniformGenerator =
new UniformRandomPayloadGenerator(5, 123, 0);
RandomComponent uniformConfig = new RandomComponent(50, uniformGenerator);

SequentialPayloadGenerator sequentialGenerator =
new SequentialPayloadGenerator(4, 10);
RandomComponent sequentialConfig = new RandomComponent(75, sequentialGenerator);

ConstantPayloadGenerator constantGenerator =
new ConstantPayloadGenerator(4, new byte[0]);
RandomComponent constantConfig = new RandomComponent(25, constantGenerator);

List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig));
List<RandomComponent> components2 = new ArrayList<>(Arrays.asList(sequentialConfig, constantConfig));
byte[] expected = new byte[4];

PayloadIterator iter = new PayloadIterator(
new RandomComponentPayloadGenerator(4, components1));
int notNull = 0;
int isNull = 0;
while (notNull < 1000 || isNull < 1000) {
byte[] cur = iter.next();
if (cur == null) {
isNull++;
} else {
notNull++;
}
}

iter = new PayloadIterator(
new RandomComponentPayloadGenerator(123, components2));
int isZeroBytes = 0;
int isNotZeroBytes = 0;
while (isZeroBytes < 500 || isNotZeroBytes < 1500) {
byte[] cur = iter.next();
if (Arrays.equals(expected, cur)) {
isZeroBytes++;
} else {
isNotZeroBytes++;
}
}

RandomComponent uniformConfig2 = new RandomComponent(25, uniformGenerator);
RandomComponent sequentialConfig2 = new RandomComponent(25, sequentialGenerator);
RandomComponent nullConfig2 = new RandomComponent(25, nullGenerator);

List<RandomComponent> components3 = new ArrayList<>(Arrays.asList(sequentialConfig2, uniformConfig2, nullConfig));
List<RandomComponent> components4 = new ArrayList<>(Arrays.asList(uniformConfig2, sequentialConfig2, constantConfig, nullConfig2));

testReproducible(new RandomComponentPayloadGenerator(4, components1));
testReproducible(new RandomComponentPayloadGenerator(123, components2));
testReproducible(new RandomComponentPayloadGenerator(50, components3));
testReproducible(new RandomComponentPayloadGenerator(0, components4));
}

@Test
public void testRandomComponentPayloadGeneratorErrors() {
NullPayloadGenerator nullGenerator = new NullPayloadGenerator();
RandomComponent nullConfig = new RandomComponent(25, nullGenerator);
UniformRandomPayloadGenerator uniformGenerator =
new UniformRandomPayloadGenerator(5, 123, 0);
RandomComponent uniformConfig = new RandomComponent(25, uniformGenerator);
ConstantPayloadGenerator constantGenerator =
new ConstantPayloadGenerator(4, new byte[0]);
RandomComponent constantConfig = new RandomComponent(-25, constantGenerator);

List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig));
List<RandomComponent> components2 = new ArrayList<>(Arrays.asList(
nullConfig, constantConfig, uniformConfig, nullConfig, uniformConfig, uniformConfig));

assertThrows(IllegalArgumentException.class, () -> {
new PayloadIterator(new RandomComponentPayloadGenerator(1, new ArrayList<>()));
});
assertThrows(IllegalArgumentException.class, () -> {
new PayloadIterator(new RandomComponentPayloadGenerator(13, components2));
});
assertThrows(IllegalArgumentException.class, () -> {
new PayloadIterator(new RandomComponentPayloadGenerator(123, components1));
});
}

@Test
public void testPayloadIterator() {
Expand Down

0 comments on commit 2c2b30d

Please sign in to comment.