Skip to content

Commit

Permalink
[Darwin] Add optional concurrent execution to MTRAsyncWorkQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
jtung-apple committed Apr 25, 2024
1 parent 742e65b commit eb723de
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 35 deletions.
11 changes: 11 additions & 0 deletions src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,17 @@ MTR_TESTABLE
/// is lost.
- (instancetype)initWithContext:(ContextType)context;

/// Creates a work queue with the given context object and a queue width.
///
/// The queue will call readyHandler on up to "width" number of work items
/// concurrently. Once "width" number of work items have started, no other
/// work items will get a readyHandler call until one of the running work items
/// has called its completion block with MTRAsyncWorkComplete.
///
/// This allows the a MTRAsyncWorkQueue object to manage a pool of
/// resources that can be use concurrently at any given time.
- (instancetype)initWithContext:(ContextType)context width:(NSUInteger)width;

/// Enqueues the specified work item, making it eligible for execution.
///
/// Once a work item is enqueued, ownership of it passes to the queue and
Expand Down
113 changes: 78 additions & 35 deletions src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ @implementation MTRAsyncWorkQueue {
os_unfair_lock _lock;
__weak id _context;
NSMutableArray<MTRAsyncWorkItem *> * _items;
NSInteger _runningWorkItemCount;
NSUInteger _runningWorkItemCount;
NSUInteger _width;
}

// A helper struct that facilitates access to _context while
Expand All @@ -216,11 +217,17 @@ @implementation MTRAsyncWorkQueue {
};

- (instancetype)initWithContext:(id)context
{
return [self initWithContext:context width:1];
}

- (instancetype)initWithContext:(id)context width:(NSUInteger)width
{
NSParameterAssert(context);
if (self = [super init]) {
_context = context;
_items = [NSMutableArray array];
_width = width;
}
return self;
}
Expand Down Expand Up @@ -286,35 +293,84 @@ - (void)_postProcessWorkItem:(MTRAsyncWorkItem *)workItem
{
os_unfair_lock_assert_owner(&_lock);

MTRAsyncWorkItem * runningWorkItem = (_runningWorkItemCount) ? _items.firstObject : nil;
if (workItem != runningWorkItem) {
BOOL foundWorkItem = NO;
NSUInteger indexOfWorkItem = 0;
for (NSUInteger i = 0; i < _width; i++) {
if (_items[i] == workItem) {
foundWorkItem = YES;
indexOfWorkItem = i;
break;
}
}
if (!foundWorkItem) {
NSAssert(NO, @"work item to post-process is not running");
return;
}

// already part of the running work items allowed by width - retry directly
if (retry) {
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> retry needed for work item [%llu]", context.description, workItem.uniqueID);
} else {
[workItem markComplete];
[_items removeObjectAtIndex:0];
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);
[self _callWorkItem:workItem withContext:context];
return;
}

// when "concurrency width" is implemented this will be decremented instead
_runningWorkItemCount = 0;
[workItem markComplete];
[_items removeObjectAtIndex:indexOfWorkItem];
MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID);

// sanity check running work item count is positive
if (_runningWorkItemCount == 0) {
NSAssert(NO, @"running work item count should be positive");
return;
}

_runningWorkItemCount--;
[self _callNextReadyWorkItemWithContext:context];
}

- (void)_callWorkItem:(MTRAsyncWorkItem *)workItem withContext:(ContextSnapshot const &)context
{
os_unfair_lock_assert_owner(&_lock);

mtr_weakify(self);
[workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
mtr_strongify(self);
BOOL handled = NO;
if (self) {
ContextSnapshot context(self); // re-acquire a new snapshot
std::lock_guard lock(self->_lock);
if (!workItem.isComplete) {
[self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
handled = YES;
}
}
return handled;
}];
}

