Skip to content

Commit

Permalink
Changed implementation to do per-packet batching
Browse files Browse the repository at this point in the history
  • Loading branch information
jtung-apple committed Sep 21, 2023
1 parent 1f018b5 commit ea199a8
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 30 deletions.
12 changes: 10 additions & 2 deletions src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
// Ensure we release the ReadClient before we tear down anything else,
// so it can call our OnDeallocatePaths properly.
mReadClient = nullptr;

// Make sure the block isn't run after object destruction
if (mInterimReportBlock) {
dispatch_block_cancel(mInterimReportBlock);
}
}

chip::app::BufferedReadCallback & GetBufferedCallback() { return mBufferedReadAdapter; }
Expand All @@ -103,9 +108,11 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
// be immediately followed by OnDone and we want to do the deletion there.
void ReportError(CHIP_ERROR aError, bool aCancelSubscription = true);

void ReportAttributes(NSArray * attributeReports);
void ReportCurrentData();

void ReportEvents(NSArray * eventReports);
// Called at attribute/event report time to queue a block to report on the Matter queue so that for multi-packet reports, this
// block is run and reports in batch. No-op if the block is already queued.
void QueueInterimReport();

private:
void OnReportBegin() override;
Expand Down Expand Up @@ -170,6 +177,7 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac
std::unique_ptr<chip::app::ClusterStateCache> mClusterStateCache;
bool mHaveQueuedDeletion = false;
OnDoneHandler _Nullable mOnDoneHandler = nil;
dispatch_block_t mInterimReportBlock = nil;
};

NS_ASSUME_NONNULL_END
45 changes: 34 additions & 11 deletions src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,55 @@
{
__block NSArray * attributeReports = mAttributeReports;
mAttributeReports = nil;
auto attributeCallback = mAttributeReportCallback;

__block NSArray * eventReports = mEventReports;
mEventReports = nil;
auto eventCallback = mEventReportCallback;

ReportAttributes(attributeReports);
ReportEvents(eventReports);
}

void MTRBaseSubscriptionCallback::ReportAttributes(NSArray * attributeReports)
{
auto attributeCallback = mAttributeReportCallback;
if (attributeCallback != nil && attributeReports.count) {
attributeCallback(attributeReports);
}
}

void MTRBaseSubscriptionCallback::ReportEvents(NSArray * eventReports)
{
auto eventCallback = mEventReportCallback;
if (eventCallback != nil && eventReports.count) {
eventCallback(eventReports);
}
}

void MTRBaseSubscriptionCallback::QueueInterimReport()
{
if (mInterimReportBlock) {
return;
}

// __block auto * myself = this;
mInterimReportBlock = dispatch_block_create(DISPATCH_BLOCK_INHERIT_QOS_CLASS, ^{
mInterimReportBlock = nil;
ReportData();
// Allocate reports arrays to continue accumulation
mAttributeReports = [NSMutableArray new];
mEventReports = [NSMutableArray new];
// myself->ReportCurrentData();
});

dispatch_async(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), mInterimReportBlock);
}

void MTRBaseSubscriptionCallback::ReportCurrentData()
{
mInterimReportBlock = nil;
ReportData();
// Allocate reports arrays to continue accumulation
mAttributeReports = [NSMutableArray new];
mEventReports = [NSMutableArray new];
}

void MTRBaseSubscriptionCallback::OnReportEnd()
{
if (mInterimReportBlock) {
dispatch_block_cancel(mInterimReportBlock);
mInterimReportBlock = nil;
}
ReportData();
if (mReportEndHandler) {
mReportEndHandler();
Expand Down
45 changes: 31 additions & 14 deletions src/darwin/Framework/CHIP/MTRDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ @interface MTRDevice ()

@end

@protocol MTRDeviceUnitTestDelegate <MTRDeviceDelegate>
- (void)unitTestReportEndForDevice:(MTRDevice *)device;
@end

@implementation MTRDevice

- (instancetype)initWithNodeID:(NSNumber *)nodeID controller:(MTRDeviceController *)controller
Expand Down Expand Up @@ -429,6 +433,15 @@ - (void)_handleReportEnd
{
os_unfair_lock_lock(&self->_lock);
_estimatedStartTimeFromGeneralDiagnosticsUpTime = nil;
// For unit testing only
#ifdef DEBUG
id delegate = _weakDelegate.strongObject;
if (delegate && [delegate respondsToSelector:@selector(unitTestReportEndForDevice:)]) {
dispatch_async(_delegateQueue, ^{
[delegate unitTestReportEndForDevice:self];
});
}
#endif
os_unfair_lock_unlock(&self->_lock);
}

Expand Down Expand Up @@ -1527,24 +1540,27 @@ - (void)invokeCommandWithEndpointID:(NSNumber *)endpointID
}

MTREventPath * eventPath = [[MTREventPath alloc] initWithPath:aEventHeader.mPath];
NSDictionary * eventReport;
if (apStatus != nullptr) {
[mEventReports addObject:@ { MTREventPathKey : eventPath, MTRErrorKey : [MTRError errorForIMStatus:*apStatus] }];
} else if (apData == nullptr) {
eventReport = @ { MTREventPathKey : eventPath, MTRErrorKey : [MTRError errorForCHIPErrorCode:CHIP_ERROR_INVALID_ARGUMENT] };
[mEventReports addObject:@ {
MTREventPathKey : eventPath,
MTRErrorKey : [MTRError errorForCHIPErrorCode:CHIP_ERROR_INVALID_ARGUMENT]
}];
} else {
id value = MTRDecodeDataValueDictionaryFromCHIPTLV(apData);
if (value == nil) {
MTR_LOG_ERROR("Failed to decode event data for path %@", eventPath);
eventReport = @ {
[mEventReports addObject:@ {
MTREventPathKey : eventPath,
MTRErrorKey : [MTRError errorForCHIPErrorCode:CHIP_ERROR_DECODE_FAILED],
};
}];
} else {
eventReport = [MTRBaseDevice eventReportForHeader:aEventHeader andData:value];
[mEventReports addObject:[MTRBaseDevice eventReportForHeader:aEventHeader andData:value]];
}
}
ReportEvents(@[ eventReport ]);

QueueInterimReport();
}

