Skip to content

Commit

Permalink
further work
Browse files Browse the repository at this point in the history
Signed-off-by: Jan N. Klug <[email protected]>
  • Loading branch information
J-N-K committed Apr 2, 2022
1 parent 50e72d5 commit c6cece9
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
package org.openhab.core.persistence;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.persistence.registry.PersistenceServiceConfigurationProvider;

/**
* A persistence manager service which could be used to start event handling or supply configuration for
* persistence services.
*
* @deprecated implement a {@link PersistenceServiceConfigurationProvider} instead
* @deprecated implement a {@link org.openhab.core.persistence.registry.PersistenceServiceConfigurationProvider} instead
*
* @author Markus Rathgeb - Initial contribution
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.eclipse.jdt.annotation.Nullable;

/**
* The {@link PersistenceServiceConfigurationDTO} is a
* The {@link PersistenceServiceConfigurationDTO} is used for transferring persistence service configurations
*
* @author Jan N. Klug - Initial contribution
*/
Expand All @@ -33,7 +33,7 @@ public class PersistenceServiceConfigurationDTO {

public static class PersistenceItemConfigurationDTO {
public Collection<String> items = List.of();
public @Nullable Collection<String> strategies;
public Collection<String> strategies = List.of();
public @Nullable String alias;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.stream.Stream;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.common.NamedThreadFactory;
import org.openhab.core.common.SafeCaller;
import org.openhab.core.items.GenericItem;
Expand Down Expand Up @@ -117,7 +118,7 @@ protected void deactivate() {
persistenceServiceConfigurationRegistry.removeRegistryChangeListener(this);
started = false;

persistenceServiceContainers.values().forEach(PersistenceServiceContainer::clearCronJobs);
persistenceServiceContainers.values().forEach(PersistenceServiceContainer::cancelPersistJobs);

// remove item state change listeners
itemRegistry.stream().filter(GenericItem.class::isInstance)
Expand All @@ -128,15 +129,14 @@ protected void deactivate() {
protected void addPersistenceService(PersistenceService persistenceService) {
String serviceId = persistenceService.getId();
logger.debug("Initializing {} persistence service.", serviceId);
IdentifiablePersistenceServiceConfiguration serviceConfiguration = Objects.requireNonNullElse(
persistenceServiceConfigurationRegistry.get(serviceId), getDefaultConfig(persistenceService));
PersistenceServiceContainer container = new PersistenceServiceContainer(persistenceService,
serviceConfiguration);
persistenceServiceConfigurationRegistry.get(serviceId));

PersistenceServiceContainer oldContainer = persistenceServiceContainers.put(serviceId, container);

if (oldContainer != null) {
oldContainer.clearCronJobs();
if (oldContainer != null) { // cancel all jobs if the persistence service is set and an old configuration is
// already present
oldContainer.cancelPersistJobs();
}

if (started) {
Expand All @@ -147,7 +147,7 @@ protected void addPersistenceService(PersistenceService persistenceService) {
protected void removePersistenceService(PersistenceService persistenceService) {
PersistenceServiceContainer container = persistenceServiceContainers.remove(persistenceService.getId());
if (container != null) {
container.clearCronJobs();
container.cancelPersistJobs();
}
}

Expand All @@ -170,26 +170,23 @@ private void handleStateEvent(Item item, boolean changed) {
/**
* Checks if a given persistence configuration entry is relevant for an item
*
* @param config the persistence configuration entry
* @param itemConfig the persistence configuration entry
* @param item to check if the configuration applies to
* @return true, if the configuration applies to the item
*/
private boolean appliesToItem(PersistenceItemConfiguration config, Item item) {
for (PersistenceConfig itemCfg : config.getItems()) {
private boolean appliesToItem(PersistenceItemConfiguration itemConfig, Item item) {
for (PersistenceConfig itemCfg : itemConfig.getItems()) {
if (itemCfg instanceof PersistenceAllConfig) {
return true;
} else if (itemCfg instanceof PersistenceItemConfig) {
PersistenceItemConfig singleItemConfig = (PersistenceItemConfig) itemCfg;
if (item.getName().equals(singleItemConfig.getItem())) {
if (item.getName().equals(((PersistenceItemConfig) itemCfg).getItem())) {
return true;
}
} else if (itemCfg instanceof PersistenceGroupConfig) {
PersistenceGroupConfig groupItemConfig = (PersistenceGroupConfig) itemCfg;
try {
Item gItem = itemRegistry.getItem(groupItemConfig.getGroup());
Item gItem = itemRegistry.getItem(((PersistenceGroupConfig) itemCfg).getGroup());
if (gItem instanceof GroupItem) {
GroupItem groupItem = (GroupItem) gItem;
return groupItem.getAllMembers().contains(item);
return ((GroupItem) gItem).getAllMembers().contains(item);
}
} catch (ItemNotFoundException e) {
// do nothing
Expand Down Expand Up @@ -246,7 +243,6 @@ private Iterable<Item> getAllItems(PersistenceItemConfiguration config) {
*
* @param item the item to restore the state for
*/
@SuppressWarnings("null")
private void restoreItemStateIfNeeded(Item item) {
// get the last persisted state from the persistence service if no state is yet set
if (UnDefType.NULL.equals(item.getState()) && item instanceof GenericItem) {
Expand All @@ -260,27 +256,24 @@ private void restoreItemStateIfNeeded(Item item) {
QueryablePersistenceService queryService = (QueryablePersistenceService) container
.getPersistenceService();
FilterCriteria filter = new FilterCriteria().setItemName(item.getName()).setPageSize(1);
Iterable<HistoricItem> result = safeCaller.create(queryService, QueryablePersistenceService.class)
Iterator<HistoricItem> result = safeCaller.create(queryService, QueryablePersistenceService.class)
.onTimeout(() -> logger.warn("Querying persistence service '{}' takes more than {}ms.",
queryService.getId(), SafeCaller.DEFAULT_TIMEOUT))
.onException(e -> logger.error("Exception occurred while querying persistence service '{}': {}",
queryService.getId(), e.getMessage(), e))
.build().query(filter);
if (result != null) {
Iterator<HistoricItem> it = result.iterator();
if (it.hasNext()) {
HistoricItem historicItem = it.next();
GenericItem genericItem = (GenericItem) item;
genericItem.removeStateChangeListener(this);
genericItem.setState(historicItem.getState());
genericItem.addStateChangeListener(this);
if (logger.isDebugEnabled()) {
logger.debug("Restored item state from '{}' for item '{}' -> '{}'",
DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()),
item.getName(), historicItem.getState());
}
return;
.build().query(filter).iterator();
if (result.hasNext()) {
HistoricItem historicItem = result.next();
GenericItem genericItem = (GenericItem) item;
genericItem.removeStateChangeListener(this);
genericItem.setState(historicItem.getState());
genericItem.addStateChangeListener(this);
if (logger.isDebugEnabled()) {
logger.debug("Restored item state from '{}' for item '{}' -> '{}'",
DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()),
item.getName(), historicItem.getState());
}
return;
}
}
}
Expand All @@ -289,15 +282,7 @@ private void restoreItemStateIfNeeded(Item item) {
private void startEventHandling(PersistenceServiceContainer serviceContainer) {
serviceContainer.getMatchingConfigurations(PersistenceStrategy.Globals.RESTORE)
.forEach(itemConfig -> getAllItems(itemConfig).forEach(this::restoreItemStateIfNeeded));
serviceContainer.startCronJobs();
}

private IdentifiablePersistenceServiceConfiguration getDefaultConfig(PersistenceService persistenceService) {
List<PersistenceStrategy> strategies = persistenceService.getDefaultStrategies();
List<PersistenceItemConfiguration> configs = List
.of(new PersistenceItemConfiguration(List.of(new PersistenceAllConfig()), null, strategies, null));
return new IdentifiablePersistenceServiceConfiguration(persistenceService.getId(), configs, strategies,
strategies);
serviceContainer.schedulePersistJobs();
}

// ItemStateChangeListener methods
Expand Down Expand Up @@ -362,7 +347,6 @@ public void onReadyMarkerRemoved(ReadyMarker readyMarker) {
public void added(IdentifiablePersistenceServiceConfiguration element) {
PersistenceServiceContainer container = persistenceServiceContainers.get(element.getUID());
if (container != null) {
container.clearCronJobs();
container.setConfiguration(element);
if (started) {
startEventHandling(container);
Expand All @@ -374,8 +358,7 @@ public void added(IdentifiablePersistenceServiceConfiguration element) {
public void removed(IdentifiablePersistenceServiceConfiguration element) {
PersistenceServiceContainer container = persistenceServiceContainers.get(element.getUID());
if (container != null) {
container.clearCronJobs();
container.setConfiguration(getDefaultConfig(container.persistenceService));
container.setConfiguration(null);
if (started) {
startEventHandling(container);
}
Expand All @@ -396,53 +379,78 @@ private class PersistenceServiceContainer {
private IdentifiablePersistenceServiceConfiguration configuration;

public PersistenceServiceContainer(PersistenceService persistenceService,
IdentifiablePersistenceServiceConfiguration configuration) {
@Nullable IdentifiablePersistenceServiceConfiguration configuration) {
this.persistenceService = persistenceService;
this.configuration = configuration;
this.configuration = Objects.requireNonNullElseGet(configuration, this::getDefaultConfig);
}

public PersistenceService getPersistenceService() {
return persistenceService;
}

public IdentifiablePersistenceServiceConfiguration getConfiguration() {
return configuration;
/**
* Set a new configuration for this persistence service (also cancels all cron jobs)
*
* @param configuration the new {@link IdentifiablePersistenceServiceConfiguration}, if {@code null} the default
* configuration of the service is used
*/
public void setConfiguration(@Nullable IdentifiablePersistenceServiceConfiguration configuration) {
cancelPersistJobs();
this.configuration = Objects.requireNonNullElseGet(configuration, this::getDefaultConfig);
}

/**
* Get all item configurations from this service that match a certain strategy
*
* @param strategy the {@link PersistenceStrategy} to look for
* @return a @link Stream<PersistenceItemConfiguration>} of the result
*/
public Stream<PersistenceItemConfiguration> getMatchingConfigurations(PersistenceStrategy strategy) {
boolean matchesDefaultStrategies = configuration.getDefaults().contains(strategy);
return configuration.getConfigs().stream()
.filter(itemConfig -> itemConfig.getStrategies().contains(strategy)
|| (itemConfig.getStrategies().isEmpty() && matchesDefaultStrategies));
}

public void setConfiguration(IdentifiablePersistenceServiceConfiguration configuration) {
this.configuration = configuration;
private IdentifiablePersistenceServiceConfiguration getDefaultConfig() {
List<PersistenceStrategy> strategies = persistenceService.getDefaultStrategies();
List<PersistenceItemConfiguration> configs = List
.of(new PersistenceItemConfiguration(List.of(new PersistenceAllConfig()), null, strategies, null));
return new IdentifiablePersistenceServiceConfiguration(persistenceService.getId(), configs, strategies,
strategies);
}

public void clearCronJobs() {
/**
* Cancel all scheduled cron jobs / strategies for this service
*/
public void cancelPersistJobs() {
synchronized (jobs) {
jobs.forEach(job -> job.cancel(true));
jobs.clear();
}
logger.debug("Removed scheduled cron job for persistence service '{}'", configuration.getUID());
}

public void startCronJobs() {
/**
* Schedule all necessary cron jobs / strategies for this service
*/
public void schedulePersistJobs() {
configuration.getStrategies().stream().filter(PersistenceCronStrategy.class::isInstance)
.forEach(strategy -> {
PersistenceCronStrategy cronStrategy = (PersistenceCronStrategy) strategy;
String cronExpression = cronStrategy.getCronExpression();
jobs.add(scheduler.schedule(() -> persistJob(cronStrategy), cronExpression));
List<PersistenceItemConfiguration> itemConfigs = getMatchingConfigurations(strategy)
.collect(Collectors.toList());
jobs.add(scheduler.schedule(() -> persistJob(itemConfigs), cronExpression));

logger.debug("Scheduled strategy {} with cron expression {} for service {}",
cronStrategy.getName(), cronExpression, configuration.getUID());

});
}

private void persistJob(PersistenceStrategy strategy) {
getMatchingConfigurations(strategy).forEach(itemConfig -> {
private void persistJob(List<PersistenceItemConfiguration> itemConfigs) {
itemConfigs.forEach(itemConfig -> {
for (Item item : getAllItems(itemConfig)) {
long startTime = System.nanoTime();
persistenceService.store(item, itemConfig.getAlias());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,8 @@ public PersistenceServiceConfigurationDTO toDTO() {
PersistenceServiceConfigurationDTO.PersistenceItemConfigurationDTO itemDto = new PersistenceServiceConfigurationDTO.PersistenceItemConfigurationDTO();
itemDto.items = config.getItems().stream().map(this::persistenceConfigToString)
.collect(Collectors.toList());
Collection<PersistenceStrategy> strategies = config.getStrategies();
if (strategies != null) {
itemDto.strategies = strategies.stream().map(PersistenceStrategy::getName).collect(Collectors.toList());
}
itemDto.strategies = config.getStrategies().stream().map(PersistenceStrategy::getName)
.collect(Collectors.toList());
itemDto.alias = config.getAlias();
return itemDto;
}).collect(Collectors.toList());
Expand Down Expand Up @@ -106,14 +104,14 @@ public static IdentifiablePersistenceServiceConfiguration fromDTO(PersistenceSer
List<PersistenceStrategy> defaults = dto.defaults.stream()
.map(str -> stringToPersistenceStrategy(str, strategyMap, dto.serviceId)).collect(Collectors.toList());

List<PersistenceItemConfiguration> configs = dto.configs.stream().map(c -> {
List<PersistenceConfig> items = c.items.stream()
List<PersistenceItemConfiguration> configs = dto.configs.stream().map(config -> {
List<PersistenceConfig> items = config.items.stream()
.map(IdentifiablePersistenceServiceConfiguration::stringToPersistenceConfig)
.collect(Collectors.toList());
List<PersistenceStrategy> strategies = c.strategies.stream()
List<PersistenceStrategy> strategies = config.strategies.stream()
.map(str -> stringToPersistenceStrategy(str, strategyMap, dto.serviceId))
.collect(Collectors.toList());
return new PersistenceItemConfiguration(items, c.alias, strategies, List.of());
return new PersistenceItemConfiguration(items, config.alias, strategies, List.of());
}).collect(Collectors.toList());

return new IdentifiablePersistenceServiceConfiguration(dto.serviceId, configs, defaults, strategyMap.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public String getName() {
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + name.hashCode();
return result;
}

Expand Down
Loading

0 comments on commit c6cece9

Please sign in to comment.