diff --git a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h index b584e30738740c..598bbf00f5e690 100644 --- a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h +++ b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.h @@ -192,6 +192,17 @@ MTR_TESTABLE /// is lost. - (instancetype)initWithContext:(ContextType)context; +/// Creates a work queue with the given context object and a queue width. +/// +/// The queue will call readyHandler on up to "width" number of work items +/// concurrently. Once "width" number of work items have started, no other +/// work items will get a readyHandler call until one of the running work items +/// has called its completion block with MTRAsyncWorkComplete. +/// +/// This allows the a MTRAsyncWorkQueue object to manage a pool of +/// resources that can be use concurrently at any given time. +- (instancetype)initWithContext:(ContextType)context width:(NSUInteger)width; + /// Enqueues the specified work item, making it eligible for execution. /// /// Once a work item is enqueued, ownership of it passes to the queue and diff --git a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm index eff85b1b0cb503..53a719502306bd 100644 --- a/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm +++ b/src/darwin/Framework/CHIP/MTRAsyncWorkQueue.mm @@ -197,7 +197,8 @@ @implementation MTRAsyncWorkQueue { os_unfair_lock _lock; __weak id _context; NSMutableArray * _items; - NSInteger _runningWorkItemCount; + NSUInteger _runningWorkItemCount; + NSUInteger _width; } // A helper struct that facilitates access to _context while @@ -216,11 +217,17 @@ @implementation MTRAsyncWorkQueue { }; - (instancetype)initWithContext:(id)context +{ + return [self initWithContext:context width:1]; +} + +- (instancetype)initWithContext:(id)context width:(NSUInteger)width { NSParameterAssert(context); if (self = [super init]) { _context = context; _items = [NSMutableArray array]; + _width = width; } return self; } @@ -286,35 +293,84 @@ - (void)_postProcessWorkItem:(MTRAsyncWorkItem *)workItem { os_unfair_lock_assert_owner(&_lock); - MTRAsyncWorkItem * runningWorkItem = (_runningWorkItemCount) ? _items.firstObject : nil; - if (workItem != runningWorkItem) { + BOOL foundWorkItem = NO; + NSUInteger indexOfWorkItem = 0; + for (NSUInteger i = 0; i < _width; i++) { + if (_items[i] == workItem) { + foundWorkItem = YES; + indexOfWorkItem = i; + break; + } + } + if (!foundWorkItem) { NSAssert(NO, @"work item to post-process is not running"); return; } + // already part of the running work items allowed by width - retry directly if (retry) { MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@> retry needed for work item [%llu]", context.description, workItem.uniqueID); - } else { - [workItem markComplete]; - [_items removeObjectAtIndex:0]; - MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID); + [self _callWorkItem:workItem withContext:context]; + return; } - // when "concurrency width" is implemented this will be decremented instead - _runningWorkItemCount = 0; + [workItem markComplete]; + [_items removeObjectAtIndex:indexOfWorkItem]; + MTR_LOG_DEFAULT("MTRAsyncWorkQueue<%@, items count: %tu> completed work item [%llu]", context.description, _items.count, workItem.uniqueID); + + // sanity check running work item count is positive + if (_runningWorkItemCount == 0) { + NSAssert(NO, @"running work item count should be positive"); + return; + } + + _runningWorkItemCount--; [self _callNextReadyWorkItemWithContext:context]; } +- (void)_callWorkItem:(MTRAsyncWorkItem *)workItem withContext:(ContextSnapshot const &)context +{ + os_unfair_lock_assert_owner(&_lock); + + mtr_weakify(self); + [workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) { + mtr_strongify(self); + BOOL handled = NO; + if (self) { + ContextSnapshot context(self); // re-acquire a new snapshot + std::lock_guard lock(self->_lock); + if (!workItem.isComplete) { + [self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)]; + handled = YES; + } + } + return handled; + }]; +} + - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context { os_unfair_lock_assert_owner(&_lock); - // when "concurrency width" is implemented this will be checked against the width - if (_runningWorkItemCount) { - return; // can't run next work item until the current one is done + // sanity check not running more than allowed + if (_runningWorkItemCount > _width) { + NSAssert(NO, @"running work item count larger than the maximum width"); + return; } - if (!_items.count) { + // sanity check consistent counts + if (_items.count < _runningWorkItemCount) { + NSAssert(NO, @"work item count is less than running work item count"); + return; + } + + // can't run more work items if already running at max concurrent width + if (_runningWorkItemCount == _width) { + return; + } + + // no more items to run + if (_items.count == _runningWorkItemCount) { return; // nothing to run } @@ -324,16 +380,16 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context return; } - // when "concurrency width" is implemented this will be incremented instead - _runningWorkItemCount = 1; - - MTRAsyncWorkItem * workItem = _items.firstObject; + NSUInteger nextWorkItemToRunIndex = _runningWorkItemCount; + MTRAsyncWorkItem * workItem = _items[nextWorkItemToRunIndex]; + _runningWorkItemCount++; - // Check if batching is possible or needed. Only ask work item to batch once for simplicity + // Check if batching is possible or needed. auto batchingHandler = workItem.batchingHandler; - if (batchingHandler && workItem.retryCount == 0) { - while (_items.count >= 2) { - MTRAsyncWorkItem * nextWorkItem = _items[1]; + if (batchingHandler) { + while (_items.count > _runningWorkItemCount) { + NSUInteger firstNonRunningItemIndex = _runningWorkItemCount; + MTRAsyncWorkItem * nextWorkItem = _items[firstNonRunningItemIndex]; if (!nextWorkItem.batchingHandler || nextWorkItem.batchingID != workItem.batchingID) { goto done; // next item is not eligible to merge with this one } @@ -355,20 +411,7 @@ - (void)_callNextReadyWorkItemWithContext:(ContextSnapshot const &)context done:; } - mtr_weakify(self); - [workItem callReadyHandlerWithContext:context.reference completion:^(MTRAsyncWorkOutcome outcome) { - mtr_strongify(self); - BOOL handled = NO; - if (self) { - ContextSnapshot context(self); // re-acquire a new snapshot - std::lock_guard lock(self->_lock); - if (!workItem.isComplete) { - [self _postProcessWorkItem:workItem context:context retry:(outcome == MTRAsyncWorkNeedsRetry)]; - handled = YES; - } - } - return handled; - }]; + [self _callWorkItem:workItem withContext:context]; } - (BOOL)hasDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData diff --git a/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m b/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m index 031c45380496f0..874251cbaf37bd 100644 --- a/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m +++ b/src/darwin/Framework/CHIPTests/MTRAsyncWorkQueueTests.m @@ -491,4 +491,65 @@ - (void)testContextLoss [self waitForExpectationsWithTimeout:1 handler:nil]; } +- (void)testItemsConcurrently +{ + MTRAsyncWorkQueue * workQueue = [[MTRAsyncWorkQueue alloc] initWithContext:NSNull.null width:3]; + + XCTestExpectation * first3WorkItemsExecutedExpectation = [self expectationWithDescription:@"First 3 work items executed"]; + XCTestExpectation * first3WorkItemsSleptExpectation = [self expectationWithDescription:@"First 3 work items slept"]; + __block os_unfair_lock counterLock = OS_UNFAIR_LOCK_INIT; + __block int beforeSleepCounter = 0; + __block int afterSleepCounter = 0; + __auto_type sleep1ReadyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) { + os_unfair_lock_lock(&counterLock); + beforeSleepCounter++; + if (beforeSleepCounter == 3) { + [first3WorkItemsExecutedExpectation fulfill]; + } + os_unfair_lock_unlock(&counterLock); + sleep(1); + afterSleepCounter++; + if (afterSleepCounter == 3) { + [first3WorkItemsSleptExpectation fulfill]; + } + completion(MTRAsyncWorkComplete); + }; + + MTRAsyncWorkItem * workItem1 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)]; + workItem1.readyHandler = sleep1ReadyHandler; + [workQueue enqueueWorkItem:workItem1 descriptionWithFormat:@"work item %d", 1]; + + MTRAsyncWorkItem * workItem2 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)]; + workItem2.readyHandler = sleep1ReadyHandler; + [workQueue enqueueWorkItem:workItem2 descriptionWithFormat:@"work item %d", 2]; + + MTRAsyncWorkItem * workItem3 = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)]; + workItem3.readyHandler = sleep1ReadyHandler; + [workQueue enqueueWorkItem:workItem3 descriptionWithFormat:@"work item %d", 3]; + + // This is the item after the first 3, and should onl execute when one of them finished + XCTestExpectation * lastWorkItemWaitedExpectation = [self expectationWithDescription:@"Last work item waited properly"]; + MTRAsyncWorkItem * workItemLast = [[MTRAsyncWorkItem alloc] initWithQueue:dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)]; + workItemLast.readyHandler = ^(id context, NSInteger retryCount, MTRAsyncWorkCompletionBlock completion) { + // expect this to have waited until at least one of the above items finished after sleep() and incremented counter + os_unfair_lock_lock(&counterLock); + if (afterSleepCounter > 0) { + [lastWorkItemWaitedExpectation fulfill]; + } + os_unfair_lock_unlock(&counterLock); + completion(MTRAsyncWorkComplete); + }; + [workQueue enqueueWorkItem:workItemLast description:@"last work item"]; + + [self waitForExpectations:@[ first3WorkItemsExecutedExpectation ] timeout:2]; + // the before-sleep counter should have reached 3 immediately as they all run concurrently. + XCTAssertEqual(afterSleepCounter, 0); + + [self waitForExpectations:@[ lastWorkItemWaitedExpectation, first3WorkItemsSleptExpectation ] timeout:2]; + + // see that all 3 first items ran and slept + XCTAssertEqual(beforeSleepCounter, 3); + XCTAssertEqual(afterSleepCounter, 3); +} + @end