Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Darwin] MTRDevice should coalesce reads and avoid duplicates #26999

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion src/darwin/Framework/CHIP/MTRAsyncCallbackWorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,29 @@ NS_ASSUME_NONNULL_BEGIN

typedef void (^MTRAsyncCallbackReadyHandler)(id context, NSUInteger retryCount);

// The batching handler is called by the work queue when all of the following are true:
//
// 1) A work item that is batchable is about to be dequeued and executed for the first time.
// 2) The next work item in the queue is also batchable.
// 3) The two work items have matching batching ids.
//
// The handler will be passed the opaque data of the two work items: opaqueDataCurrent is the data of the
// item about to be executed and opaqueDataNext is the data for the next item.
//
// The handler is expected to mutate the data as needed to achieve batching.
//
// If after the data mutations opaqueDataNext no longer requires any work, the handler
// should set *fullyMerged to YES to indicate that the next item can be dropped from the queue.
// Otherwise the handler should set *fullyMerged to NO.
//
// If *fullyMerged is set to YES, this handler may be called again to possibly also batch the work item
// after the one that was dropped.
typedef void (^MTRAsyncCallbackBatchingHandler)(id opaqueDataCurrent, id opaqueDataNext, BOOL * fullyMerged);

// The duplicate check handler is called by the work queue when the client wishes to check whether a work item is a duplicate of an
// existing one, so that the client can decide to not enqueue the new duplicate.
typedef void (^MTRAsyncCallbackDuplicateCheckHandler)(id opaqueItemData, BOOL * isDuplicate);
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved

// MTRAsyncCallbackQueue high level description
// The MTRAsyncCallbackQueue was made to call one readyHandler
// block at a time asynchronously, and the readyHandler is
Expand All @@ -42,6 +65,23 @@ typedef void (^MTRAsyncCallbackReadyHandler)(id context, NSUInteger retryCount);
// - Set the readyHandler block on the WorkItem object
// - Call enqueueWorkItem on a MTRAsyncCallbackQueue

// Optional feature: Work Item Batching
// When a work item is dequeued to run, if it is of a type that can be combined with similar work items in a batch, this facility
// gives the client of this API an opportunity to coalesce and merge work items.
// - The "batching ID" is used for grouping mergeable work items with unique merging strategies. The ID value is opaque to this
// API, and the API client is responsible for assigning them.
// - Each work item will only be asked to batch before it's first dequeued to run readyHandler.
// See the MTRAsyncCallbackBatchingHandler definition above and the WorkItem's -setBatchingID:data:handler: method description for
// more details.

// Optional feature: Duplicate Filtering
// This is a facility that enables the API client to check if a potential work item has already been enqueued. By providing a
// handler that can answer if a work item's relevant data is a duplicate, it can avoid redundant queuing of requests.
// - The "duplicate type ID" is used for grouping different types of work items for duplicate checking. The ID value is opaque
// to this API, and the API client is responsible for assigning them.
// See the MTRAsyncCallbackDuplicateCheckHandler definition above and the WorkItem's -setDuplicateTypeID:handler: method description
// for more details.

// A serial one-at-a-time queue for performing work items
@interface MTRAsyncCallbackWorkQueue : NSObject
- (instancetype)init NS_UNAVAILABLE;
Expand All @@ -57,7 +97,12 @@ typedef void (^MTRAsyncCallbackReadyHandler)(id context, NSUInteger retryCount);
// Note: Once a work item is enqueued, its handlers cannot be modified
- (void)enqueueWorkItem:(MTRAsyncCallbackQueueWorkItem *)item;

// TODO: Add a "set concurrency width" method to allow for more than 1 work item at a time
// Before creating a work item, a client may call this method to check with existing work items that the new potential work item
// data is not a duplicate request. The work queue will then look for all work items matching the duplicate type ID, and call their
// duplicateCheckHandler with the provided opaqueWorkItemData.
//
// Returns YES if any item's duplicateCheckHandler returns a match.
- (BOOL)isDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData;
@end

// An item in the work queue
Expand All @@ -70,6 +115,18 @@ typedef void (^MTRAsyncCallbackReadyHandler)(id context, NSUInteger retryCount);
@property (nonatomic, strong) MTRAsyncCallbackReadyHandler readyHandler;
@property (nonatomic, strong) dispatch_block_t cancelHandler;

