Skip to content

Commit

Permalink
Add DeviceSubscriptionManager to manage subscription of fabric-admin (#…
Browse files Browse the repository at this point in the history
…35305)

---------

Co-authored-by: Restyled.io <[email protected]>
Co-authored-by: Andrei Litvin <[email protected]>
Co-authored-by: saurabhst <[email protected]>
  • Loading branch information
4 people authored and pull[bot] committed Oct 18, 2024
1 parent 485ef55 commit 2708529
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 28 deletions.
2 changes: 2 additions & 0 deletions examples/fabric-admin/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ static_library("fabric-admin-utils") {
"device_manager/DeviceManager.h",
"device_manager/DeviceSubscription.cpp",
"device_manager/DeviceSubscription.h",
"device_manager/DeviceSubscriptionManager.cpp",
"device_manager/DeviceSubscriptionManager.h",
"device_manager/DeviceSynchronization.cpp",
"device_manager/DeviceSynchronization.h",
]
Expand Down
6 changes: 5 additions & 1 deletion examples/fabric-admin/commands/pairing/PairingCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,9 @@ void PairingCommand::OnCommissioningComplete(NodeId nodeId, CHIP_ERROR err)
{
// print to console
fprintf(stderr, "New device with Node ID: 0x%lx has been successfully added.\n", nodeId);
DeviceSynchronizer::Instance().StartDeviceSynchronization(CurrentCommissioner(), mNodeId, mDeviceIsICD);
// CurrentCommissioner() has a lifetime that is the entire life of the application itself
// so it is safe to provide to StartDeviceSynchronization.
DeviceSynchronizer::Instance().StartDeviceSynchronization(&CurrentCommissioner(), mNodeId, mDeviceIsICD);
}
else
{
Expand Down Expand Up @@ -564,6 +566,8 @@ void PairingCommand::OnCurrentFabricRemove(void * context, NodeId nodeId, CHIP_E
fprintf(stderr, "Device with Node ID: 0x%lx has been successfully removed.\n", nodeId);

#if defined(PW_RPC_ENABLED)
chip::app::InteractionModelEngine::GetInstance()->ShutdownSubscriptions(command->CurrentCommissioner().GetFabricIndex(),
nodeId);
RemoveSynchronizedDevice(nodeId);
#endif
}
Expand Down
100 changes: 91 additions & 9 deletions examples/fabric-admin/device_manager/DeviceSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,18 @@ void DeviceSubscription::OnReportEnd()
#if defined(PW_RPC_ENABLED)
AdminCommissioningAttributeChanged(mCurrentAdministratorCommissioningAttributes);
#else
ChipLogError(NotSpecified, "Cannot synchronize device with fabric bridge: RPC not enabled");
ChipLogError(NotSpecified, "Cannot forward Administrator Commissioning Attribute to fabric bridge: RPC not enabled");
#endif
mChangeDetected = false;
}
}

void DeviceSubscription::OnDone(ReadClient * apReadClient)
{
// TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
// DeviceSubscription.
MoveToState(State::AwaitingDestruction);
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
}

void DeviceSubscription::OnError(CHIP_ERROR error)
Expand All @@ -118,6 +121,15 @@ void DeviceSubscription::OnError(CHIP_ERROR error)

