Skip to content

Commit

Permalink
fix: grid not retrieving resources from the new master network in a n…
Browse files Browse the repository at this point in the history
…etwork merge

The grid would only sync resources that were part of the (removed) slave network after a network
merge because the storages from the master network just "remain" and don't get removed and re-added.

In 1d4d3ae an initial fix was made to ensure correct behavior for network splits, by making the
grid highest-priority and relying on the moved storages re-adding themselves afterwards.

However, for network merges, it would
completely ignore storages that were already part of the new network as those remain
and don't get re-added.

If the storages from the new network remain and don't get re-added, we need to notify the grid of
these storages somehow (by re-syncing).

This change adds the re-syncing (replay) necessary for network merges.
For network splits, it will still re-sync (replay) but won't actually do anything as the network is
newly-created and empty at that point.
  • Loading branch information
raoulvdberge committed Jan 2, 2024
1 parent 2020283 commit a2b0865
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 60 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
- The Portable Grid now shows an energy bar in the UI.
- The energy bar on creative items now shows the infinity symbol instead of the whole amount.

### Fixed

- Fixed bug where Grid contents weren't synced properly when a network merge occurs.

## [2.0.0-milestone.3.2] - 2023-11-03

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private void splitNetworks(final ConnectionProvider connectionProvider,
throw new IllegalStateException("Network of removed node cannot be empty");
}

