Skip to content

Commit

Permalink
AttributeCache should cache data versions and use them in subsequent … (
Browse files Browse the repository at this point in the history
#16602)

* AttributeCache should cache data versions and use them in subsequent Read/Subscribe requests

Move DataVersionFilter encoding to the end of read/subscribe request followwing the spec

either using external version filters or using cached data versions if cache is available

* address comments

* address comments

* address comments

* address comments

* address comments
  • Loading branch information
yunhanw-google authored and pull[bot] committed Oct 25, 2023
1 parent c47ea7e commit 2292329
Show file tree
Hide file tree
Showing 19 changed files with 1,039 additions and 149 deletions.
193 changes: 189 additions & 4 deletions src/app/AttributeCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ CHIP_ERROR AttributeCache::UpdateCache(const ConcreteDataAttributePath & aPath,
AttributeState state;
System::PacketBufferHandle handle;
System::PacketBufferTLVWriter writer;
bool endpointIsNew = false;

if (mCache.find(aPath.mEndpointId) == mCache.end())
{
//
// Since we might potentially be creating a new entry at mCache[aPath.mEndpointId][aPath.mClusterId] that
// wasn't there before, we need to check if an entry didn't exist there previously and remember that so that
// we can appropriately notify our clients of the addition of a new endpoint.
//
endpointIsNew = true;
}

if (apData)
{
Expand All @@ -46,6 +57,35 @@ CHIP_ERROR AttributeCache::UpdateCache(const ConcreteDataAttributePath & aPath,
handle.RightSize();

state.Set<System::PacketBufferHandle>(std::move(handle));

//
// Clear out the committed data version and only set it again once we have received all data for this cluster.
// Otherwise, we may have incomplete data that looks like it's complete since it has a valid data version.
//
mCache[aPath.mEndpointId][aPath.mClusterId].mCommittedDataVersion.ClearValue();

// This commits a pending data version if the last report path is valid and it is different from the current path.
if (mLastReportDataPath.IsValidConcreteClusterPath() && mLastReportDataPath != aPath)
{
CommitPendingDataVersion();
}

bool foundEncompassingWildcardPath = false;
for (const auto & path : mRequestPathSet)
{
if (path.IncludesAllAttributesInCluster(aPath))
{
foundEncompassingWildcardPath = true;
break;
}
}

// if this data item is encompassed by a wildcard path, let's go ahead and update its pending data version.
if (foundEncompassingWildcardPath)
{
mCache[aPath.mEndpointId][aPath.mClusterId].mPendingDataVersion = aPath.mDataVersion;
}
mLastReportDataPath = aPath;
}
else
{
Expand All @@ -56,25 +96,43 @@ CHIP_ERROR AttributeCache::UpdateCache(const ConcreteDataAttributePath & aPath,
// if the endpoint didn't exist previously, let's track the insertion
// so that we can inform our callback of a new endpoint being added appropriately.
//
if (mCache.find(aPath.mEndpointId) == mCache.end())
if (endpointIsNew)
{
mAddedEndpoints.push_back(aPath.mEndpointId);
}

mCache[aPath.mEndpointId][aPath.mClusterId][aPath.mAttributeId] = std::move(state);
mCache[aPath.mEndpointId][aPath.mClusterId].mAttributes[aPath.mAttributeId] = std::move(state);
mChangedAttributeSet.insert(aPath);
return CHIP_NO_ERROR;
}

void AttributeCache::OnReportBegin()
{
mLastReportDataPath = ConcreteClusterPath(kInvalidEndpointId, kInvalidClusterId);
mChangedAttributeSet.clear();
mAddedEndpoints.clear();
mCallback.OnReportBegin();
}

void AttributeCache::CommitPendingDataVersion()
{
if (!mLastReportDataPath.IsValidConcreteClusterPath())
{
return;
}

auto & lastClusterInfo = mCache[mLastReportDataPath.mEndpointId][mLastReportDataPath.mClusterId];
if (lastClusterInfo.mPendingDataVersion.HasValue())
{
lastClusterInfo.mCommittedDataVersion = lastClusterInfo.mPendingDataVersion;
lastClusterInfo.mPendingDataVersion.ClearValue();
}
}

void AttributeCache::OnReportEnd()
{
CommitPendingDataVersion();
mLastReportDataPath = ConcreteClusterPath(kInvalidEndpointId, kInvalidClusterId);
std::set<std::tuple<EndpointId, ClusterId>> changedClusters;

//
Expand Down Expand Up @@ -151,6 +209,15 @@ CHIP_ERROR AttributeCache::Get(const ConcreteAttributePath & path, TLV::TLVReade
return CHIP_NO_ERROR;
}

CHIP_ERROR AttributeCache::GetVersion(EndpointId mEndpointId, ClusterId mClusterId, Optional<DataVersion> & aVersion)
{
CHIP_ERROR err;
auto clusterState = GetClusterState(mEndpointId, mClusterId, err);
ReturnErrorOnFailure(err);
aVersion = clusterState->mCommittedDataVersion;
return CHIP_NO_ERROR;
}

AttributeCache::EndpointState * AttributeCache::GetEndpointState(EndpointId endpointId, CHIP_ERROR & err)
{
auto endpointIter = mCache.find(endpointId);
Expand Down Expand Up @@ -192,8 +259,8 @@ AttributeCache::AttributeState * AttributeCache::GetAttributeState(EndpointId en
return nullptr;
}

auto attributeState = clusterState->find(attributeId);
if (attributeState == clusterState->end())
auto attributeState = clusterState->mAttributes.find(attributeId);
if (attributeState == clusterState->mAttributes.end())
{
err = CHIP_ERROR_KEY_NOT_FOUND;
return nullptr;
Expand All @@ -219,5 +286,123 @@ CHIP_ERROR AttributeCache::GetStatus(const ConcreteAttributePath & path, StatusI
return CHIP_NO_ERROR;
}

void AttributeCache::GetSortedFilters(std::vector<std::pair<DataVersionFilter, size_t>> & aVector)
{
for (auto const & endpointIter : mCache)
{
EndpointId endpointId = endpointIter.first;
for (auto const & clusterIter : endpointIter.second)
{
if (!clusterIter.second.mCommittedDataVersion.HasValue())
{
continue;
}
DataVersion dataVersion = clusterIter.second.mCommittedDataVersion.Value();
uint32_t clusterSize = 0;
ClusterId clusterId = clusterIter.first;

for (auto const & attributeIter : clusterIter.second.mAttributes)
{
if (attributeIter.second.Is<StatusIB>())
{
clusterSize +=
5; // 1 byte: anonymous tag control byte for struct. 1 byte: control byte for uint8 value. 1 byte:
// context-specific tag for uint8 value.1 byte: the uint8 value. 1 byte: end of container.
if (attributeIter.second.Get<StatusIB>().mClusterStatus.HasValue())
{
clusterSize += 3; // 1 byte: control byte for uint8 value. 1 byte: context-specific tag for uint8 value. 1
// byte: the uint8 value.
}
}
else
{
System::PacketBufferTLVReader bufReader;
bufReader.Init(attributeIter.second.Get<System::PacketBufferHandle>().Retain());
ReturnOnFailure(bufReader.Next());
// Skip to the end of the element.
ReturnOnFailure(bufReader.Skip());

// Compute the amount of value data
clusterSize += bufReader.GetLengthRead();
}
}
if (clusterSize == 0)
{
continue;
}

DataVersionFilter filter(endpointId, clusterId, dataVersion);

aVector.push_back(std::make_pair(filter, clusterSize));
}
}
std::sort(aVector.begin(), aVector.end(),
[](const std::pair<DataVersionFilter, size_t> & x, const std::pair<DataVersionFilter, size_t> & y) {
return x.second > y.second;
});
}

CHIP_ERROR AttributeCache::OnUpdateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
const Span<AttributePathParams> & aAttributePaths,
bool & aEncodedDataVersionList)
{
CHIP_ERROR err = CHIP_NO_ERROR;
TLV::TLVWriter backup;

for (auto & attribute : aAttributePaths)
{
if (attribute.HasAttributeWildcard())
{
mRequestPathSet.insert(attribute);
}
}

std::vector<std::pair<DataVersionFilter, size_t>> filterVector;
GetSortedFilters(filterVector);

aEncodedDataVersionList = false;
for (auto & filter : filterVector)
{
bool intersected = false;
aDataVersionFilterIBsBuilder.Checkpoint(backup);

// if the particular cached cluster does not intersect with user provided attribute paths, skip the cached one
for (const auto & attributePath : aAttributePaths)
{
if (attributePath.IncludesAttributesInCluster(filter.first))
{
intersected = true;
break;
}
}
if (!intersected)
{
continue;
}

DataVersionFilterIB::Builder & filterIB = aDataVersionFilterIBsBuilder.CreateDataVersionFilter();
SuccessOrExit(err = aDataVersionFilterIBsBuilder.GetError());
ClusterPathIB::Builder & filterPath = filterIB.CreatePath();
SuccessOrExit(err = filterIB.GetError());
SuccessOrExit(
err = filterPath.Endpoint(filter.first.mEndpointId).Cluster(filter.first.mClusterId).EndOfClusterPathIB().GetError());
SuccessOrExit(err = filterIB.DataVersion(filter.first.mDataVersion.Value()).EndOfDataVersionFilterIB().GetError());
ChipLogProgress(DataManagement,
"Update DataVersionFilter: Endpoint=%" PRIu16 " Cluster=" ChipLogFormatMEI " Version=%" PRIu32,
filter.first.mEndpointId, ChipLogValueMEI(filter.first.mClusterId), filter.first.mDataVersion.Value());

aEncodedDataVersionList = true;
}

exit:
if (err == CHIP_ERROR_NO_MEMORY || err == CHIP_ERROR_BUFFER_TOO_SMALL)
{
ChipLogProgress(DataManagement, "OnUpdateDataVersionFilterList out of space; rolling back");
aDataVersionFilterIBsBuilder.Rollback(backup);
err = CHIP_NO_ERROR;
}
return err;
}

} // namespace app
} // namespace chip
67 changes: 56 additions & 11 deletions src/app/AttributeCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

namespace chip {
namespace app {

/*
* This implements an attribute cache designed to aggregate attribute data received by a client
* from either read or subscribe interactions and keep it resident and available for clients to
Expand All @@ -57,7 +56,9 @@ namespace app {
* through to a registered callback. In addition, it provides its own enhancements to the base ReadClient::Callback
* to make it easier to know what has changed in the cache.
*
* **NOTE** This already includes the BufferedReadCallback, so there is no need to add that to the ReadClient callback chain.
* **NOTE**
* 1. This already includes the BufferedReadCallback, so there is no need to add that to the ReadClient callback chain.
* 2. The same cache cannot be used by multiple subscribe/read interactions at the same time.
*
*/
class AttributeCache : protected ReadClient::Callback
Expand Down Expand Up @@ -217,6 +218,14 @@ class AttributeCache : protected ReadClient::Callback
*/
CHIP_ERROR Get(const ConcreteAttributePath & path, TLV::TLVReader & reader);

/*
* Retrieve the data version for the given cluster. If there is no data for the specified path in the cache,
* CHIP_ERROR_KEY_NOT_FOUND shall be returned. Otherwise aVersion will be set to the
* current data version for the cluster (which may have no value if we don't have a known data version
* for it, for example because none of our paths were wildcards that covered the whole cluster).
*/
CHIP_ERROR GetVersion(EndpointId mEndpointId, ClusterId mClusterId, Optional<DataVersion> & aVersion);

/*
* Execute an iterator function that is called for every attribute
* in a given endpoint and cluster. The function when invoked is provided a concrete attribute path
Expand All @@ -241,7 +250,7 @@ class AttributeCache : protected ReadClient::Callback
auto clusterState = GetClusterState(endpointId, clusterId, err);
ReturnErrorOnFailure(err);

for (auto & attributeIter : *clusterState)
for (auto & attributeIter : clusterState->mAttributes)
{
const ConcreteAttributePath path(endpointId, clusterId, attributeIter.first);
ReturnErrorOnFailure(func(path));
Expand Down Expand Up @@ -272,7 +281,7 @@ class AttributeCache : protected ReadClient::Callback
{
if (clusterIter.first == clusterId)
{
for (auto & attributeIter : clusterIter.second)
for (auto & attributeIter : clusterIter.second.mAttributes)
{
const ConcreteAttributePath path(endpointIter.first, clusterId, attributeIter.first);
ReturnErrorOnFailure(func(path));
Expand Down Expand Up @@ -312,10 +321,27 @@ class AttributeCache : protected ReadClient::Callback

private:
using AttributeState = Variant<System::PacketBufferHandle, StatusIB>;
using ClusterState = std::map<AttributeId, AttributeState>;
using EndpointState = std::map<ClusterId, ClusterState>;
using NodeState = std::map<EndpointId, EndpointState>;
// mPendingDataVersion represents a tentative data version for a cluster that we have gotten some reports for.
//
// mCurrentDataVersion represents a known data version for a cluster. In order for this to have a
// value the cluster must be included in a path in mRequestPathSet that has a wildcard attribute
// and we must not be in the middle of receiving reports for that cluster.
struct ClusterState
{
std::map<AttributeId, AttributeState> mAttributes;
Optional<DataVersion> mPendingDataVersion;
Optional<DataVersion> mCommittedDataVersion;
};
using EndpointState = std::map<ClusterId, ClusterState>;
using NodeState = std::map<EndpointId, EndpointState>;

struct Comparator
{
bool operator()(const AttributePathParams & x, const AttributePathParams & y) const
{
return x.mEndpointId < y.mEndpointId || x.mClusterId < y.mClusterId;
}
};
/*
* These functions provide a way to index into the cached state with different sub-sets of a path, returning
* appropriate slices of the data as requested.
Expand Down Expand Up @@ -344,26 +370,45 @@ class AttributeCache : protected ReadClient::Callback
void OnReportBegin() override;
void OnReportEnd() override;
void OnAttributeData(const ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, const StatusIB & aStatus) override;
void OnError(CHIP_ERROR aError) override { return mCallback.OnError(aError); }
void OnError(CHIP_ERROR aError) override { mCallback.OnError(aError); }

void OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus) override
{
return mCallback.OnEventData(aEventHeader, apData, apStatus);
mCallback.OnEventData(aEventHeader, apData, apStatus);
}

void OnDone() override
{
mRequestPathSet.clear();
return mCallback.OnDone();
}

void OnDone() override { return mCallback.OnDone(); }
void OnSubscriptionEstablished(uint64_t aSubscriptionId) override { mCallback.OnSubscriptionEstablished(aSubscriptionId); }

void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override
{
return mCallback.OnDeallocatePaths(std::move(aReadPrepareParams));
mCallback.OnDeallocatePaths(std::move(aReadPrepareParams));
}

virtual CHIP_ERROR OnUpdateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
const Span<AttributePathParams> & aAttributePaths,
bool & aEncodedDataVersionList) override;

// Commit the pending cluster data version, if there is one.
void CommitPendingDataVersion();

// Get our list of data version filters, sorted from larges to smallest by the total size of the TLV
// payload for the filter's cluster. Applying filters in this order should maximize space savings
// on the wire if not all filters can be applied.
void GetSortedFilters(std::vector<std::pair<DataVersionFilter, size_t>> & aVector);

Callback & mCallback;
NodeState mCache;
std::set<ConcreteAttributePath> mChangedAttributeSet;
std::set<AttributePathParams, Comparator> mRequestPathSet; // wildcard attribute request path only
std::vector<EndpointId> mAddedEndpoints;
BufferedReadCallback mBufferedReader;
ConcreteClusterPath mLastReportDataPath = ConcreteClusterPath(kInvalidEndpointId, kInvalidClusterId);
};

}; // namespace app
Expand Down
Loading

0 comments on commit 2292329

Please sign in to comment.