Skip to content

Commit

Permalink
Implement gateway filter for JDBC (H2 & Postgresql) device registry:
Browse files Browse the repository at this point in the history
- add optional parameter "isGateway"
- functionality for listing only devices or gateways
- add documentation

Also-by: Matthias Feurer [email protected]
Signed-off-by: georgios dimitropoulos <[email protected]>
Signed-off-by: g.dimitropoulos <[email protected]>
  • Loading branch information
gdimitropoulos-sotec committed Jul 28, 2023
1 parent c356232 commit 5921cc9
Show file tree
Hide file tree
Showing 16 changed files with 349 additions and 108 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2019, 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2019, 2021, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -118,6 +118,11 @@ public final class RegistryManagementConstants extends RequestResponseApiConstan
*/
public static final String PARAM_SORT_JSON = "sortJson";

/**
* The name of the boolean filter query parameter for searching gateways or only devices.
*/
public static final String PARAM_IS_GATEWAY = "isGateway";


// DEVICES

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2022, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -83,6 +83,13 @@ public abstract class AbstractHttpEndpoint<T extends ServiceConfigProperties> ex
}
};

/**
* A function that tries to parse a string into an Optional boolean.
*/
protected static final Function<String, Optional<Boolean>> CONVERTER_BOOLEAN = s -> {
return Strings.isNullOrEmpty(s) ? Optional.empty() : Optional.of(Boolean.valueOf(s));
};

