Skip to content

Commit

Permalink
add changes from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Jamie Chapman-Brown committed Jan 11, 2024
1 parent 592a5ea commit d706e3d
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ For Concise bitmaps:
## Operations

This section describes how some supervisor APIs work in the Rabbit Stream Indexing Service.
For all supervisor APIs, check [Supervisor APIs](../../operations/api-reference.md#supervisors).
For all supervisor APIs, check [Supervisor APIs](../../api-reference/api-reference.md#supervisors).

### RabbitMQ Authentication

Expand Down
6 changes: 6 additions & 0 deletions extensions-contrib/rabbit-stream-indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@
<artifactId>stream-client</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<version>2.0.2</version>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected Long getNextStartOffset(@NotNull Long sequenceNumber)
@Override
protected List<OrderedPartitionableRecord<String, Long, ByteEntity>> getRecords(
RecordSupplier<String, Long, ByteEntity> recordSupplier,
TaskToolbox toolbox) throws Exception
TaskToolbox toolbox)
{
return recordSupplier.poll(task.getIOConfig().getPollTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,8 @@ public class RabbitStreamIndexTaskTuningConfig extends SeekableStreamIndexTaskTu

static final int ASSUMED_RECORD_SIZE = 10_000;

/**
* Together with {@link RabbitStreamIndexTaskIOConfig#RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION}, don't take up more
* than 15% of the heap per task.
*/
private static final double RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION = 0.1;

/**
* Together with {@link RabbitStreamIndexTaskIOConfig#MAX_RECORD_FETCH_MEMORY}, don't take up more than 200MB per task.
*/
private static final int MAX_RECORD_BUFFER_MEMORY = 100_000_000;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public class RabbitStreamRecordSupplier implements RecordSupplier<String, Long,
private final int maxRecordsPerPoll;
private final int recordBufferSize;

public final Map<String, OffsetSpecification> offsetMap;
private final Map<String, OffsetSpecification> offsetMap;

List<Consumer> consumers;
private List<Consumer> consumers;
private boolean isRunning;
private Semaphore stateSemaphore;

Expand Down Expand Up @@ -140,7 +140,7 @@ public RabbitStreamRecordSupplier(
}
}

public void startBackgroundFetch()
private void startBackgroundFetch()
{
try {
// aquire uninteruptibly to prevent state corruption issues
Expand Down Expand Up @@ -244,12 +244,12 @@ private void removeOldAssignments(Set<StreamPartition<String>> streamPartitionst
}
}

Iterator<Map.Entry<String, OffsetSpecification>> offsetItterator = offsetMap.entrySet().iterator();
while (offsetItterator.hasNext()) {
Map.Entry<String, OffsetSpecification> entry = offsetItterator.next();
Iterator<Map.Entry<String, OffsetSpecification>> offsetIterator = offsetMap.entrySet().iterator();
while (offsetIterator.hasNext()) {
Map.Entry<String, OffsetSpecification> entry = offsetIterator.next();
StreamPartition<String> comparitor = new StreamPartition<String>(getStreamFromSubstream(entry.getKey()), entry.getKey());
if (!streamPartitionstoKeep.contains(comparitor)) {
offsetItterator.remove();
offsetIterator.remove();
}
}
}
Expand Down Expand Up @@ -287,14 +287,14 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
}

@Override
public void seek(StreamPartition<String> partition, Long sequenceNumber) throws InterruptedException
public void seek(StreamPartition<String> partition, Long sequenceNumber)
{
filterBufferAndResetBackgroundFetch(ImmutableSet.of(partition));
offsetMap.put(partition.getPartitionId(), OffsetSpecification.offset(sequenceNumber));
}

@Override
public void seekToEarliest(Set<StreamPartition<String>> partitions) throws InterruptedException
public void seekToEarliest(Set<StreamPartition<String>> partitions)
{
filterBufferAndResetBackgroundFetch(partitions);
for (StreamPartition<String> part : partitions) {
Expand All @@ -303,7 +303,7 @@ public void seekToEarliest(Set<StreamPartition<String>> partitions) throws Inter
}

@Override
public void seekToLatest(Set<StreamPartition<String>> partitions) throws InterruptedException
public void seekToLatest(Set<StreamPartition<String>> partitions)
{
filterBufferAndResetBackgroundFetch(partitions);
for (StreamPartition<String> part : partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public RabbitStreamSupervisor(
@Override
protected RecordSupplier<String, Long, ByteEntity> setupRecordSupplier()
{
RabbitStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
RabbitStreamIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig();

return new RabbitStreamRecordSupplier(
Expand Down Expand Up @@ -218,6 +217,7 @@ protected List<SeekableStreamIndexTask<String, Long, ByteEntity>> createIndexTas
{
final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);

List<SeekableStreamIndexTask<String, Long, ByteEntity>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ public RabbitStreamSupervisorIOConfigTest()
mapper.registerModules((Iterable<Module>) new RabbitStreamIndexTaskModule().getJacksonModules());
}

@Rule
public final ExpectedException exception = ExpectedException.none();

@Test
public void testSerdeWithDefaults() throws Exception
{
Expand Down

0 comments on commit d706e3d

Please sign in to comment.