Skip to content

Commit

Permalink
access individualDeletedMessages with thread-safe while filtering out…
Browse files Browse the repository at this point in the history
… replay-message (#45)
  • Loading branch information
rdhabalia authored and merlimat committed Sep 29, 2016
1 parent 0577fda commit 26671f6
Showing 1 changed file with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -812,10 +812,15 @@ public Set<? extends Position> asyncReplayEntries(final Set<? extends Position>

// filters out messages which are already acknowledged
Set<Position> alreadyAcknowledgedPositions = Sets.newHashSet();
positions.stream().filter(position -> {
return individualDeletedMessages.contains((PositionImpl) position)
|| ((PositionImpl) position).compareTo(markDeletePosition) < 0;
}).forEach(pos -> alreadyAcknowledgedPositions.add(pos));
lock.readLock().lock();
try {
positions.stream().filter(position -> {
return individualDeletedMessages.contains((PositionImpl) position)
|| ((PositionImpl) position).compareTo(markDeletePosition) < 0;
}).forEach(pos -> alreadyAcknowledgedPositions.add(pos));
} finally {
lock.readLock().unlock();
}

final int totalValidPositions = positions.size() - alreadyAcknowledgedPositions.size();
final AtomicReference<ManagedLedgerException> exception = new AtomicReference<>();
Expand Down

0 comments on commit 26671f6

Please sign in to comment.