- (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
{
os_unfair_lock_assert_owner(&_lock);

// when "concurrency width" is implemented this will be checked against the width
if (_runningWorkItemCount) {
return; // can't run next work item until the current one is done
// sanity check not running more than allowed
if (_runningWorkItemCount > _width) {
NSAssert(NO, @"running work item count larger than the maximum width");
return;
}

if (!_items.count) {
// sanity check consistent counts
if (_items.count < _runningWorkItemCount) {
NSAssert(NO, @"work item count is less than running work item count");
return;
}

// can't run more work items if already running at max concurrent width
if (_runningWorkItemCount == _width) {
return;
}

// no more items to run
if (_items.count == _runningWorkItemCount) {
return; // nothing to run
}

Expand All @@ -324,16 +380,16 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
return;
}

// when "concurrency width" is implemented this will be incremented instead
_runningWorkItemCount = 1;

MTRAsyncWorkItem * workItem = _items.firstObject;
NSUInteger nextWorkItemToRunIndex = _runningWorkItemCount;
MTRAsyncWorkItem * workItem = _items[nextWorkItemToRunIndex];
_runningWorkItemCount++;

// Check if batching is possible or needed. Only ask work item to batch once for simplicity
// Check if batching is possible or needed.
auto batchingHandler = workItem.batchingHandler;
if (batchingHandler && workItem.retryCount == 0) {
while (_items.count >= 2) {
MTRAsyncWorkItem * nextWorkItem = _items[1];
if (batchingHandler) {
while (_items.count > _runningWorkItemCount) {
NSUInteger firstNonRunningItemIndex = _runningWorkItemCount;
MTRAsyncWorkItem * nextWorkItem = _items[firstNonRunningItemIndex];
if (!nextWorkItem.batchingHandler || nextWorkItem.batchingID != workItem.batchingID) {
goto done; // next item is not eligible to merge with this one
}
Expand All @@ -355,20 +411,7 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context
done:;
}

mtr_weakify(self);
[workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) {
mtr_strongify(self);
BOOL handled = NO;
if (self) {
ContextSnapshot context(self); // re-acquire a new snapshot
std::lock_guard lock(self->_lock);
if (!workItem.isComplete) {
[self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)];
handled = YES;
}
}
return handled;
}];
[self _callWorkItem:workItem withContext:context];
}

- (BOOL)hasDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData
Expand Down
61 changes: 61 additions & 0 deletions src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m
Original file line number Diff line number Diff line change
Expand Up @@ -491,4 +491,65 @@ - (void)testContextLoss
[self waitForExpectationsWithTimeout:1 handler:nil];
}

- (void)testItemsConcurrently
{
MTRAsyncWorkQueue * workQueue = [[MTRAsyncWorkQueue alloc] initWithContext:NSNull.null width:3];

XCTestExpectation * first3WorkItemsExecutedExpectation = [self expectationWithDescription:@"First 3 work items executed"];
XCTestExpectation * first3WorkItemsSleptExpectation = [self expectationWithDescription:@"First 3 work items slept"];
__block os_unfair_lock counterLock = OS_UNFAIR_LOCK_INIT;
__block int beforeSleepCounter = 0;
__block int afterSleepCounter = 0;
__auto_type sleep1ReadyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
os_unfair_lock_lock(&counterLock);
beforeSleepCounter++;
if (beforeSleepCounter == 3) {
[first3WorkItemsExecutedExpectation fulfill];
}
os_unfair_lock_unlock(&counterLock);
sleep(1);
afterSleepCounter++;
if (afterSleepCounter == 3) {
[first3WorkItemsSleptExpectation fulfill];
}
completion(MTRAsyncWorkComplete);
};

MTRAsyncWorkItem * workItem1 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
workItem1.readyHandler = sleep1ReadyHandler;
[workQueue enqueueWorkItem:workItem1 descriptionWithFormat:@"work item %d", 1];

MTRAsyncWorkItem * workItem2 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
workItem2.readyHandler = sleep1ReadyHandler;
[workQueue enqueueWorkItem:workItem2 descriptionWithFormat:@"work item %d", 2];

MTRAsyncWorkItem * workItem3 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
workItem3.readyHandler = sleep1ReadyHandler;
[workQueue enqueueWorkItem:workItem3 descriptionWithFormat:@"work item %d", 3];

// This is the item after the first 3, and should onl execute when one of them finished
XCTestExpectation * lastWorkItemWaitedExpectation = [self expectationWithDescription:@"Last work item waited properly"];
MTRAsyncWorkItem * workItemLast = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)];
workItemLast.readyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) {
// expect this to have waited until at least one of the above items finished after sleep() and incremented counter
os_unfair_lock_lock(&counterLock);
if (afterSleepCounter > 0) {
[lastWorkItemWaitedExpectation fulfill];
}
os_unfair_lock_unlock(&counterLock);
completion(MTRAsyncWorkComplete);
};
[workQueue enqueueWorkItem:workItemLast description:@"last work item"];

[self waitForExpectations:@[ first3WorkItemsExecutedExpectation ] timeout:2];
// the before-sleep counter should have reached 3 immediately as they all run concurrently.
XCTAssertEqual(afterSleepCounter, 0);

[self waitForExpectations:@[ lastWorkItemWaitedExpectation, first3WorkItemsSleptExpectation ] timeout:2];

// see that all 3 first items ran and slept
XCTAssertEqual(beforeSleepCounter, 3);
XCTAssertEqual(afterSleepCounter, 3);
}

@end

0 comments on commit eb723de

Please sign in to comment.