Skip to content

Commit

Permalink
refactor mget method improved readability and efficiency
Browse files Browse the repository at this point in the history
  • Loading branch information
ori0o0p committed Nov 25, 2024
1 parent d616b16 commit bcb2885
Showing 1 changed file with 21 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,37 +264,29 @@ public Flux<KeyValue<K, V>> mget(Iterable<K> keys) {
return super.mget(keyList);
}

List<Publisher<KeyValue<K, V>>> publishers = new ArrayList<>();

for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
publishers.add(super.mget(entry.getValue()));
}

Flux<KeyValue<K, V>> fluxes = Flux.mergeSequential(publishers);

Mono<List<KeyValue<K, V>>> map = fluxes.collectList().map(vs -> {

KeyValue<K, V>[] values = new KeyValue[vs.size()];
int offset = 0;
for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {

for (int i = 0; i < keyList.size(); i++) {

int index = entry.getValue().indexOf(keyList.get(i));
if (index == -1) {
continue;
List<Publisher<KeyValue<K, V>>> publishers = partitioned.values().stream()
.map(super::mget)
.collect(Collectors.toList());

return Flux.mergeSequential(publishers)
.collectList()
.map(results -> {
KeyValue<K, V>[] values = new KeyValue[keyList.size()];
int offset = 0;

for (List<K> partitionKeys : partitioned.values()) {
for (int i = 0; i < keyList.size(); i++) {
int index = partitionKeys.indexOf(keyList.get(i));
if (index != -1) {
values[i] = results.get(offset + index);
}
}
offset += partitionKeys.size();
}

values[i] = vs.get(offset + index);
}

offset += entry.getValue().size();
}

return Arrays.asList(values);
});

return map.flatMapIterable(keyValues -> keyValues);
return Arrays.asList(values);
})
.flatMapMany(Flux::fromIterable);
}

@Override
Expand Down

0 comments on commit bcb2885

Please sign in to comment.