Skip to content

Commit

Permalink
Add IM event processing function and callback interface in client side
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhanw-google committed Nov 16, 2021
1 parent 4d4c4a0 commit 6485f33
Show file tree
Hide file tree
Showing 12 changed files with 418 additions and 54 deletions.
2 changes: 2 additions & 0 deletions src/app/ClusterInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ struct ClusterInfo
*/
bool IsValidAttributePath() const { return HasWildcardListIndex() || !HasWildcardAttributeId(); }

bool IsValidEventPath() const { return !HasWildcardEventId(); }

inline bool HasWildcardNodeId() const { return mNodeId == kUndefinedNodeId; }
inline bool HasWildcardEndpointId() const { return mEndpointId == kInvalidEndpointId; }
inline bool HasWildcardClusterId() const { return mClusterId == kInvalidClusterId; }
Expand Down
58 changes: 58 additions & 0 deletions src/app/ConcreteEventPath.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
*
* Copyright (c) 2021 Project CHIP Authors
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <app/util/basic-types.h>

namespace chip {
namespace app {

/**
* A representation of a concrete event path.
*/
struct ConcreteEventPath
{
ConcreteEventPath(EndpointId aEndpointId, ClusterId aClusterId, EventId aEventId) :
mEndpointId(aEndpointId), mClusterId(aClusterId), mEventId(aEventId)
{}

ConcreteEventPath() {}

ConcreteEventPath & operator=(ConcreteEventPath && other)
{
if (&other == this)
return *this;

mEndpointId = other.mEndpointId;
mClusterId = other.mClusterId;
mEventId = other.mEventId;
return *this;
}

bool operator==(const ConcreteEventPath & other) const
{
return mEndpointId == other.mEndpointId && mClusterId == other.mClusterId && mEventId == other.mEventId;
}

EndpointId mEndpointId = 0;
ClusterId mClusterId = 0;
EventId mEventId = 0;
};
} // namespace app
} // namespace chip
4 changes: 3 additions & 1 deletion src/app/DeviceControllerInteractionModelDelegate.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class DeviceControllerInteractionModelDelegate : public chip::app::ReadClient::C

void OnDone(app::WriteClient * apWriteClient) override {}

void OnEventData(const app::ReadClient * apReadClient, TLV::TLVReader & aEventList) override {}
void OnEventData(const app::ReadClient * apReadClient, const app::EventHeader & aEventHeader, TLV::TLVReader * apData,
const app::StatusIB & aStatus) override
{}

void OnAttributeData(const app::ReadClient * apReadClient, const app::ConcreteAttributePath & aPath, TLV::TLVReader * apData,
const app::StatusIB & aStatus) override
Expand Down
36 changes: 36 additions & 0 deletions src/app/EventHeader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
*
* Copyright (c) 2021 Project CHIP Authors
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "ConcreteEventPath.h"
#include "EventLoggingTypes.h"
#include <app/util/basic-types.h>

namespace chip {
namespace app {
struct EventHeader
{
ConcreteEventPath mPath;
EventNumber mEventNumber = 0;
PriorityLevel mPriorityLevel = PriorityLevel::Invalid;
Timestamp mTimestamp;
Timestamp mDeltaTimestamp;
};
} // namespace app
} // namespace chip
8 changes: 4 additions & 4 deletions src/app/EventLoggingTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,23 @@ struct EventSchema