// For work items that can be merged into a batch, set this handler with an identifier and an object that represents the mergeable
// data. When the work queue processes a batchable item, if the next item is also batchable with the same batching identifier, the
// work queue will call the batchingHandler to give the work item an opportunity to merge the data before readyHandler is called.
// Should the two items be completely merged into one batch, the batchingHandler can signal that through the out argument
// "fullyMerged", and the work queue will remove the second item.
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
- (void)setBatchingID:(NSUInteger)opaqueBatchingID
data:(id)opaqueBatchableData
handler:(MTRAsyncCallbackBatchingHandler)batchingHandler;

// For work items that may have duplicates, set this handler with an identifier and a handler
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
- (void)setDuplicateTypeID:(NSUInteger)opaqueDuplicateTypeID handler:(MTRAsyncCallbackDuplicateCheckHandler)duplicateCheckHandler;

// Called by the creater of the work item when async work is done and should
// be removed from the queue. The work queue will run the next work item.
// Note: This must only be called from within the readyHandler
Expand Down
62 changes: 61 additions & 1 deletion src/darwin/Framework/CHIP/MTRAsyncCallbackWorkQueue.mm
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#import <dispatch/dispatch.h>
#import <os/lock.h>

#import "MTRAsyncCallbackWorkQueue.h"
#import "MTRAsyncCallbackWorkQueue_Internal.h"
#import "MTRLogging_Internal.h"

#pragma mark - Class extensions
Expand Down Expand Up @@ -169,9 +169,49 @@ - (void)_callNextReadyWorkItem
self.runningWorkItemCount = 1;

MTRAsyncCallbackQueueWorkItem * workItem = self.items.firstObject;

// Check if batching is possible or needed. Only ask work item to batch once for simplicity
if (workItem.batchable && workItem.batchingHandler && (workItem.retryCount == 0)) {
while (self.items.count >= 2) {
MTRAsyncCallbackQueueWorkItem * nextWorkItem = self.items[1];
if (!nextWorkItem.batchable || (nextWorkItem.batchingID != workItem.batchingID)) {
// next item is not eligible to merge with this one
break;
}

BOOL fullyMerged = NO;
workItem.batchingHandler(workItem.batchableData, nextWorkItem.batchableData, &fullyMerged);
if (!fullyMerged) {
// if some parts of the next item is
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
break;
}

[self.items removeObjectAtIndex:1];
}
}

[workItem callReadyHandlerWithContext:self.context];
}
}

- (BOOL)isDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData
{
os_unfair_lock_lock(&_lock);
int i = 0;
for (MTRAsyncCallbackQueueWorkItem * item in self.items) {
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
BOOL isDuplicate = NO;
if (item.supportsDuplicateCheck && (item.duplicateTypeID == opaqueDuplicateTypeID) && item.duplicateCheckHandler) {
item.duplicateCheckHandler(opaqueWorkItemData, &isDuplicate);
if (isDuplicate) {
os_unfair_lock_unlock(&_lock);
return YES;
}
}
i++;
}
os_unfair_lock_unlock(&_lock);
return NO;
}
@end

@implementation MTRAsyncCallbackQueueWorkItem
Expand Down Expand Up @@ -277,4 +317,24 @@ - (void)cancel
});
}
}

- (void)setBatchingID:(NSUInteger)opaqueBatchingID
data:(id)opaqueBatchableData
handler:(MTRAsyncCallbackBatchingHandler)batchingHandler
{
os_unfair_lock_lock(&self->_lock);
_batchable = YES;
_batchingID = opaqueBatchingID;
_batchableData = opaqueBatchableData;
_batchingHandler = batchingHandler;
os_unfair_lock_unlock(&self->_lock);
}

- (void)setDuplicateTypeID:(NSUInteger)opaqueDuplicateTypeID handler:(MTRAsyncCallbackDuplicateCheckHandler)duplicateCheckHandler
{
_supportsDuplicateCheck = YES;
_duplicateTypeID = opaqueDuplicateTypeID;
_duplicateCheckHandler = duplicateCheckHandler;
}

@end
13 changes: 13 additions & 0 deletions src/darwin/Framework/CHIP/MTRAsyncCallbackWorkQueue_Internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,17 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithContext:(id _Nullable)context queue:(dispatch_queue_t)queue;
@end

@interface MTRAsyncCallbackQueueWorkItem ()
// Batching
@property (nonatomic, readonly) BOOL batchable;
@property (nonatomic, readonly) NSUInteger batchingID;
@property (nonatomic, readonly) id batchableData;
@property (nonatomic, readonly) MTRAsyncCallbackBatchingHandler batchingHandler;