void DeviceSubscription::OnDeviceConnected(Messaging::ExchangeManager & exchangeMgr, const SessionHandle & sessionHandle)
{
if (mState == State::Stopping)
{
// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
// DeviceSubscription.
MoveToState(State::AwaitingDestruction);
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
return;
}
VerifyOrDie(mState == State::Connecting);
mClient = std::make_unique<ReadClient>(app::InteractionModelEngine::GetInstance(), &exchangeMgr /* echangeMgr */,
*this /* callback */, ReadClient::InteractionType::Subscribe);
VerifyOrDie(mClient);
Expand All @@ -136,25 +148,95 @@ void DeviceSubscription::OnDeviceConnected(Messaging::ExchangeManager & exchange
if (err != CHIP_NO_ERROR)
{
ChipLogError(NotSpecified, "Failed to issue subscription to AdministratorCommissioning data");
// TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
// DeviceSubscription.
MoveToState(State::AwaitingDestruction);
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
return;
}
MoveToState(State::SubscriptionStarted);
}

void DeviceSubscription::MoveToState(const State aTargetState)
{
mState = aTargetState;
ChipLogDetail(NotSpecified, "DeviceSubscription moving to [%10.10s]", GetStateStr());
}

const char * DeviceSubscription::GetStateStr() const
{
switch (mState)
{
case State::Idle:
return "Idle";

case State::Connecting:
return "Connecting";

case State::Stopping:
return "Stopping";

case State::SubscriptionStarted:
return "SubscriptionStarted";

case State::AwaitingDestruction:
return "AwaitingDestruction";
}
return "N/A";
}

void DeviceSubscription::OnDeviceConnectionFailure(const ScopedNodeId & peerId, CHIP_ERROR error)
{
ChipLogError(NotSpecified, "Device Sync failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId()));
// TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal.
VerifyOrDie(mState == State::Connecting || mState == State::Stopping);
ChipLogError(NotSpecified, "DeviceSubscription failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId()));
// TODO(#35333) Figure out how we should recover if we fail to connect and mState == State::Connecting.

// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
// DeviceSubscription.
MoveToState(State::AwaitingDestruction);
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
}

void DeviceSubscription::StartSubscription(Controller::DeviceController & controller, NodeId nodeId)
CHIP_ERROR DeviceSubscription::StartSubscription(OnDoneCallback onDoneCallback, Controller::DeviceController & controller,
NodeId nodeId)
{
VerifyOrDie(!mSubscriptionStarted);
assertChipStackLockedByCurrentThread();
VerifyOrDie(mState == State::Idle);

mCurrentAdministratorCommissioningAttributes = chip_rpc_AdministratorCommissioningChanged_init_default;
mCurrentAdministratorCommissioningAttributes.node_id = nodeId;
mCurrentAdministratorCommissioningAttributes.window_status =
static_cast<uint32_t>(Clusters::AdministratorCommissioning::CommissioningWindowStatusEnum::kWindowNotOpen);
mSubscriptionStarted = true;
mState = State::Connecting;
mOnDoneCallback = onDoneCallback;

return controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
}

void DeviceSubscription::StopSubscription()
{
assertChipStackLockedByCurrentThread();
VerifyOrDie(mState != State::Idle);
// Something is seriously wrong if we die on the line below
VerifyOrDie(mState != State::AwaitingDestruction);

if (mState == State::Stopping)
{
// Stop is called again while we are still waiting on connected callbacks
return;
}

if (mState == State::Connecting)
{
MoveToState(State::Stopping);
return;
}

controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback);
// By calling reset on our ReadClient we terminate the subscription.
VerifyOrDie(mClient);
mClient.reset();
// After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with
// DeviceSubscription.
MoveToState(State::AwaitingDestruction);
mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id);
}
32 changes: 26 additions & 6 deletions examples/fabric-admin/device_manager/DeviceSubscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "fabric_bridge_service/fabric_bridge_service.pb.h"
#include "fabric_bridge_service/fabric_bridge_service.rpc.pb.h"

class DeviceSubscriptionManager;

/// Attribute subscription to attributes that are important to keep track and send to fabric-bridge
/// via RPC when change has been identified.
///
Expand All @@ -35,11 +37,18 @@
class DeviceSubscription : public chip::app::ReadClient::Callback
{
public:
using OnDoneCallback = std::function<void(chip::NodeId)>;

DeviceSubscription();

/// Usually called after we have added a synchronized device to fabric-bridge to monitor
/// for any changes that need to be propgated to fabric-bridge.
void StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId);
CHIP_ERROR StartSubscription(OnDoneCallback onDoneCallback, chip::Controller::DeviceController & controller,
chip::NodeId nodeId);

/// This will trigger stopping the subscription. Once subscription is stopped the OnDoneCallback
/// provided in StartSubscription will be called to indicate that subscription have been terminated.
///
/// Must only be called after StartSubscription was successfully called.
void StopSubscription();

///////////////////////////////////////////////////////////////
// ReadClient::Callback implementation
Expand All @@ -57,14 +66,25 @@ class DeviceSubscription : public chip::app::ReadClient::Callback
void OnDeviceConnectionFailure(const chip::ScopedNodeId & peerId, CHIP_ERROR error);

private:
enum class State : uint8_t
{
Idle, ///< Default state that the object starts out in, where no work has commenced
Connecting, ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks to be called
Stopping, ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks so we can terminate
SubscriptionStarted, ///< We have started a subscription.
AwaitingDestruction, ///< The object has completed its work and is awaiting destruction.
};

void MoveToState(const State aTargetState);
const char * GetStateStr() const;

OnDoneCallback mOnDoneCallback;
std::unique_ptr<chip::app::ReadClient> mClient;

chip::Callback::Callback<chip::OnDeviceConnected> mOnDeviceConnectedCallback;
chip::Callback::Callback<chip::OnDeviceConnectionFailure> mOnDeviceConnectionFailureCallback;

chip_rpc_AdministratorCommissioningChanged mCurrentAdministratorCommissioningAttributes;
bool mChangeDetected = false;
// Ensures that DeviceSubscription starts a subscription only once. If instance of
// DeviceSubscription can be reused, the class documentation should be updated accordingly.
bool mSubscriptionStarted = false;
State mState = State::Idle;
};
77 changes: 77 additions & 0 deletions examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2024 Project CHIP Authors
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

