Skip to content

Commit

Permalink
GEODE-9753: Solve PDX serialization coredump
Browse files Browse the repository at this point in the history
 - While serializing a PdxSerializable object, there is a possible race
   condition which might cause the client to crash.
   This race-condition happens whenever the cluster is restarted during
   the serialization process and if
   on-client-disconnect-clear-pdxType-Ids is set to true, meaning the
   PdxTypeRegistry will be cleaned up if the connection towards the
   cluster is lost.
 - This issue has been solved by using the previously fetched local PDX
   type.
 - In order to properly test this solution, PdxRemoteWriterFactory has been added,
   so the race-condition can be force at test-time.
 - A new IT has been added to test that the solution is working fine.
 - make_unique was needed inside cppcache/src, so it was moved to
   utils/cxx_extensions.hpp
  • Loading branch information
gaussianrecurrence committed Nov 8, 2021
1 parent 5a1ba8d commit b010059
Show file tree
Hide file tree
Showing 19 changed files with 407 additions and 66 deletions.
1 change: 1 addition & 0 deletions cppcache/integration/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ add_executable(cpp-integration-test
PdxInstanceFactoryTest.cpp
PdxInstanceTest.cpp
PdxJsonTypeTest.cpp
PdxSerializableTest.cpp
PdxSerializerTest.cpp
PdxTypeRegistryTest.cpp
Position.cpp
Expand Down
9 changes: 5 additions & 4 deletions cppcache/integration/test/HARegionCacheListenerARLEPDTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <geode/RegionShortcut.hpp>

#include "mock/CacheListenerMock.hpp"
#include "utility/make_unique.hpp"
#include "util/cxx_extensions.hpp"

namespace {
class HARegionCacheListenerARLEPDTest : public ::testing::Test {
Expand Down Expand Up @@ -162,12 +162,13 @@ class HARegionCacheListenerARLEPDTest : public ::testing::Test {
listener_ =
::std::make_shared<::apache::geode::client::Nice_MockListener>();

cluster_ = ::make_unique<::Cluster>(Name{"HARegionCacheListenerARLEPDTest"},
LocatorCount{1}, ServerCount{1});
cluster_ =
cxx::make_unique<::Cluster>(Name{"HARegionCacheListenerARLEPDTest"},
LocatorCount{1}, ServerCount{1});

cluster_->start();

cache_ = ::make_unique<::apache::geode::client::Cache>(
cache_ = cxx::make_unique<::apache::geode::client::Cache>(
cluster_->createCache({}, ::Cluster::SubscriptionState::Enabled));

region_ = cache_
Expand Down
8 changes: 4 additions & 4 deletions cppcache/integration/test/HARegionCacheListenerARLTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <geode/RegionShortcut.hpp>

#include "mock/CacheListenerMock.hpp"
#include "utility/make_unique.hpp"
#include "util/cxx_extensions.hpp"

namespace {
class HARegionCacheListenerARLTest : public ::testing::Test {
Expand Down Expand Up @@ -154,12 +154,12 @@ class HARegionCacheListenerARLTest : public ::testing::Test {
listener_ =
::std::make_shared<::apache::geode::client::Nice_MockListener>();

cluster_ = ::make_unique<::Cluster>(Name{"HARegionCacheListenerARLTest"},
LocatorCount{1}, ServerCount{1});
cluster_ = cxx::make_unique<::Cluster>(Name{"HARegionCacheListenerARLTest"},
LocatorCount{1}, ServerCount{1});

cluster_->start();

cache_ = make_unique<::apache::geode::client::Cache>(
cache_ = cxx::make_unique<::apache::geode::client::Cache>(
cluster_->createCache({}, ::Cluster::SubscriptionState::Enabled));

region_ = cache_
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <geode/RegionShortcut.hpp>

#include "mock/CacheListenerMock.hpp"
#include "utility/make_unique.hpp"
#include "util/cxx_extensions.hpp"

namespace {
// A simple comparator for cachables wrapped in shared pointers.
Expand Down Expand Up @@ -166,12 +166,12 @@ class HARegionCacheListenerKeyValueTest : public ::testing::Test {
::std::make_shared<::apache::geode::client::Nice_MockListener>();

cluster_ =
::make_unique<::Cluster>(Name{"HARegionCacheListenerKeyValueTest"},
LocatorCount{1}, ServerCount{1});
cxx::make_unique<::Cluster>(Name{"HARegionCacheListenerKeyValueTest"},
LocatorCount{1}, ServerCount{1});

cluster_->start();

cache_ = make_unique<::apache::geode::client::Cache>(
cache_ = cxx::make_unique<::apache::geode::client::Cache>(
cluster_->createCache({}, ::Cluster::SubscriptionState::Enabled));

region_ = cache_
Expand Down
8 changes: 4 additions & 4 deletions cppcache/integration/test/HARegionCacheListenerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <geode/RegionShortcut.hpp>

#include "mock/CacheListenerMock.hpp"
#include "utility/make_unique.hpp"
#include "util/cxx_extensions.hpp"

namespace {
class HARegionCacheListenerTest : public ::testing::Test {
Expand Down Expand Up @@ -198,12 +198,12 @@ class HARegionCacheListenerTest : public ::testing::Test {
std::cout << "afterRegionDisconnected callback.\n";
});
*/
cluster_ = ::make_unique<::Cluster>(Name{"HARegionCacheListenerTest"},
LocatorCount{1}, ServerCount{1});
cluster_ = cxx::make_unique<::Cluster>(Name{"HARegionCacheListenerTest"},
LocatorCount{1}, ServerCount{1});

cluster_->start();

cache_ = make_unique<::apache::geode::client::Cache>(
cache_ = cxx::make_unique<::apache::geode::client::Cache>(
cluster_->createCache({}, ::Cluster::SubscriptionState::Enabled));

region_ = cache_
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include <geode/RegionShortcut.hpp>

#include "mock/CacheListenerMock.hpp"
#include "utility/make_unique.hpp"
#include "util/cxx_extensions.hpp"

namespace {
// A simple comparator for cachables wrapped in shared pointers.
Expand Down Expand Up @@ -163,13 +163,13 @@ class HARegionCacheListenerWithClusterRegionTest : public ::testing::Test {
listener_ =
::std::make_shared<::apache::geode::client::Nice_MockListener>();

cluster_ = ::make_unique<::Cluster>(
cluster_ = cxx::make_unique<::Cluster>(
Name{"HARegionCacheListenerWithClusterRegionTest"}, LocatorCount{1},
ServerCount{1});

cluster_->start();

cache_ = ::make_unique<::apache::geode::client::Cache>(
cache_ = cxx::make_unique<::apache::geode::client::Cache>(
cluster_->createCache({}, ::Cluster::SubscriptionState::Enabled));

region_ = cache_
Expand Down
4 changes: 2 additions & 2 deletions cppcache/integration/test/Order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <geode/PdxReader.hpp>
#include <geode/PdxWriter.hpp>

namespace WanDeserialization {
namespace PdxTests {

void Order::fromData(PdxReader& pdxReader) {
order_id_ = pdxReader.readInt(ORDER_ID_KEY_);
Expand Down Expand Up @@ -64,4 +64,4 @@ const std::string Order::ORDER_ID_KEY_ = "order_id";
const std::string Order::NAME_KEY_ = "name";
const std::string Order::QUANTITY_KEY_ = "quantity";

} // namespace WanDeserialization
} // namespace PdxTests
4 changes: 2 additions & 2 deletions cppcache/integration/test/Order.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

#include <geode/PdxSerializable.hpp>

namespace WanDeserialization {
namespace PdxTests {

using apache::geode::client::PdxReader;
using apache::geode::client::PdxSerializable;
Expand Down Expand Up @@ -69,6 +69,6 @@ class Order : public PdxSerializable {
int16_t quantity_;
};

} // namespace WanDeserialization
} // namespace PdxTests

#endif // ORDER_H
183 changes: 183 additions & 0 deletions cppcache/integration/test/PdxSerializableTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <framework/Cluster.h>
#include <gmock/gmock.h>

#include <memory>

#include <gtest/gtest.h>

#include <geode/Cache.hpp>
#include <geode/EntryEvent.hpp>
#include <geode/PdxWrapper.hpp>
#include <geode/PoolManager.hpp>
#include <geode/RegionEvent.hpp>
#include <geode/RegionFactory.hpp>
#include <geode/RegionShortcut.hpp>
#include <geode/TypeRegistry.hpp>

#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
#include "Order.hpp"
#include "PdxRemoteWriter.hpp"
#include "PdxRemoteWriterFactoryImpl.hpp"
#include "gmock_actions.hpp"
#include "mock/CacheListenerMock.hpp"
#include "util/concurrent/binary_semaphore.hpp"
#include "util/cxx_extensions.hpp"

namespace {

using apache::geode::client::binary_semaphore;

using apache::geode::client::Cache;
using apache::geode::client::Cacheable;
using apache::geode::client::CacheFactory;
using apache::geode::client::CacheListenerMock;
using apache::geode::client::CacheRegionHelper;
using apache::geode::client::DataOutput;
using apache::geode::client::EntryEvent;
using apache::geode::client::PdxReader;
using apache::geode::client::PdxRemoteWriter;
using apache::geode::client::PdxRemoteWriterFactoryImpl;
using apache::geode::client::PdxSerializable;
using apache::geode::client::PdxSerializer;
using apache::geode::client::PdxType;
using apache::geode::client::PdxTypeRegistry;
using apache::geode::client::PdxWrapper;
using apache::geode::client::PdxWriter;
using apache::geode::client::Region;
using apache::geode::client::RegionEvent;
using apache::geode::client::RegionShortcut;
using apache::geode::client::UserObjectSizer;

using ::testing::_;
using ::testing::DoAll;
using ::testing::Return;

using PdxTests::Order;

namespace {
class PdxRemoteWriterConcurrentFactory
: public virtual PdxRemoteWriterFactoryImpl {
public:
~PdxRemoteWriterConcurrentFactory() override = default;

std::unique_ptr<PdxRemoteWriter> create(
DataOutput& output, const std::shared_ptr<PdxSerializable>& object,
const std::shared_ptr<PdxTypeRegistry>& pdxTypeRegistry,
const std::shared_ptr<PdxType>& localType) override {
std::lock_guard<decltype(m_Mutex)> guard_{m_Mutex};
return PdxRemoteWriterFactoryImpl::create(output, object, pdxTypeRegistry,
localType);
}

std::mutex& getMutex() { return m_Mutex; }

protected:
std::mutex m_Mutex;
};

} // namespace

Cache createTestCache() {
CacheFactory cacheFactory;
return cacheFactory.set("log-level", "none")
.set("on-client-disconnect-clear-pdxType-Ids", "true")
.set("statistic-sampling-enabled", "false")
.create();
}

TEST(PdxSerializableTest, serializeWhileClusterRestarting) {
binary_semaphore live_sem{0};
binary_semaphore shut_sem{1};

Cluster cluster{LocatorCount{1}, ServerCount{1}};
cluster.start();

auto& gfsh = cluster.getGfsh();
gfsh.create().region().withName("region").withType("REPLICATE").execute();

auto cache = createTestCache();
auto cacheImpl = CacheRegionHelper::getCacheImpl(&cache);

auto factory = cxx::make_unique<PdxRemoteWriterConcurrentFactory>();
auto& mutex = factory->getMutex();

cacheImpl->setPdxRemoteWriterFactory(std::move(factory));

auto listener = std::make_shared<CacheListenerMock>();
EXPECT_CALL(*listener, afterCreate(_)).WillRepeatedly(Return());
EXPECT_CALL(*listener, afterRegionDestroy(_)).WillRepeatedly(Return());
EXPECT_CALL(*listener, close(_)).WillRepeatedly(Return());
EXPECT_CALL(*listener, afterRegionLive(_))
.WillRepeatedly(DoAll(ReleaseSem(&live_sem), AcquireSem(&shut_sem)));
EXPECT_CALL(*listener, afterRegionDisconnected(_))
.WillRepeatedly(DoAll(ReleaseSem(&shut_sem), AcquireSem(&live_sem)));

{
auto poolFactory = cache.getPoolManager()
.createFactory()
.setReadTimeout(std::chrono::seconds{1})
.setPingInterval(std::chrono::seconds{1})
.setSubscriptionEnabled(true);
cluster.applyLocators(poolFactory);
poolFactory.create("default");
}

auto region = cache.createRegionFactory(RegionShortcut::CACHING_PROXY)
.setPoolName("default")
.setCacheListener(listener)
.create("region");

cache.getTypeRegistry().registerPdxType(Order::createDeserializable);

// Create local PDX type in the registry
region->put("2", std::make_shared<Order>(2, "product x", 37));

std::thread thread;

{
// Lock the mutex so we ensure that cluster is restarted just before
// PdxRemoteWriter is created. This way localPdxType != nullptr, but
// the pdxTypeRegistry is cleaned up. If PdxType were instead fetched
// from PdxTypeRegistry, this would result in a coredump.

std::lock_guard<std::mutex> guard_{mutex};
thread = std::thread([&region] {
region->put("3", std::make_shared<Order>(3, "product y", 37));
});

std::this_thread::sleep_for(std::chrono::seconds{5});
gfsh.shutdown().execute();

shut_sem.acquire();
shut_sem.release();

std::this_thread::sleep_for(std::chrono::seconds{5});
for (auto& server : cluster.getServers()) {
server.start();
}

live_sem.acquire();
live_sem.release();
}
thread.join();
}

} // namespace
4 changes: 3 additions & 1 deletion cppcache/integration/test/WanDeserializationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ using apache::geode::client::Pool;
using apache::geode::client::Region;
using apache::geode::client::RegionShortcut;
using apache::geode::client::Serializable;

using PdxTests::Order;

using std::chrono::minutes;
using std::chrono::seconds;
using WanDeserialization::Order;

class GeodeCacheListener : public CacheListener {
private:
Expand Down
Loading

0 comments on commit b010059

Please sign in to comment.