Skip to content

Commit

Permalink
add changes from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Jamie Chapman-Brown committed Feb 13, 2024
1 parent 95fa3cc commit 95ac4e5
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ For Concise bitmaps:
## Operations

This section describes how some supervisor APIs work in the Rabbit Stream Indexing Service.
For all supervisor APIs, check [Supervisor APIs](../../operations/api-reference.md#supervisors).
For all supervisor APIs, check [Supervisor APIs](../../api-reference/api-reference.md#supervisors).

### RabbitMQ Authentication

Expand Down
8 changes: 7 additions & 1 deletion extensions-contrib/rabbit-stream-indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<parent>
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
<version>29.0.0-SNAPSHOT</version>
<version>30.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -118,6 +118,12 @@
<artifactId>stream-client</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<version>2.0.2</version>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected Long getNextStartOffset(@NotNull Long sequenceNumber)
@Override
protected List<OrderedPartitionableRecord<String, Long, ByteEntity>> getRecords(
RecordSupplier<String, Long, ByteEntity> recordSupplier,
TaskToolbox toolbox) throws Exception
TaskToolbox toolbox)
{
return recordSupplier.poll(task.getIOConfig().getPollTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,8 @@ public class RabbitStreamIndexTaskTuningConfig extends SeekableStreamIndexTaskTu

static final int ASSUMED_RECORD_SIZE = 10_000;

/**
* Together with {@link RabbitStreamIndexTaskIOConfig#RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION}, don't take up more
* than 15% of the heap per task.
*/
private static final double RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION = 0.1;

/**
* Together with {@link RabbitStreamIndexTaskIOConfig#MAX_RECORD_FETCH_MEMORY}, don't take up more than 200MB per task.
*/
private static final int MAX_RECORD_BUFFER_MEMORY = 100_000_000;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public class RabbitStreamRecordSupplier implements RecordSupplier<String, Long,
private final int maxRecordsPerPoll;
private final int recordBufferSize;

public final Map<String, OffsetSpecification> offsetMap;
private final Map<String, OffsetSpecification> offsetMap;

List<Consumer> consumers;
private List<Consumer> consumers;
private boolean isRunning;
private Semaphore stateSemaphore;

Expand Down Expand Up @@ -140,7 +140,7 @@ public RabbitStreamRecordSupplier(
}
}

public void startBackgroundFetch()
private void startBackgroundFetch()
{
try {
// aquire uninteruptibly to prevent state corruption issues
Expand Down Expand Up @@ -244,12 +244,12 @@ private void removeOldAssignments(Set<StreamPartition<String>> streamPartitionst
}
}

Iterator<Map.Entry<String, OffsetSpecification>> offsetItterator = offsetMap.entrySet().iterator();
while (offsetItterator.hasNext()) {
Map.Entry<String, OffsetSpecification> entry = offsetItterator.next();
Iterator<Map.Entry<String, OffsetSpecification>> offsetIterator = offsetMap.entrySet().iterator();
while (offsetIterator.hasNext()) {
Map.Entry<String, OffsetSpecification> entry = offsetIterator.next();
StreamPartition<String> comparitor = new StreamPartition<String>(getStreamFromSubstream(entry.getKey()), entry.getKey());
if (!streamPartitionstoKeep.contains(comparitor)) {
offsetItterator.remove();
offsetIterator.remove();
}
}
}
Expand Down Expand Up @@ -287,14 +287,14 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
}

@Override
public void seek(StreamPartition<String> partition, Long sequenceNumber) throws InterruptedException
public void seek(StreamPartition<String> partition, Long sequenceNumber)
{
filterBufferAndResetBackgroundFetch(ImmutableSet.of(partition));
offsetMap.put(partition.getPartitionId(), OffsetSpecification.offset(sequenceNumber));
}

@Override
public void seekToEarliest(Set<StreamPartition<String>> partitions) throws InterruptedException
public void seekToEarliest(Set<StreamPartition<String>> partitions)
{
filterBufferAndResetBackgroundFetch(partitions);
for (StreamPartition<String> part : partitions) {
Expand All @@ -303,7 +303,7 @@ public void seekToEarliest(Set<StreamPartition<String>> partitions) throws Inter
}

@Override
public void seekToLatest(Set<StreamPartition<String>> partitions) throws InterruptedException
public void seekToLatest(Set<StreamPartition<String>> partitions)
{
filterBufferAndResetBackgroundFetch(partitions);
for (StreamPartition<String> part : partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public RabbitStreamSupervisor(
@Override
protected RecordSupplier<String, Long, ByteEntity> setupRecordSupplier()
{
RabbitStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
RabbitStreamIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig();

return new RabbitStreamRecordSupplier(
Expand Down Expand Up @@ -218,6 +217,7 @@ protected List<SeekableStreamIndexTask<String, Long, ByteEntity>> createIndexTas
{
final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);

List<SeekableStreamIndexTask<String, Long, ByteEntity>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,69 @@ public void testSerdeWithNonDefaults() throws Exception
Assert.assertEquals(500, config.getRecordBufferOfferTimeout());
Assert.assertFalse(config.isResetOffsetAutomatically());
}


@Test
public void testtoString() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"rabbit\",\n"
+ " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxRowsInMemory\": 100,\n"
+ " \"maxRowsPerSegment\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"resetOffsetAutomatically\": false,\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";

RabbitStreamIndexTaskTuningConfig config = (RabbitStreamIndexTaskTuningConfig) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
TuningConfig.class)),
TuningConfig.class);

String resStr = "RabbitStreamSupervisorTuningConfig{" +
"maxRowsInMemory=100, " +
"maxRowsPerSegment=100, " +
"maxTotalRows=null, " +
"maxBytesInMemory=262144000, " +
"skipBytesInMemoryOverheadCheck=false, " +
"intermediatePersistPeriod=PT1H, " +
"maxPendingPersists=100, " +
"indexSpec=IndexSpec{" +
"bitmapSerdeFactory=RoaringBitmapSerdeFactory{}, " +
"dimensionCompression=lz4, " +
"stringDictionaryEncoding=Utf8{}, " +
"metricCompression=lz4, " +
"longEncoding=longs, " +
"jsonCompression=null, " +
"segmentLoader=null" +
"}, " +
"reportParseExceptions=true, " +
"handoffConditionTimeout=100, " +
"resetOffsetAutomatically=false, " +
"segmentWriteOutMediumFactory=null, " +
"workerThreads=null, " +
"chatThreads=null, " +
"chatRetries=8, " +
"httpTimeout=PT10S, " +
"shutdownTimeout=PT80S, " +
"recordBufferSize=1000, " +
"recordBufferOfferTimeout=500, " +
"offsetFetchPeriod=PT30S, " +
"intermediateHandoffPeriod=" + config.getIntermediateHandoffPeriod() + ", " +
"logParseExceptions=false, " +
"maxParseExceptions=0, " +
"maxSavedParseExceptions=0, " +
"maxRecordsPerPoll=null}";


Assert.assertEquals(resStr, config.toString());
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@
<module>extensions-contrib/druid-iceberg-extensions</module>
<module>extensions-contrib/druid-deltalake-extensions</module>
<module>extensions-contrib/spectator-histogram</module>

<module>extensions-contrib/rabbit-stream-indexing-service</module>

<!-- distribution packaging -->
<module>distribution</module>
<!-- Revised integration tests -->
Expand Down

0 comments on commit 95ac4e5

Please sign in to comment.