// Duplicate filter
@property (nonatomic, readonly) BOOL supportsDuplicateCheck;
@property (nonatomic, readonly) NSUInteger duplicateTypeID;
@property (nonatomic, readonly) MTRAsyncCallbackDuplicateCheckHandler duplicateCheckHandler;
@end

NS_ASSUME_NONNULL_END
151 changes: 127 additions & 24 deletions src/darwin/Framework/CHIP/MTRDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ typedef NS_ENUM(NSUInteger, MTRDeviceExpectedValueFieldIndex) {
MTRDeviceExpectedValueFieldIDIndex = 2
};

typedef NS_ENUM(NSUInteger, MTRDeviceReadRequestFieldIndex) {
MTRDeviceReadRequestFieldEndpointIDIndex = 0,
MTRDeviceReadRequestFieldClusterIDIndex = 1,
MTRDeviceReadRequestFieldAttributeIDIndex = 2,
MTRDeviceReadRequestFieldParamsIndex = 3
};

typedef NS_ENUM(NSUInteger, MTRDeviceWorkItemBatchingID) {
MTRDeviceWorkItemBatchingReadID = 1,
bzbarsky-apple marked this conversation as resolved.
Show resolved Hide resolved
};

typedef NS_ENUM(NSUInteger, MTRDeviceWorkItemDuplicateTypeID) {
MTRDeviceWorkItemDuplicateReadTypeID = 1,
};

