Skip to content

Commit

Permalink
Limit concurrent large modifications applications (#501)
Browse files Browse the repository at this point in the history
Signed-off-by: Ayoub LABIDI <[email protected]>
  • Loading branch information
ayolab authored Jul 26, 2024
1 parent 0b2ee7c commit 1df2e93
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
import com.powsybl.commons.report.TypedValue;
import com.powsybl.iidm.network.Network;
import com.powsybl.network.store.client.NetworkStoreService;
import com.powsybl.network.store.client.PreloadingStrategy;

import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.tuple.Pair;
import org.gridsuite.modification.server.ModificationType;
import org.gridsuite.modification.server.NetworkModificationException;
import org.gridsuite.modification.server.dto.ModificationInfos;
import org.gridsuite.modification.server.dto.NetworkInfos;
Expand All @@ -33,6 +37,9 @@

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Slimane Amar <slimane.amar at rte-france.com>
Expand All @@ -51,31 +58,86 @@ public class NetworkModificationApplicator {

@Getter private final FilterService filterService;

private final ExecutorService applicationExecutor;

@Value("${impacts.collection-threshold:50}")
@Setter // TODO REMOVE when VoltageInitReportTest will no longer use NetworkModificationApplicator
private Integer collectionThreshold;

public NetworkModificationApplicator(NetworkStoreService networkStoreService, EquipmentInfosService equipmentInfosService,
ReportService reportService, FilterService filterService) {
ReportService reportService, FilterService filterService,
@Value("${max-large-concurrent-applications}") int maxConcurrentApplications) {
this.networkStoreService = networkStoreService;
this.equipmentInfosService = equipmentInfosService;
this.reportService = reportService;
this.filterService = filterService;
this.applicationExecutor = Executors.newFixedThreadPool(maxConcurrentApplications);
}

/* This method is used when creating, inserting, moving or duplicating modifications
* Since there is no queue for these operations and they can be memory consuming when the preloading strategy is large
* (for example for VOLTAGE_INIT_MODIFICATION),
* we limit the number of concurrent applications of these modifications to avoid out of memory issues.
* We keep the possibility to apply small or medium modifications immediately in parallel without limits.
* And if in the future we also need to limit the memory consumption of medium modifications we can add more code here.
* Note : we currently have 3 sizes of modifications :
* small : preloadingStrategy = NONE
* medium : preloadingStrategy = COLLECTION
* large : preloadingStrategy = ALL_COLLECTIONS_NEEDED_FOR_BUS_VIEW
*/
public NetworkModificationResult applyModifications(List<ModificationInfos> modificationInfosList, NetworkInfos networkInfos, ReportInfos reportInfos) {
PreloadingStrategy preloadingStrategy = modificationInfosList.stream()
.map(ModificationInfos::getType)
.reduce(ModificationType::maxStrategy)
.map(ModificationType::getStrategy)
.orElse(PreloadingStrategy.NONE);
if (preloadingStrategy == PreloadingStrategy.ALL_COLLECTIONS_NEEDED_FOR_BUS_VIEW) {
CompletableFuture<NetworkModificationResult> future = CompletableFuture.supplyAsync(() -> processApplication(modificationInfosList, networkInfos, reportInfos), applicationExecutor);
return future.join();
} else {
return processApplication(modificationInfosList, networkInfos, reportInfos);
}
}

// used for creating, inserting, moving or duplicating modifications
private NetworkModificationResult processApplication(List<ModificationInfos> modificationInfosList, NetworkInfos networkInfos, ReportInfos reportInfos) {
NetworkStoreListener listener = NetworkStoreListener.create(networkInfos.getNetwork(), networkInfos.getNetworkUuuid(), networkStoreService, equipmentInfosService, collectionThreshold);
ApplicationStatus groupApplicationStatus = apply(modificationInfosList, listener.getNetwork(), reportInfos);
List<AbstractBaseImpact> networkImpacts = listener.flushNetworkModifications();
return
NetworkModificationResult.builder()
return NetworkModificationResult.builder()
.applicationStatus(groupApplicationStatus)
.lastGroupApplicationStatus(groupApplicationStatus)
.networkImpacts(networkImpacts)
.build();
}

/* This method is used when building a variant
* building a variant is limited to ${consumer.concurrency} (typically 2) concurrent builds thanks to rabbitmq queue
* but since the other operations (create, insert, move, duplicate) are not inserted in the same rabbitmq queue
* we use the same ExecutorService to globally limit the number of concurrent large modifications in order to avoid out of memory issues
* We keep the possibility to apply small or medium modifications immediately.
* And if in the future we also need to limit the memory consumption of medium modifications we can add more code here.
* Note : it is possible that the rabbitmq consumer threads here will be blocked by modifications applied directly in the other applyModifications method
* and no more builds can go through. If this causes problems we should put them in separate rabbitmq queues.
*/
public NetworkModificationResult applyModifications(List<Pair<String, List<ModificationInfos>>> modificationInfosGroups, NetworkInfos networkInfos, UUID reportUuid) {
PreloadingStrategy preloadingStrategy = modificationInfosGroups.stream()
.map(Pair::getRight)
.flatMap(List::stream)
.map(ModificationInfos::getType)
.reduce(ModificationType::maxStrategy)
.map(ModificationType::getStrategy)
.orElse(PreloadingStrategy.NONE);
if (preloadingStrategy == PreloadingStrategy.ALL_COLLECTIONS_NEEDED_FOR_BUS_VIEW) {
CompletableFuture<NetworkModificationResult> future = CompletableFuture.supplyAsync(() -> processApplication(modificationInfosGroups, networkInfos, reportUuid), applicationExecutor);
return future.join();
} else {
return processApplication(modificationInfosGroups, networkInfos, reportUuid);
}
}

// used for building a variant
private NetworkModificationResult processApplication(List<Pair<String, List<ModificationInfos>>> modificationInfosGroups, NetworkInfos networkInfos, UUID reportUuid) {
NetworkStoreListener listener = NetworkStoreListener.create(networkInfos.getNetwork(), networkInfos.getNetworkUuuid(), networkStoreService, equipmentInfosService, collectionThreshold);
List<ApplicationStatus> groupsApplicationStatuses =
modificationInfosGroups.stream()
Expand Down Expand Up @@ -158,4 +220,9 @@ public static ApplicationStatus getApplicationStatus(ReportNode reportNode) {
throw new IllegalArgumentException(String.format("Report severity '%s' unknown !", severity.getValue()));
}
}

@PreDestroy
public void shutdown() {
applicationExecutor.shutdown();
}
}
4 changes: 4 additions & 0 deletions src/main/resources/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ powsybl-ws:
name: networkmodifications
queryBegin: '&'
customQuery: ${powsybl-ws.database.customQueryBegin}reWriteBatchedInserts=true

# maximum concurrent applications of modifications with preloadingStrategy=ALL_COLLECTIONS_NEEDED_FOR_BUS_VIEW
# to avoid out of memory issues
max-large-concurrent-applications: 2
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void testVoltageInitDuplicationLogs(final ApplicationStatus resultStatus, final
final NetworkStoreService networkStoreService = new NetworkStoreServicePublic(restClient, PreloadingStrategy.NONE,
(restClient_, preloadingStrategy, executorService) -> new CachedNetworkStoreClient(new OfflineNetworkStoreClient()));
final EquipmentInfosService equipmentInfosService = Mockito.mock(EquipmentInfosService.class);
final NetworkModificationApplicator networkModificationApplicator = new NetworkModificationApplicator(networkStoreService, equipmentInfosService, reportService, null);
final NetworkModificationApplicator networkModificationApplicator = new NetworkModificationApplicator(networkStoreService, equipmentInfosService, reportService, null, 2);
networkModificationApplicator.setCollectionThreshold(5);

final Network network = Network.read(Paths.get(this.getClass().getClassLoader().getResource("fourSubstations_testsOpenReac.xiidm").toURI()));
Expand Down

0 comments on commit 1df2e93

Please sign in to comment.