diff --git a/examples/fabric-admin/BUILD.gn b/examples/fabric-admin/BUILD.gn index e408cdd24f264b..805fda1cb500c7 100644 --- a/examples/fabric-admin/BUILD.gn +++ b/examples/fabric-admin/BUILD.gn @@ -84,6 +84,8 @@ static_library("fabric-admin-utils") { "device_manager/DeviceManager.h", "device_manager/DeviceSubscription.cpp", "device_manager/DeviceSubscription.h", + "device_manager/DeviceSubscriptionManager.cpp", + "device_manager/DeviceSubscriptionManager.h", "device_manager/DeviceSynchronization.cpp", "device_manager/DeviceSynchronization.h", ] diff --git a/examples/fabric-admin/commands/pairing/PairingCommand.cpp b/examples/fabric-admin/commands/pairing/PairingCommand.cpp index 89887ec700c69f..c9b58cd92f83ee 100644 --- a/examples/fabric-admin/commands/pairing/PairingCommand.cpp +++ b/examples/fabric-admin/commands/pairing/PairingCommand.cpp @@ -423,7 +423,9 @@ void PairingCommand::OnCommissioningComplete(NodeId nodeId, CHIP_ERROR err) { // print to console fprintf(stderr, "New device with Node ID: 0x%lx has been successfully added.\n", nodeId); - DeviceSynchronizer::Instance().StartDeviceSynchronization(CurrentCommissioner(), mNodeId, mDeviceIsICD); + // CurrentCommissioner() has a lifetime that is the entire life of the application itself + // so it is safe to provide to StartDeviceSynchronization. + DeviceSynchronizer::Instance().StartDeviceSynchronization(&CurrentCommissioner(), mNodeId, mDeviceIsICD); } else { @@ -564,6 +566,8 @@ void PairingCommand::OnCurrentFabricRemove(void * context, NodeId nodeId, CHIP_E fprintf(stderr, "Device with Node ID: 0x%lx has been successfully removed.\n", nodeId); #if defined(PW_RPC_ENABLED) + chip::app::InteractionModelEngine::GetInstance()->ShutdownSubscriptions(command->CurrentCommissioner().GetFabricIndex(), + nodeId); RemoveSynchronizedDevice(nodeId); #endif } diff --git a/examples/fabric-admin/device_manager/DeviceSubscription.cpp b/examples/fabric-admin/device_manager/DeviceSubscription.cpp index 73f90982b9c4e6..eabbdda15dea15 100644 --- a/examples/fabric-admin/device_manager/DeviceSubscription.cpp +++ b/examples/fabric-admin/device_manager/DeviceSubscription.cpp @@ -100,7 +100,7 @@ void DeviceSubscription::OnReportEnd() #if defined(PW_RPC_ENABLED) AdminCommissioningAttributeChanged(mCurrentAdministratorCommissioningAttributes); #else - ChipLogError(NotSpecified, "Cannot synchronize device with fabric bridge: RPC not enabled"); + ChipLogError(NotSpecified, "Cannot forward Administrator Commissioning Attribute to fabric bridge: RPC not enabled"); #endif mChangeDetected = false; } @@ -108,7 +108,10 @@ void DeviceSubscription::OnReportEnd() void DeviceSubscription::OnDone(ReadClient * apReadClient) { - // TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal. + // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with + // DeviceSubscription. + MoveToState(State::AwaitingDestruction); + mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id); } void DeviceSubscription::OnError(CHIP_ERROR error) @@ -118,6 +121,15 @@ void DeviceSubscription::OnError(CHIP_ERROR error) void DeviceSubscription::OnDeviceConnected(Messaging::ExchangeManager & exchangeMgr, const SessionHandle & sessionHandle) { + if (mState == State::Stopping) + { + // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with + // DeviceSubscription. + MoveToState(State::AwaitingDestruction); + mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id); + return; + } + VerifyOrDie(mState == State::Connecting); mClient = std::make_unique(app::InteractionModelEngine::GetInstance(), &exchangeMgr /* echangeMgr */, *this /* callback */, ReadClient::InteractionType::Subscribe); VerifyOrDie(mClient); @@ -136,25 +148,95 @@ void DeviceSubscription::OnDeviceConnected(Messaging::ExchangeManager & exchange if (err != CHIP_NO_ERROR) { ChipLogError(NotSpecified, "Failed to issue subscription to AdministratorCommissioning data"); - // TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal. + // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with + // DeviceSubscription. + MoveToState(State::AwaitingDestruction); + mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id); + return; } + MoveToState(State::SubscriptionStarted); +} + +void DeviceSubscription::MoveToState(const State aTargetState) +{ + mState = aTargetState; + ChipLogDetail(NotSpecified, "DeviceSubscription moving to [%10.10s]", GetStateStr()); +} + +const char * DeviceSubscription::GetStateStr() const +{ + switch (mState) + { + case State::Idle: + return "Idle"; + + case State::Connecting: + return "Connecting"; + + case State::Stopping: + return "Stopping"; + + case State::SubscriptionStarted: + return "SubscriptionStarted"; + + case State::AwaitingDestruction: + return "AwaitingDestruction"; + } + return "N/A"; } void DeviceSubscription::OnDeviceConnectionFailure(const ScopedNodeId & peerId, CHIP_ERROR error) { - ChipLogError(NotSpecified, "Device Sync failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId())); - // TODO(#35077) In follow up PR we will indicate to a manager DeviceSubscription is terminal. + VerifyOrDie(mState == State::Connecting || mState == State::Stopping); + ChipLogError(NotSpecified, "DeviceSubscription failed to connect to " ChipLogFormatX64, ChipLogValueX64(peerId.GetNodeId())); + // TODO(#35333) Figure out how we should recover if we fail to connect and mState == State::Connecting. + + // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with + // DeviceSubscription. + MoveToState(State::AwaitingDestruction); + mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id); } -void DeviceSubscription::StartSubscription(Controller::DeviceController & controller, NodeId nodeId) +CHIP_ERROR DeviceSubscription::StartSubscription(OnDoneCallback onDoneCallback, Controller::DeviceController & controller, + NodeId nodeId) { - VerifyOrDie(!mSubscriptionStarted); + assertChipStackLockedByCurrentThread(); + VerifyOrDie(mState == State::Idle); mCurrentAdministratorCommissioningAttributes = chip_rpc_AdministratorCommissioningChanged_init_default; mCurrentAdministratorCommissioningAttributes.node_id = nodeId; mCurrentAdministratorCommissioningAttributes.window_status = static_cast(Clusters::AdministratorCommissioning::CommissioningWindowStatusEnum::kWindowNotOpen); - mSubscriptionStarted = true; + mState = State::Connecting; + mOnDoneCallback = onDoneCallback; + + return controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback); +} + +void DeviceSubscription::StopSubscription() +{ + assertChipStackLockedByCurrentThread(); + VerifyOrDie(mState != State::Idle); + // Something is seriously wrong if we die on the line below + VerifyOrDie(mState != State::AwaitingDestruction); + + if (mState == State::Stopping) + { + // Stop is called again while we are still waiting on connected callbacks + return; + } + + if (mState == State::Connecting) + { + MoveToState(State::Stopping); + return; + } - controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback); + // By calling reset on our ReadClient we terminate the subscription. + VerifyOrDie(mClient); + mClient.reset(); + // After calling mOnDoneCallback we are indicating that `this` is deleted and we shouldn't do anything else with + // DeviceSubscription. + MoveToState(State::AwaitingDestruction); + mOnDoneCallback(mCurrentAdministratorCommissioningAttributes.node_id); } diff --git a/examples/fabric-admin/device_manager/DeviceSubscription.h b/examples/fabric-admin/device_manager/DeviceSubscription.h index 7a9e5041b28d8b..353eb30c1c7158 100644 --- a/examples/fabric-admin/device_manager/DeviceSubscription.h +++ b/examples/fabric-admin/device_manager/DeviceSubscription.h @@ -26,6 +26,8 @@ #include "fabric_bridge_service/fabric_bridge_service.pb.h" #include "fabric_bridge_service/fabric_bridge_service.rpc.pb.h" +class DeviceSubscriptionManager; + /// Attribute subscription to attributes that are important to keep track and send to fabric-bridge /// via RPC when change has been identified. /// @@ -35,11 +37,18 @@ class DeviceSubscription : public chip::app::ReadClient::Callback { public: + using OnDoneCallback = std::function; + DeviceSubscription(); - /// Usually called after we have added a synchronized device to fabric-bridge to monitor - /// for any changes that need to be propgated to fabric-bridge. - void StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId); + CHIP_ERROR StartSubscription(OnDoneCallback onDoneCallback, chip::Controller::DeviceController & controller, + chip::NodeId nodeId); + + /// This will trigger stopping the subscription. Once subscription is stopped the OnDoneCallback + /// provided in StartSubscription will be called to indicate that subscription have been terminated. + /// + /// Must only be called after StartSubscription was successfully called. + void StopSubscription(); /////////////////////////////////////////////////////////////// // ReadClient::Callback implementation @@ -57,6 +66,19 @@ class DeviceSubscription : public chip::app::ReadClient::Callback void OnDeviceConnectionFailure(const chip::ScopedNodeId & peerId, CHIP_ERROR error); private: + enum class State : uint8_t + { + Idle, ///< Default state that the object starts out in, where no work has commenced + Connecting, ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks to be called + Stopping, ///< We are waiting for OnDeviceConnected or OnDeviceConnectionFailure callbacks so we can terminate + SubscriptionStarted, ///< We have started a subscription. + AwaitingDestruction, ///< The object has completed its work and is awaiting destruction. + }; + + void MoveToState(const State aTargetState); + const char * GetStateStr() const; + + OnDoneCallback mOnDoneCallback; std::unique_ptr mClient; chip::Callback::Callback mOnDeviceConnectedCallback; @@ -64,7 +86,5 @@ class DeviceSubscription : public chip::app::ReadClient::Callback chip_rpc_AdministratorCommissioningChanged mCurrentAdministratorCommissioningAttributes; bool mChangeDetected = false; - // Ensures that DeviceSubscription starts a subscription only once. If instance of - // DeviceSubscription can be reused, the class documentation should be updated accordingly. - bool mSubscriptionStarted = false; + State mState = State::Idle; }; diff --git a/examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp new file mode 100644 index 00000000000000..c59fd98192cd47 --- /dev/null +++ b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.cpp @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024 Project CHIP Authors + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "DeviceSubscriptionManager.h" +#include "rpc/RpcClient.h" + +#include +#include + +#include +#include +#include + +using namespace ::chip; +using namespace ::chip::app; + +DeviceSubscriptionManager & DeviceSubscriptionManager::Instance() +{ + static DeviceSubscriptionManager instance; + return instance; +} + +CHIP_ERROR DeviceSubscriptionManager::StartSubscription(Controller::DeviceController & controller, NodeId nodeId) +{ + assertChipStackLockedByCurrentThread(); + auto it = mDeviceSubscriptionMap.find(nodeId); + VerifyOrReturnError((it == mDeviceSubscriptionMap.end()), CHIP_ERROR_INCORRECT_STATE); + + auto deviceSubscription = std::make_unique(); + VerifyOrReturnError(deviceSubscription, CHIP_ERROR_NO_MEMORY); + ReturnErrorOnFailure(deviceSubscription->StartSubscription( + [this](NodeId aNodeId) { this->DeviceSubscriptionTerminated(aNodeId); }, controller, nodeId)); + + mDeviceSubscriptionMap[nodeId] = std::move(deviceSubscription); + return CHIP_NO_ERROR; +} + +CHIP_ERROR DeviceSubscriptionManager::RemoveSubscription(chip::NodeId nodeId) +{ + assertChipStackLockedByCurrentThread(); + auto it = mDeviceSubscriptionMap.find(nodeId); + VerifyOrReturnError((it != mDeviceSubscriptionMap.end()), CHIP_ERROR_NOT_FOUND); + // We cannot safely erase the DeviceSubscription from mDeviceSubscriptionMap. + // After calling StopSubscription we expect DeviceSubscription to eventually + // call the OnDoneCallback we provided in StartSubscription which will call + // DeviceSubscriptionTerminated where it will be erased from the + // mDeviceSubscriptionMap. + it->second->StopSubscription(); + return CHIP_NO_ERROR; +} + +void DeviceSubscriptionManager::DeviceSubscriptionTerminated(NodeId nodeId) +{ + assertChipStackLockedByCurrentThread(); + auto it = mDeviceSubscriptionMap.find(nodeId); + // DeviceSubscriptionTerminated is a private method that is expected to only + // be called by DeviceSubscription when it is terminal and is ready to be + // cleaned up and removed. If it is not mapped that means something has gone + // really wrong and there is likely a memory leak somewhere. + VerifyOrDie(it != mDeviceSubscriptionMap.end()); + mDeviceSubscriptionMap.erase(nodeId); +} diff --git a/examples/fabric-admin/device_manager/DeviceSubscriptionManager.h b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.h new file mode 100644 index 00000000000000..5f4e1158634a29 --- /dev/null +++ b/examples/fabric-admin/device_manager/DeviceSubscriptionManager.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2024 Project CHIP Authors + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include "DeviceSubscription.h" + +#include +#include +#include + +#include + +class DeviceSubscriptionManager +{ +public: + static DeviceSubscriptionManager & Instance(); + + /// Usually called after we have added a synchronized device to fabric-bridge to monitor + /// for any changes that need to be propagated to fabric-bridge. + CHIP_ERROR StartSubscription(chip::Controller::DeviceController & controller, chip::NodeId nodeId); + + CHIP_ERROR RemoveSubscription(chip::NodeId nodeId); + +private: + void DeviceSubscriptionTerminated(chip::NodeId nodeId); + + std::unordered_map> mDeviceSubscriptionMap; +}; diff --git a/examples/fabric-admin/device_manager/DeviceSynchronization.cpp b/examples/fabric-admin/device_manager/DeviceSynchronization.cpp index 3239f89046fb6e..adbe0c0213448d 100644 --- a/examples/fabric-admin/device_manager/DeviceSynchronization.cpp +++ b/examples/fabric-admin/device_manager/DeviceSynchronization.cpp @@ -17,6 +17,8 @@ */ #include "DeviceSynchronization.h" + +#include "DeviceSubscriptionManager.h" #include "rpc/RpcClient.h" #include @@ -72,13 +74,6 @@ void DeviceSynchronizer::OnAttributeData(const ConcreteDataAttributePath & path, VerifyOrDie(path.mEndpointId == kRootEndpointId); VerifyOrDie(path.mClusterId == Clusters::BasicInformation::Id); - CHIP_ERROR error = status.ToChipError(); - if (CHIP_NO_ERROR != error) - { - ChipLogError(NotSpecified, "Response Failure: %" CHIP_ERROR_FORMAT, error.Format()); - return; - } - switch (path.mAttributeId) { case Clusters::BasicInformation::Attributes::UniqueID::Id: @@ -131,6 +126,17 @@ void DeviceSynchronizer::OnReportEnd() if (!DeviceMgr().IsCurrentBridgeDevice(mCurrentDeviceData.node_id)) { AddSynchronizedDevice(mCurrentDeviceData); + // TODO(#35077) Figure out how we should reflect CADMIN values of ICD. + if (!mCurrentDeviceData.is_icd) + { + VerifyOrDie(mController); + // TODO(#35333) Figure out how we should recover in this circumstance. + CHIP_ERROR err = DeviceSubscriptionManager::Instance().StartSubscription(*mController, mCurrentDeviceData.node_id); + if (err != CHIP_NO_ERROR) + { + ChipLogError(NotSpecified, "Failed start subscription to "); + } + } } #else ChipLogError(NotSpecified, "Cannot synchronize device with fabric bridge: RPC not enabled"); @@ -178,9 +184,10 @@ void DeviceSynchronizer::OnDeviceConnectionFailure(const chip::ScopedNodeId & pe mDeviceSyncInProcess = false; } -void DeviceSynchronizer::StartDeviceSynchronization(chip::Controller::DeviceController & controller, chip::NodeId nodeId, +void DeviceSynchronizer::StartDeviceSynchronization(chip::Controller::DeviceController * controller, chip::NodeId nodeId, bool deviceIsIcd) { + VerifyOrDie(controller); if (mDeviceSyncInProcess) { ChipLogError(NotSpecified, "Device Sync NOT POSSIBLE: another sync is in progress"); @@ -194,5 +201,6 @@ void DeviceSynchronizer::StartDeviceSynchronization(chip::Controller::DeviceCont mDeviceSyncInProcess = true; - controller.GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback); + mController = controller; + controller->GetConnectedDevice(nodeId, &mOnDeviceConnectedCallback, &mOnDeviceConnectionFailureCallback); } diff --git a/examples/fabric-admin/device_manager/DeviceSynchronization.h b/examples/fabric-admin/device_manager/DeviceSynchronization.h index 6aca23f31cbeaa..c5a42378b8be9d 100644 --- a/examples/fabric-admin/device_manager/DeviceSynchronization.h +++ b/examples/fabric-admin/device_manager/DeviceSynchronization.h @@ -40,7 +40,12 @@ class DeviceSynchronizer : public chip::app::ReadClient::Callback /// Usually called after commissioning is complete, initiates a /// read of required data from the remote node ID and then will synchronize /// the device towards the fabric bridge - void StartDeviceSynchronization(chip::Controller::DeviceController & controller, chip::NodeId nodeId, bool deviceIsIcd); + /// + /// @param controller Must be a non-null pointer. The DeviceController instance + /// pointed to must out live the entire device synchronization process. + /// @param nodeId Node ID of the device we need to syncronize data from. + /// @param deviceIsIcd If the device is an ICD device. + void StartDeviceSynchronization(chip::Controller::DeviceController * controller, chip::NodeId nodeId, bool deviceIsIcd); /////////////////////////////////////////////////////////////// // ReadClient::Callback implementation @@ -65,6 +70,9 @@ class DeviceSynchronizer : public chip::app::ReadClient::Callback chip::Callback::Callback mOnDeviceConnectedCallback; chip::Callback::Callback mOnDeviceConnectionFailureCallback; - bool mDeviceSyncInProcess = false; - chip_rpc_SynchronizedDevice mCurrentDeviceData = chip_rpc_SynchronizedDevice_init_default; + // mController is expected to remain valid throughout the entire device synchronization process (i.e. when + // mDeviceSyncInProcess is true). + chip::Controller::DeviceController * mController = nullptr; + bool mDeviceSyncInProcess = false; + chip_rpc_SynchronizedDevice mCurrentDeviceData = chip_rpc_SynchronizedDevice_init_default; };