/**
* The configuration properties for this endpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,13 @@ public class TableManagementStore extends AbstractDeviceStore {
private final Statement updateDeviceVersionStatement;

private final Statement countDevicesOfTenantStatement;
private final Statement countDevicesWithFilter;
private final Statement countDevicesWithFilterStatement;
private final Statement countGatewaysOfTenantStatement;
private final Statement countOnlyDevicesOfTenantStatement;

private final Statement findDevicesStatement;
private final Statement findDevicesOfTenantStatement;
private final Statement findGatewaysOfTenantStatement;
private final Statement findOnlyDevicesOfTenantStatement;
private final Statement findDevicesOfTenantWithFilterStatement;

/**
Expand Down Expand Up @@ -205,20 +209,48 @@ public TableManagementStore(final JDBCClient client, final Tracer tracer, final
.validateParameters(
TENANT_ID);

this.countDevicesWithFilter = cfg
this.countGatewaysOfTenantStatement = cfg
.getRequiredStatement("countGatewaysOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID);

this.countOnlyDevicesOfTenantStatement = cfg
.getRequiredStatement("countOnlyDevicesOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID);

this.countDevicesWithFilterStatement = cfg
.getRequiredStatement("countDevicesOfTenantWithFilter")
.validateParameters(
TENANT_ID,
FIELD,
VALUE);

this.findDevicesStatement = cfg
this.findDevicesOfTenantStatement = cfg
.getRequiredStatement("findDevicesOfTenant")
.validateParameters(
TENANT_ID,
PAGE_SIZE,
PAGE_OFFSET);

this.findOnlyDevicesOfTenantStatement = cfg
.getRequiredStatement("findOnlyDevicesOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID,
PAGE_SIZE,
PAGE_OFFSET);

this.findGatewaysOfTenantStatement = cfg
.getRequiredStatement("findGatewaysOfTenant")
.validateParameters(
TENANT_ID,
DEVICE_ID,
PAGE_SIZE,
PAGE_OFFSET);

this.findDevicesOfTenantWithFilterStatement = cfg
.getRequiredStatement("findDevicesOfTenantWithFilter")
.validateParameters(
Expand All @@ -229,46 +261,6 @@ public TableManagementStore(final JDBCClient client, final Tracer tracer, final
PAGE_OFFSET);
}

private static Future<Object> checkUpdateOutcome(final UpdateResult updateResult) {

if (updateResult.getUpdated() < 0) {
// conflict
log.debug("Optimistic lock broke");
return Future.failedFuture(new OptimisticLockingException());
}

return Future.succeededFuture();

}

private static Future<String> extractVersionForUpdate(final ResultSet device, final Optional<String> resourceVersion) {
final Optional<String> version = device.getRows(true).stream().map(o -> o.getString(VERSION)).findAny();

if (version.isEmpty()) {
log.debug("No version or no row found -> entity not found");
return Future.failedFuture(new EntityNotFoundException());
}

final var currentVersion = version.get();

return resourceVersion
// if we expect a certain version
.<Future<String>>map(expected -> {
// check ...
if (expected.equals(currentVersion)) {
// version matches, continue with current version
return Future.succeededFuture(currentVersion);
} else {
// version does not match, abort
return Future.failedFuture(new OptimisticLockingException());
}
}
)
// if we don't expect a version, continue with the current
.orElseGet(() -> Future.succeededFuture(currentVersion));

}

/**
* Read a device and lock it for updates.
* <p>
Expand Down Expand Up @@ -851,6 +843,46 @@ private <T> Future<T> recoverNotFound(final Span span, final Throwable err, fina
}
}

private static Future<Object> checkUpdateOutcome(final UpdateResult updateResult) {

if (updateResult.getUpdated() < 0) {
// conflict
log.debug("Optimistic lock broke");
return Future.failedFuture(new OptimisticLockingException());
}

return Future.succeededFuture();

}

private static Future<String> extractVersionForUpdate(final ResultSet device, final Optional<String> resourceVersion) {
final Optional<String> version = device.getRows(true).stream().map(o -> o.getString("version")).findAny();

if (version.isEmpty()) {
log.debug("No version or no row found -> entity not found");
return Future.failedFuture(new EntityNotFoundException());
}

final var currentVersion = version.get();

return resourceVersion
// if we expect a certain version
.<Future<String>>map(expected -> {
// check ...
if (expected.equals(currentVersion)) {
// version matches, continue with current version
return Future.succeededFuture(currentVersion);
} else {
// version does not match, abort
return Future.failedFuture(new OptimisticLockingException());
}
}
)
// if we don't expect a version, continue with the current
.orElseGet(() -> Future.succeededFuture(currentVersion));

}

/**
* Get all credentials for a device.
* <p>
Expand Down Expand Up @@ -913,7 +945,7 @@ private List<CommonCredential> parseCredentials(final ResultSet result) {
final var entries = result.getRows(true);

return entries.stream()
.map(o -> o.getString(DATA))
.map(o -> o.getString("data"))
.map(s -> Json.decodeValue(s, CommonCredential.class))
.collect(Collectors.toList());

Expand All @@ -926,25 +958,41 @@ private List<CommonCredential> parseCredentials(final ResultSet result) {
* @param pageSize The page size.
* @param pageOffset The page offset.
* @param filters The list of filters (currently only the first value of the list is used).
* @param isGateway Optional search gateway or only devices filter.
* @param spanContext The span to contribute to.
* @return A future containing devices.
*/
public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, final int pageSize, final int pageOffset, final List<Filter> filters,
public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, final int pageSize, final int pageOffset, final List<Filter> filters, final Optional<Boolean> isGateway,
final SpanContext spanContext) {


final var filter = filters.stream().findFirst();
final Statement findDeviceSqlStatement;
final Statement countStatement;
final String field;
final String value;

if (isGateway.isPresent()) {
field = "";
value = "";

final String field = filter.map(filter1 -> filter1.getField().toString().replace("/", "")).orElse("");
final var value = filter.map(filter1 ->
filter1.getValue().toString()
.replace("/", "")
.replace("*", "%")
.replace("?", "_")
).orElse("");
findDeviceSqlStatement = isGateway.get() ? this.findGatewaysOfTenantStatement : this.findOnlyDevicesOfTenantStatement;
countStatement = isGateway.get() ? this.countGatewaysOfTenantStatement : this.countOnlyDevicesOfTenantStatement;
} else {
final var filter = filters.stream().findFirst();

field = filter.map(filter1 -> filter1.getField().toString().replace("/", "")).orElse("");
value = filter.map(filter1 ->
filter1.getValue().toString()
.replace("/", "")
.replace("*", "%")
.replace("?", "_")
).orElse("");


findDeviceSqlStatement = (filter.isPresent()) ? findDevicesOfTenantWithFilterStatement : this.findDevicesOfTenantStatement;
countStatement = (filter.isPresent()) ? countDevicesWithFilterStatement : this.countDevicesOfTenantStatement;
}

final Statement findDeviceSqlStatement = (filter.isPresent()) ? findDevicesOfTenantWithFilterStatement : this.findDevicesStatement;
final Statement countStatement = (filter.isPresent()) ? countDevicesWithFilter : this.countDevicesOfTenantStatement;

final var expanded = findDeviceSqlStatement.expand(map -> {
map.put(TENANT_ID, tenantId);
Expand Down Expand Up @@ -986,5 +1034,3 @@ public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, fin
.onComplete(x -> span.finish());
}
}


Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@

countDevicesOfTenantWithFilter: |
SELECT COUNT(*) AS deviceCount FROM %1$s
WHERE
SELECT COUNT(*) AS deviceCount FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT_WS(':', :field, REPLACE(:value, '"')), REPLACE(data, '"'))
OR
REPLACE(data, '"') LIKE CONCAT('%%', :field, ':', REPLACE(:value, '"'))
countGatewaysOfTenant: |
SELECT COUNT(*) AS deviceCount
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', '') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) > 0
countOnlyDevicesOfTenant: |
SELECT COUNT(*) AS deviceCount
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', '') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) = 0
findDevicesOfTenantWithFilter: |
SELECT *
FROM %s
WHERE
SELECT *
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT_WS(':', :field, REPLACE(:value, '"')), REPLACE(data, '"'))
Expand All @@ -21,3 +39,29 @@ findDevicesOfTenantWithFilter: |
ORDER BY device_id ASC
LIMIT :page_size
OFFSET :page_offset
findGatewaysOfTenant: |
SELECT *
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REPLACE(REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', ''), '\') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) > 0
ORDER BY device_id ASC
LIMIT :page_size
OFFSET :page_offset
findOnlyDevicesOfTenant: |
SELECT *
FROM %1$s
WHERE
tenant_id=:tenant_id
AND
LOCATE(CONCAT(device_id, '|'),
(SELECT CONCAT(REPLACE(group_concat(DISTINCT ids separator '|'), ',', '|'), '|') FROM
(SELECT DISTINCT REGEXP_REPLACE(REGEXP_SUBSTR(DATA, '"via":\[.*?\]' ), '"via":\[|\]|"', '') as ids FROM %1$s WHERE tenant_id=:tenant_id ))) = 0
ORDER BY device_id ASC
LIMIT :page_size
OFFSET :page_offset
Loading

0 comments on commit 5921cc9

Please sign in to comment.