connectionProvider.sortDeterministically(removedEntries).forEach(e -> {
connectionProvider.sortDeterministically(removedEntries).stream().sorted(HIGHEST_PRIORITY_FIRST).forEach(e -> {
if (e.getNode().getNetwork() == null) {
throw new IllegalStateException("Network of resulting removed node cannot be empty");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
package com.refinedmods.refinedstorage2.api.network.impl.node.container;

public final class NetworkNodeContainerPriorities {
/**
* It is important that the grid has the highest priority when a network split or merge occurs.
* This priority will affect the grids that are part of the a) newly created network for a split and
* b) removed network in case of a merge.
* For a network split, this will ensure that the grid will be able to invalidate all its watchers first and attach
* to the newly created network.
* After that, all the storages will be re-added into the newly created network.
* For a network merge, this will ensure that the grid will be able to invalidate all its watchers first and attach
* to the other existing network to merge with.
* After that, all the storages that are part of the removed network will be re-added into the other existing
* network to merge with.
* The storages that were already part of the existing network aren't re-added, but those are re-synced in the
* invalidation step.
*/
public static final int GRID = Integer.MAX_VALUE;

private NetworkNodeContainerPriorities() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ class GridWatcherRegistration {
this.actorType = actorType;
}

<T> void attach(final StorageChannel<T> storageChannel, final StorageChannelType<T> storageChannelType) {
<T> void attach(final StorageChannel<T> storageChannel,
final StorageChannelType<T> storageChannelType,
final boolean replay) {
final ResourceListListener<T> listener = change -> watcher.onChanged(
storageChannelType,
change.resourceAmount().getResource(),
Expand All @@ -31,6 +33,17 @@ <T> void attach(final StorageChannel<T> storageChannel, final StorageChannelType
);
storageChannel.addListener(listener);
listeners.put(storageChannelType, listener);
if (replay) {
storageChannel.getAll().forEach(resourceAmount -> watcher.onChanged(
storageChannelType,
resourceAmount.getResource(),
resourceAmount.getAmount(),
storageChannel.findTrackedResourceByActorType(
resourceAmount.getResource(),
actorType
).orElse(null)
));
}
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,39 @@ public void addWatcher(
throw new IllegalArgumentException("Watcher is already registered");
}
final GridWatcherRegistration registration = new GridWatcherRegistration(watcher, actorType);
attachAll(registration, network);
attachAll(registration, network, false);
watchers.put(watcher, registration);
LOGGER.info("Added watcher {}, new count is {}", watcher, watchers.size());
}

public void attachAll(final Network network) {
// If we get here we are affected by a network split or network merge.
// At this point, all the storages that are affected by the split or merge have not yet been processed
// as the grid has the highest priority.
watchers.forEach((watcher, registration) -> {
// Invalidate all watcher data, the resources that were synced earlier are no longer valid because we have
// a brand-new network.
watcher.clear();
attachAll(registration, network);
// Re-attach the watcher to the new network, and send all the resources from the new network.
// Resources from the old network are not part of the new network yet, as mentioned above,
// but those will be synced when the storages are re-added.
attachAll(registration, network, true);
});
}

private void attachAll(final GridWatcherRegistration registration, final Network network) {
storageChannelTypes.forEach(storageChannelType -> attach(registration, storageChannelType, network));
private void attachAll(final GridWatcherRegistration registration, final Network network, final boolean replay) {
storageChannelTypes.forEach(storageChannelType -> attach(registration, storageChannelType, network, replay));
}

private <T> void attach(
final GridWatcherRegistration registration,
final StorageChannelType<T> storageChannelType,
final Network network
final Network network,
final boolean replay
) {
LOGGER.info("Attaching {} to {}", registration, storageChannelType);
registration.attach(getStorageChannel(network, storageChannelType), storageChannelType);
final StorageChannel<T> storageChannel = getStorageChannel(network, storageChannelType);
registration.attach(storageChannel, storageChannelType, replay);
}

public void removeWatcher(final GridWatcher watcher, final Network network) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

abstract class AbstractNetworkBuilderImplTest {
protected NetworkBuilder sut;
private ComponentMapFactory<NetworkComponent, Network> componentMapFactory;
protected ComponentMapFactory<NetworkComponent, Network> componentMapFactory;

@BeforeEach
void setUp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,153 @@

import com.refinedmods.refinedstorage2.api.core.Action;
import com.refinedmods.refinedstorage2.api.grid.GridWatcher;
import com.refinedmods.refinedstorage2.api.network.ConnectionProvider;
import com.refinedmods.refinedstorage2.api.network.Network;
import com.refinedmods.refinedstorage2.api.network.component.StorageNetworkComponent;
import com.refinedmods.refinedstorage2.api.network.impl.node.container.NetworkNodeContainerPriorities;
import com.refinedmods.refinedstorage2.api.network.impl.node.grid.GridNetworkNode;
import com.refinedmods.refinedstorage2.api.network.impl.node.storage.StorageNetworkNode;
import com.refinedmods.refinedstorage2.api.network.node.container.NetworkNodeContainer;
import com.refinedmods.refinedstorage2.api.storage.EmptyActor;
import com.refinedmods.refinedstorage2.api.storage.InMemoryStorageImpl;
import com.refinedmods.refinedstorage2.network.test.NetworkTestFixtures;
import com.refinedmods.refinedstorage2.network.test.util.FakeActor;

import java.util.function.Supplier;

import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

class PriorityNetworkBuilderImplTest extends AbstractNetworkBuilderImplTest {
@Test
void shouldRespectPriorityWhenSplitting() {
// Arrange
final FakeConnectionProvider connectionProvider = new FakeConnectionProvider();
final Network originalNetwork = new NetworkImpl(componentMapFactory);
final NetworkSide master = createNetworkSide("master", () -> originalNetwork);
final NetworkNodeContainer connector = createContainerWithNetwork(container -> originalNetwork);
final NetworkSide slave = createNetworkSide("slave", () -> originalNetwork);
clearInvocations(master.watcher);

final NetworkNodeContainer container1 = createContainerWithNetwork();
final Network originalNetwork = container1.getNode().getNetwork();
final NetworkNodeContainer container2 = createContainerWithNetwork(
container -> container1.getNode().getNetwork()
);
final StorageNetworkNode<String> node3 = new StorageNetworkNode<>(0, NetworkTestFixtures.STORAGE_CHANNEL_TYPE);
final InMemoryStorageImpl<String> storage = new InMemoryStorageImpl<>();
storage.insert("N3", 10, Action.EXECUTE, EmptyActor.INSTANCE);
node3.setStorage(storage);
final NetworkNodeContainer container3 = createContainerWithNetwork(
node3,
container -> container1.getNode().getNetwork(),
0
final ConnectionProvider connectionProvider = new FakeConnectionProvider()
.with(master.a, master.b, slave.a, slave.b)
.connect(master.a, master.b)
.connect(slave.a, slave.b);

// Act
sut.remove(connector, connectionProvider);

// Assert
assertThat(master.nodeA.getNetwork()).isSameAs(master.nodeB.getNetwork());
assertThat(slave.nodeA.getNetwork()).isSameAs(slave.nodeB.getNetwork());

assertThat(slave.nodeA.getNetwork()).isNotSameAs(master.nodeA.getNetwork());
assertThat(slave.nodeB.getNetwork()).isNotSameAs(master.nodeA.getNetwork());

final InOrder inOrder = inOrder(slave.watcher);
inOrder.verify(slave.watcher, times(1)).clear();
inOrder.verify(slave.watcher, times(1)).onChanged(
NetworkTestFixtures.STORAGE_CHANNEL_TYPE,
"slave",
10L,
null
);
node3.setActive(true);
final GridNetworkNode node4 = new GridNetworkNode(0, NetworkTestFixtures.STORAGE_CHANNEL_TYPES);
final NetworkNodeContainer container4 = createContainerWithNetwork(
node4,
container -> container1.getNode().getNetwork(),
Integer.MAX_VALUE
verifyNoMoreInteractions(slave.watcher);

verify(master.watcher, times(1)).onChanged(
NetworkTestFixtures.STORAGE_CHANNEL_TYPE,
"slave",
-10L,
null
);
final GridWatcher watcher = mock(GridWatcher.class);
node4.addWatcher(watcher, EmptyActor.class);
verifyNoMoreInteractions(master.watcher);
}

@Test
void shouldRespectPriorityWhenMerging() {
// Arrange
final NetworkSide master = createNetworkSide("master", () -> new NetworkImpl(componentMapFactory));
final NetworkNodeContainer connector = createContainer();
final NetworkSide slave = createNetworkSide("slave", () -> new NetworkImpl(componentMapFactory));

connectionProvider
.with(container1, container2, container3, container4)
.connect(container4, container3);
final ConnectionProvider connectionProvider = new FakeConnectionProvider()
.with(master.a, master.b, connector, slave.a, slave.b)
.connect(master.a, master.b)
.connect(master.b, connector)
// <->
.connect(connector, slave.a)
.connect(slave.a, slave.b);

// Act
sut.remove(container2, connectionProvider);
sut.initialize(connector, connectionProvider);

// Assert
// Container 1 retains its network.
assertThat(container1.getNode().getNetwork())
.isNotNull()
.isSameAs(originalNetwork);
assertThat(container1.getNode().getNetwork().getComponent(StorageNetworkComponent.class)
.getStorageChannel(NetworkTestFixtures.STORAGE_CHANNEL_TYPE).getAll()).isEmpty();

// Container 2 has been removed.
assertThat(container2.getNode().getNetwork()).isNull();

// Container 3 and 4 get a new network.
assertThat(container3.getNode().getNetwork())
.isNotNull()
.isNotSameAs(originalNetwork)
.isSameAs(container4.getNode().getNetwork());
assertThat(container3.getNode().getNetwork().getComponent(StorageNetworkComponent.class)
.getStorageChannel(NetworkTestFixtures.STORAGE_CHANNEL_TYPE).getAll()).hasSize(1);

// Here we ensure that container 4 (the grid) is initialized *after* container 3 (the storage),
// according to the priority declared above.
verify(watcher, times(1)).clear();
verify(watcher).onChanged(
assertThat(slave.nodeA.getNetwork()).isSameAs(master.nodeA.getNetwork());
assertThat(slave.nodeB.getNetwork()).isSameAs(master.nodeA.getNetwork());

final InOrder inOrder = inOrder(slave.watcher);
inOrder.verify(slave.watcher, times(1)).clear();
inOrder.verify(slave.watcher).onChanged(
NetworkTestFixtures.STORAGE_CHANNEL_TYPE,
"slave",
10L,
null
);
inOrder.verify(slave.watcher).onChanged(
NetworkTestFixtures.STORAGE_CHANNEL_TYPE,
"master",
10L,
null
);
inOrder.verifyNoMoreInteractions();

verify(master.watcher, times(1)).onChanged(
NetworkTestFixtures.STORAGE_CHANNEL_TYPE,
"N3",
"slave",
10L,
null
);
verifyNoMoreInteractions(master.watcher);
}

private NetworkSide createNetworkSide(final String name, final Supplier<Network> networkFactory) {
final StorageNetworkNode<String> nodeA = new StorageNetworkNode<>(
0,
NetworkTestFixtures.STORAGE_CHANNEL_TYPE
);
final InMemoryStorageImpl<String> storage = new InMemoryStorageImpl<>();
storage.insert(name, 10, Action.EXECUTE, FakeActor.INSTANCE);
nodeA.setStorage(storage);
final NetworkNodeContainer a = createContainerWithNetwork(
nodeA,
container -> networkFactory.get(),
0
);
nodeA.setActive(true);
final GridNetworkNode nodeB = new GridNetworkNode(0, NetworkTestFixtures.STORAGE_CHANNEL_TYPES);
final NetworkNodeContainer b = createContainerWithNetwork(
nodeB,
container -> a.getNode().getNetwork(),
NetworkNodeContainerPriorities.GRID
);
final GridWatcher watcher = mock(GridWatcher.class, "watcher for " + name);
nodeB.setActive(true);
nodeB.addWatcher(watcher, EmptyActor.class);
return new NetworkSide(a, nodeA, b, nodeB, watcher);
}

private record NetworkSide(
NetworkNodeContainer a,
StorageNetworkNode<String> nodeA,
NetworkNodeContainer b,
GridNetworkNode nodeB,
GridWatcher watcher
) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,18 @@ void shouldDetachWatchersFromOldNetworkAndReattachToNewNetwork(
sut.addWatcher(watcher, FakeActor.class);

// Act
// this one shouldn't be ignored!
otherStorageChannel.insert("C", 10, Action.EXECUTE, FakeActor.INSTANCE);

sut.setNetwork(otherNetwork);
network.removeContainer(() -> sut);
otherNetwork.addContainer(() -> sut);

// these one shouldn't be ignored either
otherStorageChannel.insert("A", 10, Action.EXECUTE, FakeActor.INSTANCE);
otherStorageChannel.insert("D", 10, Action.EXECUTE, EmptyActor.INSTANCE);

// these should be ignored
storageChannel.insert("B", 10, Action.EXECUTE, FakeActor.INSTANCE);
storageChannel.insert("D", 10, Action.EXECUTE, EmptyActor.INSTANCE);

Expand All @@ -163,14 +167,25 @@ void shouldDetachWatchersFromOldNetworkAndReattachToNewNetwork(
final ArgumentCaptor<TrackedResource> trackedResources1 = ArgumentCaptor.forClass(TrackedResource.class);
verify(watcher, times(1)).onChanged(
eq(NetworkTestFixtures.STORAGE_CHANNEL_TYPE),
eq("A"),
eq("C"),
eq(10L),
trackedResources1.capture()
);
assertThat(trackedResources1.getAllValues())
.hasSize(1)
.allMatch(t -> FakeActor.INSTANCE.getName().equals(t.getSourceName()));

final ArgumentCaptor<TrackedResource> trackedResources2 = ArgumentCaptor.forClass(TrackedResource.class);
verify(watcher, times(1)).onChanged(
eq(NetworkTestFixtures.STORAGE_CHANNEL_TYPE),
eq("A"),
eq(10L),
trackedResources2.capture()
);
assertThat(trackedResources2.getAllValues())
.hasSize(1)
.allMatch(t -> FakeActor.INSTANCE.getName().equals(t.getSourceName()));

verify(watcher, times(1)).onChanged(
eq(NetworkTestFixtures.STORAGE_CHANNEL_TYPE),
eq("D"),
Expand Down

0 comments on commit a2b0865

Please sign in to comment.