@interface MTRDevice ()
@property (nonatomic, readonly) os_unfair_lock lock; // protects the caches and device state
@property (nonatomic) chip::FabricIndex fabricIndex;
Expand Down Expand Up @@ -775,37 +790,125 @@ static BOOL AttributeHasChangesOmittedQuality(MTRAttributePath * attributePath)
// 4. Cache has no entry
// TODO: add option for BaseSubscriptionCallback to report during priming, to reduce when case 4 is hit
if (!attributeIsSpecified || ![self _subscriptionAbleToReport] || hasChangesOmittedQuality || !attributeValueToReturn) {
// Read requests container will be a mutable array of items, each being an array containing:
// [endpoint ID, cluster ID, attribute ID, params]
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
// Batching handler should only coalesce when params are equal.

// For this single read API there's only 1 array item. Use NSNull to stand in for nil params for easy comparison.
NSArray * readRequestData = @[ endpointID, clusterID, attributeID, params ?: [NSNull null] ];
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved

// But first, check if a duplicate read request is already queued and return
if ([_asyncCallbackWorkQueue isDuplicateForTypeID:MTRDeviceWorkItemDuplicateReadTypeID workItemData:readRequestData]) {
return attributeValueToReturn;
}

NSMutableArray<NSArray *> * readRequests = [NSMutableArray arrayWithObject:readRequestData];

// Create work item, set ready handler to perform task, then enqueue the work
MTRAsyncCallbackQueueWorkItem * workItem = [[MTRAsyncCallbackQueueWorkItem alloc] initWithQueue:self.queue];
MTRAsyncCallbackBatchingHandler batchingHandler = ^(id opaqueDataCurrent, id opaqueDataNext, BOOL * fullyMerged) {
NSMutableArray<NSArray *> * readRequestsCurrent = opaqueDataCurrent;
NSMutableArray<NSArray *> * readRequestsNext = opaqueDataNext;

*fullyMerged = NO;

// Can only read up to 9 paths at a time, per spec
if (readRequestsCurrent.count >= 9) {
MTR_LOG_DEFAULT("%@ batching cannot add more", logPrefix);
return;
}

while (readRequestsNext.count) {
// if params don't match then they cannot be merged
if (![readRequestsNext[0][MTRDeviceReadRequestFieldParamsIndex]
isEqual:readRequestsCurrent[0][MTRDeviceReadRequestFieldParamsIndex]]) {
MTR_LOG_DEFAULT("%@ batching merged all possible items", logPrefix);
return;
}

// merge the next item's first request into the current item's list
[readRequestsCurrent addObject:readRequestsNext[0]];
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
MTR_LOG_INFO("%@ batching merging %@ => %lu total", logPrefix, readRequestsNext[0],
(unsigned long) readRequestsCurrent.count);
[readRequestsNext removeObjectAtIndex:0];

// Can only read up to 9 paths at a time, per spec
if (readRequestsCurrent.count == 9) {
MTR_LOG_DEFAULT("%@ batching to max paths allowed", logPrefix);
break;
}
}

if (readRequestsNext.count == 0) {
MTR_LOG_DEFAULT("%@ batching - fully merged next item %@", logPrefix, fullyMerged ? @"YES" : @"NO");
jtung-apple marked this conversation as resolved.
Show resolved Hide resolved
*fullyMerged = YES;
}
};
MTRAsyncCallbackDuplicateCheckHandler duplicateCheckHandler = ^(id opaqueItemData, BOOL * isDuplicate) {
*isDuplicate = NO;
for (NSArray * readItem in readRequests) {
if ([readItem isEqual:opaqueItemData]) {
MTR_LOG_DEFAULT("%@ duplicate check found %@", logPrefix, readItem);
*isDuplicate = YES;
return;
}
}
};
MTRAsyncCallbackReadyHandler readyHandler = ^(MTRDevice * device, NSUInteger retryCount) {
MTR_LOG_DEFAULT("%@ dequeueWorkItem %@", logPrefix, self->_asyncCallbackWorkQueue);

// Sanity check
if (readRequests.count == 0) {
MTR_LOG_ERROR("%@ dequeueWorkItem no read requests", logPrefix);
[workItem endWork];
return;
}

// Build the attribute paths from the read requests
NSMutableArray * attributePaths = [NSMutableArray array];
for (NSArray * readItem in readRequests) {
// Sanity check
if (readItem.count < 4) {
MTR_LOG_ERROR("%@ dequeueWorkItem read item missing info %@", logPrefix, readItem);
[workItem endWork];
return;
}
[attributePaths addObject:[MTRAttributeRequestPath
requestPathWithEndpointID:readItem[MTRDeviceReadRequestFieldEndpointIDIndex]
clusterID:readItem[MTRDeviceReadRequestFieldClusterIDIndex]
attributeID:readItem[MTRDeviceReadRequestFieldAttributeIDIndex]]];
}
// If param is the NSNull stand-in, then just use nil
id readParamObject = readRequests[0][MTRDeviceReadRequestFieldParamsIndex];
MTRReadParams * readParams = (![readParamObject isEqual:[NSNull null]]) ? readParamObject : nil;

MTRBaseDevice * baseDevice = [self newBaseDevice];
[baseDevice readAttributesWithEndpointID:endpointID
clusterID:clusterID
attributeID:attributeID
params:params
queue:self.queue
completion:^(NSArray<NSDictionary<NSString *, id> *> * _Nullable values,
NSError * _Nullable error) {
if (values) {
// Since the format is the same data-value dictionary, this looks like an
// attribute report
MTR_LOG_INFO("%@ completion values %@", logPrefix, values);
[self _handleAttributeReport:values];
}

// TODO: better retry logic
if (error && (retryCount < 2)) {
MTR_LOG_ERROR("%@ completion error %@ retryWork %lu", logPrefix, error,
(unsigned long) retryCount);
[workItem retryWork];
} else {
MTR_LOG_DEFAULT("%@ completion error %@ endWork", logPrefix, error);
[workItem endWork];
}
}];
[baseDevice
readAttributePaths:attributePaths
eventPaths:nil
params:readParams
queue:self.queue
completion:^(NSArray<NSDictionary<NSString *, id> *> * _Nullable values, NSError * _Nullable error) {
if (values) {
// Since the format is the same data-value dictionary, this looks like an
// attribute report
MTR_LOG_INFO("%@ completion values %@", logPrefix, values);
[self _handleAttributeReport:values];
}

// TODO: better retry logic
if (error && (retryCount < 2)) {
MTR_LOG_ERROR("%@ completion error %@ retryWork %lu", logPrefix, error, (unsigned long) retryCount);
[workItem retryWork];
} else {
MTR_LOG_DEFAULT("%@ completion error %@ endWork", logPrefix, error);
[workItem endWork];
}
}];
};
workItem.readyHandler = readyHandler;
[workItem setBatchingID:MTRDeviceWorkItemBatchingReadID data:readRequests handler:batchingHandler];
[workItem setDuplicateTypeID:MTRDeviceWorkItemDuplicateReadTypeID handler:duplicateCheckHandler];
MTR_LOG_DEFAULT("%@ enqueueWorkItem %@", logPrefix, _asyncCallbackWorkQueue);
[_asyncCallbackWorkQueue enqueueWorkItem:workItem];
}
Expand Down
Loading