/**
* @brief
* The struct that provides an application set system or UTC timestamp.
* The struct that provides an application set System or Epoch timestamp.
*/
struct Timestamp
{
enum class Type
{
kInvalid = 0,
kSystem,
kUTC
kEpoch
};
Timestamp() {}
Timestamp(Type aType) : mType(aType) { mValue = 0; }
Timestamp(Type aType, uint64_t aValue) : mType(aType), mValue(aValue) {}
Timestamp(System::Clock::Timestamp aValue) : mType(Type::kSystem), mValue(aValue.count()) {}
static Timestamp UTC(uint64_t aValue)
{
Timestamp timestamp(Type::kUTC, aValue);
Timestamp timestamp(Type::kEpoch, aValue);
return timestamp;
}
static Timestamp System(System::Clock::Timestamp aValue)
Expand Down Expand Up @@ -167,7 +167,7 @@ struct EventLoadOutContext
{
EventLoadOutContext(TLV::TLVWriter & aWriter, PriorityLevel aPriority, EventNumber aStartingEventNumber) :
mWriter(aWriter), mPriority(aPriority), mStartingEventNumber(aStartingEventNumber),
mCurrentSystemTime(Timestamp::Type::kSystem), mCurrentEventNumber(0), mCurrentUTCTime(Timestamp::Type::kUTC), mFirst(true)
mCurrentSystemTime(Timestamp::Type::kSystem), mCurrentEventNumber(0), mCurrentUTCTime(Timestamp::Type::kEpoch), mFirst(true)
{}

TLV::TLVWriter & mWriter;
Expand Down
3 changes: 3 additions & 0 deletions src/app/EventPathParams.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ struct EventPathParams
EventPathParams(NodeId aNodeId, EndpointId aEndpointId, ClusterId aClusterId, EventId aEventId, bool aIsUrgent) :
mNodeId(aNodeId), mEndpointId(aEndpointId), mClusterId(aClusterId), mEventId(aEventId), mIsUrgent(aIsUrgent)
{}
EventPathParams(EndpointId aEndpointId, ClusterId aClusterId, EventId aEventId) :
EventPathParams(0, aEndpointId, aClusterId, aEventId, false)
{}
EventPathParams() {}
bool IsSamePath(const EventPathParams & other) const
{
Expand Down
138 changes: 137 additions & 1 deletion src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload)
{
chip::TLV::TLVReader EventReportsReader;
EventReports.GetReader(&EventReportsReader);
mpCallback->OnEventData(this, EventReportsReader);
err = ProcessEventReportIBs(EventReportsReader);
SuccessOrExit(err);
}

err = report.GetAttributeReportIBs(&attributeReportIBs);
Expand Down Expand Up @@ -470,6 +471,29 @@ CHIP_ERROR ReadClient::ProcessAttributePath(AttributePathIB::Parser & aAttribute
return CHIP_NO_ERROR;
}

CHIP_ERROR ReadClient::ProcessEventPath(EventPathIB::Parser & aEventPath, ClusterInfo & aClusterInfo)
{
CHIP_ERROR err = aEventPath.GetNode(&(aClusterInfo.mNodeId));
if (err == CHIP_END_OF_TLV)
{
err = CHIP_NO_ERROR;
}
VerifyOrReturnError(err == CHIP_NO_ERROR, err = CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH);

// The ReportData must contain a concrete event path
err = aEventPath.GetEndpoint(&(aClusterInfo.mEndpointId));
VerifyOrReturnError(err == CHIP_NO_ERROR, CHIP_ERROR_IM_MALFORMED_EVENT_PATH);

err = aEventPath.GetCluster(&(aClusterInfo.mClusterId));
VerifyOrReturnError(err == CHIP_NO_ERROR, CHIP_ERROR_IM_MALFORMED_EVENT_PATH);

err = aEventPath.GetEvent(&(aClusterInfo.mEventId));
VerifyOrReturnError(err == CHIP_NO_ERROR, CHIP_ERROR_IM_MALFORMED_EVENT_PATH);

VerifyOrReturnError(aClusterInfo.IsValidEventPath(), CHIP_ERROR_IM_MALFORMED_EVENT_PATH);
return CHIP_NO_ERROR;
}

CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeReportIBsReader)
{
CHIP_ERROR err = CHIP_NO_ERROR;
Expand Down Expand Up @@ -517,6 +541,118 @@ CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeRepo
return err;
}

CHIP_ERROR ReadClient::ProcessEventTimestamp(EventDataIB::Parser & aEventData, EventHeader & aEventHeader)
{
CHIP_ERROR err = CHIP_NO_ERROR;
uint64_t timeStampVal = 0;
bool hasSystemTimestamp = false;
bool hasEpochTimestamp = false;
bool hasDeltaSystemTimestamp = false;
bool hasDeltaEpochTimestamp = false;
err = aEventData.GetDeltaSystemTimestamp(&timeStampVal);
if (err == CHIP_END_OF_TLV)
{
err = CHIP_NO_ERROR;
}
else if (err == CHIP_NO_ERROR)
{
aEventHeader.mDeltaTimestamp.mType = Timestamp::Type::kSystem;
aEventHeader.mDeltaTimestamp.mValue = timeStampVal;
hasDeltaSystemTimestamp = true;
}

err = aEventData.GetDeltaEpochTimestamp(&timeStampVal);
if (err == CHIP_END_OF_TLV)
{
err = CHIP_NO_ERROR;
}
else if (err == CHIP_NO_ERROR)
{
aEventHeader.mDeltaTimestamp.mType = Timestamp::Type::kEpoch;
aEventHeader.mDeltaTimestamp.mValue = timeStampVal;
hasDeltaEpochTimestamp = true;
}

err = aEventData.GetSystemTimestamp(&timeStampVal);
if (err == CHIP_END_OF_TLV)
{
err = CHIP_NO_ERROR;
}
else if (err == CHIP_NO_ERROR)
{
aEventHeader.mTimestamp.mType = Timestamp::Type::kSystem;
aEventHeader.mTimestamp.mValue = timeStampVal;
hasSystemTimestamp = true;
}

err = aEventData.GetEpochTimestamp(&timeStampVal);
if (err == CHIP_END_OF_TLV)
{
err = CHIP_NO_ERROR;
}
else if (err == CHIP_NO_ERROR)
{
aEventHeader.mDeltaTimestamp.mType = Timestamp::Type::kEpoch;
aEventHeader.mDeltaTimestamp.mValue = timeStampVal;
hasEpochTimestamp = true;
}

if ((hasSystemTimestamp && !hasEpochTimestamp && !hasDeltaSystemTimestamp && !hasDeltaEpochTimestamp) ||
(!hasSystemTimestamp && hasEpochTimestamp && !hasDeltaSystemTimestamp && !hasDeltaEpochTimestamp) ||
(!hasSystemTimestamp && !hasEpochTimestamp && hasDeltaSystemTimestamp && !hasDeltaEpochTimestamp) ||
(!hasSystemTimestamp && !hasEpochTimestamp && !hasDeltaSystemTimestamp && hasDeltaEpochTimestamp))
{
return CHIP_NO_ERROR;
}
return CHIP_ERROR_IM_MALFORMED_EVENT_DATA_ELEMENT;
}

CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsReader)
{
CHIP_ERROR err = CHIP_NO_ERROR;
while (CHIP_NO_ERROR == (err = aEventReportIBsReader.Next()))
{
TLV::TLVReader dataReader;
EventReportIB::Parser report;
EventDataIB::Parser data;
EventStatusIB::Parser status;
EventPathIB::Parser path;
ClusterInfo clusterInfo;
StatusIB statusIB;
EventHeader header;
uint8_t priorityLevel = 0;

TLV::TLVReader reader = aEventReportIBsReader;
ReturnErrorOnFailure(report.Init(reader));
// EventStatus and EventData would coexist
err = report.GetEventStatus(&status);
ReturnErrorOnFailure(err);
StatusIB::Parser errorStatus;
ReturnErrorOnFailure(status.GetPath(&path));
ReturnErrorOnFailure(ProcessEventPath(path, clusterInfo));
ReturnErrorOnFailure(status.GetErrorStatus(&errorStatus));
ReturnErrorOnFailure(errorStatus.DecodeStatusIB(statusIB));

ReturnErrorOnFailure(report.GetEventData(&data));
ReturnErrorOnFailure(data.GetPath(&path));
ReturnErrorOnFailure(ProcessEventPath(path, clusterInfo));
header.mPath = ConcreteEventPath(clusterInfo.mEndpointId, clusterInfo.mClusterId, clusterInfo.mEventId);
ReturnErrorOnFailure(data.GetEventNumber(&(header.mEventNumber)));
ReturnErrorOnFailure(data.GetPriority(&priorityLevel));
header.mPriorityLevel = static_cast<PriorityLevel>(priorityLevel);
ReturnErrorOnFailure(ProcessEventTimestamp(data, header));
ReturnErrorOnFailure(data.GetData(&dataReader));

mpCallback->OnEventData(this, header, &dataReader, statusIB);
}

if (CHIP_END_OF_TLV == err)
{
err = CHIP_NO_ERROR;
}
return err;
}

CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
{
CancelLivenessCheckTimer();
Expand Down
27 changes: 20 additions & 7 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#pragma once

#include <app/AttributePathParams.h>
#include <app/EventHeader.h>
#include <app/EventPathParams.h>
#include <app/InteractionModelDelegate.h>
#include <app/MessageDef/ReadRequestMessage.h>
Expand Down Expand Up @@ -59,17 +60,26 @@ class ReadClient : public Messaging::ExchangeDelegate
{
public:
virtual ~Callback() = default;

/**
* Notification that a list of events is received on the given read client.
* OnResponse will be called when a report data response has been received and processed for the given path.
*
* The ReadClient object MUST continue to exist after this call is completed.
*
* @param[in] apReadClient The read client which initialized the read transaction.
* @param[in] aEventReports TLV reader positioned at the list that contains the events. The
* implementation of EventStreamReceived is expected to call Next() on the reader to
* advance it to the first element of the list, then process the elements from beginning to
* the end. The callee is expected to consume all events.
* This callback will be called when:
* - Receiving event data as response of Read interactions
* - Receiving event data as reports of subscriptions
* - Receiving event data as initial reports of subscriptions
*
* @param[in] apReadClient: The read client object that initiated the read or subscribe transaction.
* @param[in] aEventHeader: The event header in report response.
* @param[in] apData: The event data of the given path, will be a nullptr if status is not Success.
* @param[in] aStatus: Event-specific status, containing an InteractionModel::Status code as well as an optional
* cluster-specific status code.
*/
virtual void OnEventData(const ReadClient * apReadClient, TLV::TLVReader & aEventReports) {}
virtual void OnEventData(const ReadClient * apReadClient, const EventHeader & aEventHeader, TLV::TLVReader * apData,
const StatusIB & aStatus)
{}

/**
* OnResponse will be called when a report data response has been received and processed for the given path.
Expand Down Expand Up @@ -243,6 +253,7 @@ class ReadClient : public Messaging::ExchangeDelegate
CHIP_ERROR GenerateAttributePathList(AttributePathIBs::Builder & aAttributePathIBsBuilder,
AttributePathParams * apAttributePathParamsList, size_t aAttributePathParamsListSize);
CHIP_ERROR ProcessAttributeReportIBs(TLV::TLVReader & aAttributeDataIBsReader);
CHIP_ERROR ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsReader);

void ClearExchangeContext() { mpExchangeCtx = nullptr; }
static void OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState);
Expand All @@ -251,6 +262,8 @@ class ReadClient : public Messaging::ExchangeDelegate
void CancelLivenessCheckTimer();
void MoveToState(const ClientState aTargetState);
CHIP_ERROR ProcessAttributePath(AttributePathIB::Parser & aAttributePath, ClusterInfo & aClusterInfo);
CHIP_ERROR ProcessEventPath(EventPathIB::Parser & aEventPath, ClusterInfo & aClusterInfo);
CHIP_ERROR ProcessEventTimestamp(EventDataIB::Parser & aEventData, EventHeader & aEventHeader);
CHIP_ERROR ProcessReportData(System::PacketBufferHandle && aPayload);
CHIP_ERROR AbortExistingExchangeContext();
const char * GetStateStr() const;
Expand Down
Loading

0 comments on commit 6485f33

Please sign in to comment.