void SubscriptionCallback::OnAttributeData(
Expand All @@ -1562,25 +1578,26 @@ - (void)invokeCommandWithEndpointID:(NSNumber *)endpointID
}

MTRAttributePath * attributePath = [[MTRAttributePath alloc] initWithPath:aPath];
NSDictionary * attributeReport;
if (aStatus.mStatus != Status::Success) {
attributeReport = @ { MTRAttributePathKey : attributePath, MTRErrorKey : [MTRError errorForIMStatus:aStatus] };
[mAttributeReports addObject:@ { MTRAttributePathKey : attributePath, MTRErrorKey : [MTRError errorForIMStatus:aStatus] }];
} else if (apData == nullptr) {
attributeReport =
@ { MTRAttributePathKey : attributePath, MTRErrorKey : [MTRError errorForCHIPErrorCode:CHIP_ERROR_INVALID_ARGUMENT] };
[mAttributeReports addObject:@ {
MTRAttributePathKey : attributePath,
MTRErrorKey : [MTRError errorForCHIPErrorCode:CHIP_ERROR_INVALID_ARGUMENT]
}];
} else {
id value = MTRDecodeDataValueDictionaryFromCHIPTLV(apData);
if (value == nil) {
MTR_LOG_ERROR("Failed to decode attribute data for path %@", attributePath);
attributeReport = @ {
[mAttributeReports addObject:@ {
MTRAttributePathKey : attributePath,
MTRErrorKey : [MTRError errorForCHIPErrorCode:CHIP_ERROR_DECODE_FAILED],
};
}];
} else {
attributeReport = @ { MTRAttributePathKey : attributePath, MTRDataKey : value };
[mAttributeReports addObject:@ { MTRAttributePathKey : attributePath, MTRDataKey : value }];
}
}

ReportAttributes(@[ attributeReport ]);
QueueInterimReport();
}
} // anonymous namespace
17 changes: 14 additions & 3 deletions src/darwin/Framework/CHIPTests/MTRDeviceTests.m
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ @interface MTRDeviceTestDelegate : NSObject <MTRDeviceDelegate>
@property (nonatomic, nullable) dispatch_block_t onNotReachable;
@property (nonatomic, nullable) MTRDeviceTestDelegateDataHandler onAttributeDataReceived;
@property (nonatomic, nullable) MTRDeviceTestDelegateDataHandler onEventDataReceived;
@property (nonatomic, nullable) dispatch_block_t onReportEnd;
@end

@implementation MTRDeviceTestDelegate
Expand All @@ -148,6 +149,13 @@ - (void)device:(MTRDevice *)device receivedEventReport:(NSArray<NSDictionary<NSS
}
}

- (void)unitTestReportEndForDevice:(MTRDevice *)device
{
if (self.onReportEnd != nil) {
self.onReportEnd();
}
}

@end

@interface MTRDeviceTests : XCTestCase
Expand Down Expand Up @@ -1457,6 +1465,8 @@ - (void)test017_TestMTRDeviceBasics
XCTAssertNotNil(eventDict[MTREventTimestampDateKey]);
}
}
};
delegate.onReportEnd = ^() {
[gotReportsExpectation fulfill];
};

Expand Down Expand Up @@ -1490,12 +1500,11 @@ - (void)test017_TestMTRDeviceBasics

[self waitForExpectations:@[ subscriptionExpectation, gotReportsExpectation ] timeout:60];

delegate.onReportEnd = nil;

XCTAssertNotEqual(attributeReportsReceived, 0);
XCTAssertNotEqual(eventReportsReceived, 0);

attributeReportsReceived = 0;
eventReportsReceived = 0;

// Before resubscribe, first test write failure and expected value effects
NSNumber * testEndpointID = @(1);
NSNumber * testClusterID = @(8);
Expand Down Expand Up @@ -1555,6 +1564,8 @@ - (void)test017_TestMTRDeviceBasics
};

// reset the onAttributeDataReceived to validate the following resubscribe test
attributeReportsReceived = 0;
eventReportsReceived = 0;
delegate.onAttributeDataReceived = ^(NSArray<NSDictionary<NSString *, id> *> * data) {
attributeReportsReceived += data.count;
};
Expand Down

0 comments on commit ea199a8

Please sign in to comment.