Skip to content

Commit

Permalink
Fix null reference exception in simple queue cache. (#2829)
Browse files Browse the repository at this point in the history
Simple queue cache was not rate limiting correctly, leading to messages being removed outside the expected cache purge behavior.  This led to null references in cache cursors under load.

Rate limiting fixed..
Code block which removed messages from cache outside the context of the expected purge behavior was removed.
  • Loading branch information
jason-bragg authored and sergeybykov committed Mar 24, 2017
1 parent 684da96 commit ac6f1d3
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
15 changes: 15 additions & 0 deletions src/Orleans/Streams/QueueAdapters/DataNotAvailableException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ public DataNotAvailableException(SerializationInfo info, StreamingContext contex
: base(info, context)
{
}
#endif
}

[Serializable]
public class CacheFullException : OrleansException
{
public CacheFullException() : this("Queue message cache is full") { }
public CacheFullException(string message) : base(message) { }
public CacheFullException(string message, Exception inner) : base(message, inner) { }

#if !NETSTANDARD
public CacheFullException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,7 @@ public SimpleQueueCache(int cacheSize, Logger logger)
/// </summary>
public virtual bool IsUnderPressure()
{
if (cachedMessages.Count == 0) return false; // empty cache
if (Size < maxCacheSize) return false; // there is still space in cache
if (cacheCursorHistogram.Count == 0) return false; // no cursors yet - zero consumers basically yet.
// cache is full. Check how many cursors we have in the oldest bucket.
int numCursorsInLastBucket = cacheCursorHistogram[0].NumCurrentCursors;
return numCursorsInLastBucket > 0;
return cacheCursorHistogram.Count >= NUM_CACHE_HISTOGRAM_BUCKETS;
}


Expand Down Expand Up @@ -291,6 +286,8 @@ internal void UnsetCursor(SimpleQueueCacheCursor cursor, StreamSequenceToken tok
private void Add(IBatchContainer batch, StreamSequenceToken sequenceToken)
{
if (batch == null) throw new ArgumentNullException(nameof(batch));
// this should never happen, but just in case
if (Size >= maxCacheSize) throw new CacheFullException();

CacheBucket cacheBucket;
if (cacheCursorHistogram.Count == 0)
Expand Down Expand Up @@ -319,18 +316,6 @@ private void Add(IBatchContainer batch, StreamSequenceToken sequenceToken)

cachedMessages.AddFirst(new LinkedListNode<SimpleQueueCacheItem>(item));
cacheBucket.UpdateNumItems(1);

if (Size > maxCacheSize)
{
//var last = cachedMessages.Last;
cachedMessages.RemoveLast();
var bucket = cacheCursorHistogram[0]; // same as: var bucket = last.Value.CacheBucket;
bucket.UpdateNumItems(-1);
if (bucket.NumCurrentItems == 0)
{
cacheCursorHistogram.RemoveAt(0);
}
}
}

internal static void Log(Logger logger, string format, params object[] args)
Expand Down

0 comments on commit ac6f1d3

Please sign in to comment.