#include "DeviceSubscriptionManager.h"
#include "rpc/RpcClient.h"

#include <app/InteractionModelEngine.h>
#include <app/server/Server.h>

#include <app-common/zap-generated/ids/Attributes.h>
#include <app-common/zap-generated/ids/Clusters.h>
#include <device_manager/DeviceManager.h>

using namespace ::chip;
using namespace ::chip::app;

DeviceSubscriptionManager & DeviceSubscriptionManager::Instance()
{
static DeviceSubscriptionManager instance;
return instance;
}

CHIP_ERROR DeviceSubscriptionManager::StartSubscription(Controller::DeviceController & controller, NodeId nodeId)
{
assertChipStackLockedByCurrentThread();
auto it = mDeviceSubscriptionMap.find(nodeId);
VerifyOrReturnError((it == mDeviceSubscriptionMap.end()), CHIP_ERROR_INCORRECT_STATE);

auto deviceSubscription = std::make_unique<DeviceSubscription>();
VerifyOrReturnError(deviceSubscription, CHIP_ERROR_NO_MEMORY);
ReturnErrorOnFailure(deviceSubscription->StartSubscription(
[this](NodeId aNodeId) { this->DeviceSubscriptionTerminated(aNodeId); }, controller, nodeId));

mDeviceSubscriptionMap[nodeId] = std::move(deviceSubscription);
return CHIP_NO_ERROR;
}

CHIP_ERROR DeviceSubscriptionManager::RemoveSubscription(chip::NodeId nodeId)
{
assertChipStackLockedByCurrentThread();
auto it = mDeviceSubscriptionMap.find(nodeId);
VerifyOrReturnError((it != mDeviceSubscriptionMap.end()), CHIP_ERROR_NOT_FOUND);
// We cannot safely erase the DeviceSubscription from mDeviceSubscriptionMap.
// After calling StopSubscription we expect DeviceSubscription to eventually
// call the OnDoneCallback we provided in StartSubscription which will call
// DeviceSubscriptionTerminated where it will be erased from the
// mDeviceSubscriptionMap.
it->second->StopSubscription();
return CHIP_NO_ERROR;
}

void DeviceSubscriptionManager::DeviceSubscriptionTerminated(NodeId nodeId)
{
assertChipStackLockedByCurrentThread();
auto it = mDeviceSubscriptionMap.find(nodeId);
// DeviceSubscriptionTerminated is a private method that is expected to only
// be called by DeviceSubscription when it is terminal and is ready to be
// cleaned up and removed. If it is not mapped that means something has gone
// really wrong and there is likely a memory leak somewhere.
VerifyOrDie(it != mDeviceSubscriptionMap.end());
mDeviceSubscriptionMap.erase(nodeId);
}
43 changes: 43 additions & 0 deletions examples/fabric-admin/device_manager/DeviceSubscriptionManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2024 Project CHIP Authors
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#pragma once

#include "DeviceSubscription.h"

#include <app/ReadClient.h>
#include <controller/CHIPDeviceController.h>
#include <lib/core/DataModelTypes.h>

#include <memory>

class DeviceSubscriptionManager
{
public:
static DeviceSubscriptionManager & Instance();

/// Usually called after we have added a synchronized device to fabric-bridge to monitor
/// for any changes that need to be propagated to fabric-bridge.
CHIP_ERROR StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId);

CHIP_ERROR RemoveSubscription(chip::NodeId nodeId);

private:
void DeviceSubscriptionTerminated(chip::NodeId nodeId);

std::unordered_map<chip::NodeId, std::unique_ptr<DeviceSubscription>> mDeviceSubscriptionMap;
};
Loading

0 comments on commit 2708529

Please sign in to comment.