From cb479e938b040477f9899b10f0eeffdcb1e92444 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Mon, 15 Apr 2024 12:28:16 -0600 Subject: [PATCH 01/14] limit number of entries returned. --- .../java/com/uid2/operator/vertx/UIDOperatorVerticle.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java index e9c59af56..d1b762712 100644 --- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java +++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -528,6 +528,9 @@ public void handleKeysRequest(RoutingContext rc) { private String getSharingTokenExpirySeconds() { return config.getString(Const.Config.SharingTokenExpiryProp); } + private int maxIdentityBucketsResponseEntries() { + return config.getInteger(Const.Config.MaxIdentityBucketsResponseEntries, 1048576); + } public void handleKeysSharing(RoutingContext rc) { try { @@ -1119,6 +1122,10 @@ private void handleBucketsV2(RoutingContext rc) { return; } final List modified = this.idService.getModifiedBuckets(sinceTimestamp); + if (modified.size() > maxIdentityBucketsResponseEntries()) { + ResponseUtil.ClientError(rc, "provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map"); + return; + } final JsonArray resp = new JsonArray(); if (modified != null) { for (SaltEntry e : modified) { From a4eea1068491a72187ff1c680969c145a4b9f4a5 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Mon, 15 Apr 2024 13:53:42 -0600 Subject: [PATCH 02/14] return buckets in chunks of 30000 --- .../operator/vertx/UIDOperatorVerticle.java | 43 +++++++++++++------ .../uid2/operator/vertx/V2PayloadHandler.java | 8 ++-- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java index d1b762712..472616c26 100644 --- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java +++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -1108,6 +1108,19 @@ private void handleBucketsV1(RoutingContext rc) { } } + private String makeSaltEntriesString(List entries, int startIndex, int endIndexExclusive) { + StringBuilder s = new StringBuilder(); + for(int i = startIndex; i < endIndexExclusive; i++) { + SaltEntry e = entries.get(i); + s.append("{\"bucket_id\":\"") + .append(e.getHashedId()) + .append("\",\"last_updated\":\"") + .append(APIDateTimeFormatter.format(Instant.ofEpochMilli(e.getLastUpdated()))) + .append("\"},"); + } + return s.toString(); + } + private void handleBucketsV2(RoutingContext rc) { final JsonObject req = (JsonObject) rc.data().get("request"); final String qp = req.getString("since_timestamp"); @@ -1122,21 +1135,25 @@ private void handleBucketsV2(RoutingContext rc) { return; } final List modified = this.idService.getModifiedBuckets(sinceTimestamp); - if (modified.size() > maxIdentityBucketsResponseEntries()) { - ResponseUtil.ClientError(rc, "provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map"); - return; - } - final JsonArray resp = new JsonArray(); +// final JsonArray resp = new JsonArray(); + HttpServerResponse response = rc.response(); if (modified != null) { - for (SaltEntry e : modified) { - final JsonObject o = new JsonObject(); - o.put("bucket_id", e.getHashedId()); - Instant lastUpdated = Instant.ofEpochMilli(e.getLastUpdated()); - - o.put("last_updated", APIDateTimeFormatter.format(lastUpdated)); - resp.add(o); + if (modified.size() > maxIdentityBucketsResponseEntries()) { + ResponseUtil.ClientError(rc, "provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map"); + return; + } + LOGGER.info("processing " + String.valueOf(modified.size())); + response.setChunked(true); + response.write("["); + for(int i =0; i < modified.size(); i+=30000) { + String saltEntries = makeSaltEntriesString(modified, i, Math.min(i + 30000, modified.size())); + if(i + 30000 >= modified.size()) { + saltEntries = saltEntries.substring(0, saltEntries.length() -1); + } + response.write(saltEntries); } - ResponseUtil.SuccessV2(rc, resp); + response.end("]"); +// ResponseUtil.SuccessV2(rc, resp); } } else { ResponseUtil.ClientError(rc, "missing parameter since_timestamp"); diff --git a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java index e59e49619..467a93111 100644 --- a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java +++ b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java @@ -173,9 +173,11 @@ private void passThrough(RoutingContext rc, Handler apiHandler) if (rc.response().getStatusCode() != 200) { return; } - JsonObject respJson = (JsonObject) rc.data().get("response"); - rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(respJson.encode()); + if(!rc.response().ended()) { + JsonObject respJson = (JsonObject) rc.data().get("response"); + rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(respJson.encode()); + } } private void writeResponse(RoutingContext rc, byte[] nonce, JsonObject resp, byte[] keyBytes) { From 758f094fc231aef381b5b5f88218c59c2b6250a1 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Mon, 15 Apr 2024 16:52:00 -0600 Subject: [PATCH 03/14] Configurable identity buckets chunk size --- .../operator/vertx/UIDOperatorVerticle.java | 21 ++++++++++--------- .../operator/UIDOperatorVerticleTest.java | 3 +++ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java index 472616c26..e854d3823 100644 --- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java +++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -528,9 +528,12 @@ public void handleKeysRequest(RoutingContext rc) { private String getSharingTokenExpirySeconds() { return config.getString(Const.Config.SharingTokenExpiryProp); } - private int maxIdentityBucketsResponseEntries() { + private int getMaxIdentityBucketsResponseEntries() { return config.getInteger(Const.Config.MaxIdentityBucketsResponseEntries, 1048576); } + private int getIdentityBucketsResponseChunkSize() { + return config.getInteger(Const.Config.IdentityBucketsResponseChunkSize, 1048576); + } public void handleKeysSharing(RoutingContext rc) { try { @@ -1135,25 +1138,23 @@ private void handleBucketsV2(RoutingContext rc) { return; } final List modified = this.idService.getModifiedBuckets(sinceTimestamp); -// final JsonArray resp = new JsonArray(); - HttpServerResponse response = rc.response(); if (modified != null) { - if (modified.size() > maxIdentityBucketsResponseEntries()) { + HttpServerResponse response = rc.response(); + if (modified.size() > getMaxIdentityBucketsResponseEntries()) { ResponseUtil.ClientError(rc, "provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map"); return; } - LOGGER.info("processing " + String.valueOf(modified.size())); - response.setChunked(true); + int chunkSize = getIdentityBucketsResponseChunkSize(); + response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "application/json"); response.write("["); - for(int i =0; i < modified.size(); i+=30000) { - String saltEntries = makeSaltEntriesString(modified, i, Math.min(i + 30000, modified.size())); - if(i + 30000 >= modified.size()) { + for(int i =0; i < modified.size(); i+=chunkSize) { + String saltEntries = makeSaltEntriesString(modified, i, Math.min(i + chunkSize, modified.size())); + if(i + chunkSize >= modified.size()) { saltEntries = saltEntries.substring(0, saltEntries.length() -1); } response.write(saltEntries); } response.end("]"); -// ResponseUtil.SuccessV2(rc, resp); } } else { ResponseUtil.ClientError(rc, "missing parameter since_timestamp"); diff --git a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java index 5bbb03bfa..4ed6e2398 100644 --- a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java +++ b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java @@ -155,6 +155,9 @@ private void setupConfig(JsonObject config) { config.put("client_side_token_generate_log_invalid_http_origins", true); config.put(Const.Config.AllowClockSkewSecondsProp, 3600); + + config.put(Const.Config.IdentityBucketsResponseChunkSize, 15); + config.put(Const.Config.MaxIdentityBucketsResponseEntries, 50); } private static byte[] makeAesKey(String prefix) { From 5dd3332f6635a9531406bc8d37fe3541d60351c2 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Mon, 15 Apr 2024 20:01:05 -0600 Subject: [PATCH 04/14] tests, refactoring, encryption --- .../operator/vertx/UIDOperatorVerticle.java | 67 ++++++++++++++++--- .../uid2/operator/vertx/V2PayloadHandler.java | 32 +++++---- .../operator/UIDOperatorVerticleTest.java | 59 ++++++++++++++++ 3 files changed, 130 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java index e854d3823..71523485b 100644 --- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java +++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -16,9 +16,11 @@ import com.uid2.operator.util.PrivacyBits; import com.uid2.operator.util.Tuple; import com.uid2.shared.Const.Data; +import com.uid2.shared.InstantClock; import com.uid2.shared.Utils; import com.uid2.shared.auth.*; import com.uid2.shared.encryption.AesGcm; +import com.uid2.shared.encryption.Random; import com.uid2.shared.health.HealthComponent; import com.uid2.shared.health.HealthManager; import com.uid2.shared.middleware.AuthMiddleware; @@ -1124,6 +1126,10 @@ private String makeSaltEntriesString(List entries, int startIndex, in return s.toString(); } + private static final String cipherScheme = "AES/GCM/NoPadding"; + public static final int GCM_AUTHTAG_LENGTH = 16; + public static final int GCM_IV_LENGTH = 12; + private void handleBucketsV2(RoutingContext rc) { final JsonObject req = (JsonObject) rc.data().get("request"); final String qp = req.getString("since_timestamp"); @@ -1139,28 +1145,67 @@ private void handleBucketsV2(RoutingContext rc) { } final List modified = this.idService.getModifiedBuckets(sinceTimestamp); if (modified != null) { - HttpServerResponse response = rc.response(); if (modified.size() > getMaxIdentityBucketsResponseEntries()) { ResponseUtil.ClientError(rc, "provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map"); return; } - int chunkSize = getIdentityBucketsResponseChunkSize(); - response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "application/json"); - response.write("["); - for(int i =0; i < modified.size(); i+=chunkSize) { - String saltEntries = makeSaltEntriesString(modified, i, Math.min(i + chunkSize, modified.size())); - if(i + chunkSize >= modified.size()) { - saltEntries = saltEntries.substring(0, saltEntries.length() -1); - } - response.write(saltEntries); + try { + transmitModifiedBucketsInChunks(rc, modified); + } catch (InvalidKeyException | InvalidAlgorithmParameterException | NoSuchAlgorithmException | + NoSuchPaddingException | IllegalBlockSizeException | BadPaddingException e) { + ResponseUtil.Error(ResponseUtil.ResponseStatus.GenericError, 500, rc, ""); } - response.end("]"); } } else { ResponseUtil.ClientError(rc, "missing parameter since_timestamp"); } } + private void transmitModifiedBucketsInChunks(RoutingContext rc, List modified) throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { + V2RequestUtil.V2Request request = V2RequestUtil.parseRequest(rc.body().asString(), AuthMiddleware.getAuthClient(ClientKey.class, rc), new InstantClock()); + HttpServerResponse response = rc.response(); + + final SecretKey k = new SecretKeySpec(request.encryptionKey, "AES"); + final Cipher c = Cipher.getInstance(cipherScheme); + final byte[] ivBytes = Random.getBytes(GCM_IV_LENGTH); + GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_AUTHTAG_LENGTH * 8, ivBytes); + c.init(Cipher.ENCRYPT_MODE, k, gcmParameterSpec); + + int chunkSize = getIdentityBucketsResponseChunkSize(); + response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + + Buffer b = Buffer.buffer(); + b.appendLong(EncodingUtils.NowUTCMillis().toEpochMilli()); + b.appendBytes(request.nonce); + b.appendBytes("{\"body\":[".getBytes()); + b = Buffer.buffer(ivBytes).appendBytes(c.update(b.getBytes())); + for(int i =0; i < modified.size(); i+=chunkSize) { + String saltEntries = makeSaltEntriesString(modified, i, Math.min(i + chunkSize, modified.size())); + if(i + chunkSize >= modified.size()) { + saltEntries = saltEntries.substring(0, saltEntries.length() -1); + saltEntries += "], \"status\":\"success\"}"; + b.appendBytes(c.doFinal(saltEntries.getBytes())); + } else { + b.appendBytes(c.update(saltEntries.getBytes())); + } + if (b.length() % 3 == 0 || b.length() < 3 || (i+chunkSize >= modified.size())) { + response.write(Utils.toBase64String(b.getBytes())); + b = Buffer.buffer(); + } else if ((b.length()-1) % 3 == 0) { + byte[] start = Arrays.copyOfRange(b.getBytes(), 0, b.length()-1); + byte[] end = Arrays.copyOfRange(b.getBytes(), b.length()-1, b.length()); + response.write(Utils.toBase64String(start)); + b = Buffer.buffer(end); + } else { + byte[] start = Arrays.copyOfRange(b.getBytes(), 0, b.length()-2); + byte[] end = Arrays.copyOfRange(b.getBytes(), b.length()-2, b.length()); + response.write(Utils.toBase64String(start)); + b = Buffer.buffer(end); + } + } + rc.response().end(); + } + private void handleIdentityMapV1(RoutingContext rc) { final InputUtil.InputVal input = this.phoneSupport ? this.getTokenInputV1(rc) : this.getTokenInput(rc); if (this.phoneSupport ? !checkTokenInputV1(input, rc) : !checkTokenInput(input, rc)) { diff --git a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java index 467a93111..d51b8d374 100644 --- a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java +++ b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java @@ -107,8 +107,7 @@ public void handleTokenGenerate(RoutingContext rc, Handler apiHa } writeResponse(rc, request.nonce, respJson, request.encryptionKey); - } - catch (Exception ex){ + } catch (Exception ex) { LOGGER.error("Failed to generate token", ex); ResponseUtil.Error(ResponseUtil.ResponseStatus.GenericError, 500, rc, ""); } @@ -130,8 +129,7 @@ public void handleTokenRefresh(RoutingContext rc, Handler apiHan return; } rc.data().put("request", request.payload); - } - else { + } else { rc.data().put("request", bodyString); } @@ -152,16 +150,14 @@ public void handleTokenRefresh(RoutingContext rc, Handler apiHan rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "text/plain"); // Encrypt whole payload using key shared with client. byte[] encryptedResp = AesGcm.encrypt( - respJson.encode().getBytes(StandardCharsets.UTF_8), - request.encryptionKey); + respJson.encode().getBytes(StandardCharsets.UTF_8), + request.encryptionKey); rc.response().end(Utils.toBase64String(encryptedResp)); - } - else { + } else { rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(respJson.encode()); + .end(respJson.encode()); } - } - catch (Exception ex){ + } catch (Exception ex) { LOGGER.error("Failed to refresh token", ex); ResponseUtil.Error(ResponseUtil.ResponseStatus.GenericError, 500, rc, ""); } @@ -170,17 +166,19 @@ public void handleTokenRefresh(RoutingContext rc, Handler apiHan private void passThrough(RoutingContext rc, Handler apiHandler) { rc.data().put("request", rc.body().asJsonObject()); apiHandler.handle(rc); - if (rc.response().getStatusCode() != 200) { + if (rc.response().getStatusCode() != 200 || rc.response().ended()) { return; } - if(!rc.response().ended()) { - JsonObject respJson = (JsonObject) rc.data().get("response"); - rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(respJson.encode()); - } + JsonObject respJson = (JsonObject) rc.data().get("response"); + rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(respJson.encode()); + } private void writeResponse(RoutingContext rc, byte[] nonce, JsonObject resp, byte[] keyBytes) { + if (rc.response().ended()) { + return; + } Buffer buffer = Buffer.buffer(); buffer.appendLong(EncodingUtils.NowUTCMillis().toEpochMilli()); buffer.appendBytes(nonce); diff --git a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java index 4ed6e2398..4cb306e11 100644 --- a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java +++ b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java @@ -61,6 +61,8 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.*; import java.util.stream.Collectors; @@ -215,6 +217,7 @@ private void send(String apiVersion, Vertx vertx, String endpoint, boolean isV1G byte[] decrypted = AesGcm.decrypt(Utils.decodeBase64String(ar.result().bodyAsString()), 0, ck.getSecretBytes()); assertArrayEquals(Buffer.buffer().appendLong(nonce).getBytes(), Buffer.buffer(decrypted).slice(8, 16).getBytes()); + System.out.println(new String(decrypted, 16, decrypted.length - 16, StandardCharsets.UTF_8)); JsonObject respJson = new JsonObject(new String(decrypted, 16, decrypted.length - 16, StandardCharsets.UTF_8)); handler.handle(respJson); @@ -805,6 +808,62 @@ RefreshToken decodeRefreshToken(EncryptedTokenEncoder encoder, String refreshTok return decodeRefreshToken(encoder, refreshTokenString, IdentityType.Email); } + private void validateIdentityBuckets(JsonArray expectedList, List actualList) { + final DateTimeFormatter APIDateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC")); + + assertEquals(expectedList.size(), actualList.size()); + for(int i = 0; i < expectedList.size(); i++) { + JsonObject expected = expectedList.getJsonObject(i); + SaltEntry actual = actualList.get(i); + assertAll("Salt Entry Matches", + () -> assertEquals(expected.getString("bucket_id"), actual.getHashedId()), + () -> assertEquals(expected.getString("last_updated"), APIDateTimeFormatter.format(Instant.ofEpochMilli(actual.getLastUpdated())))); + } + } + + @ParameterizedTest + @ValueSource(ints = {1, 11, 15, 20, 30, 35, 50}) + void identityBucketsChunked(int numModifiedSalts, Vertx vertx, VertxTestContext testContext) { + final int clientSiteId = 201; + fakeAuth(clientSiteId, newClientCreationDateTime, Role.MAPPER); + List modifiedSalts = new ArrayList<>(); + for(int i = 0; i < numModifiedSalts; i++) { + modifiedSalts.add(new SaltEntry(i, "someId" + i, i, "someSalt")); + } + when(saltProviderSnapshot.getModifiedSince(any())).thenReturn(modifiedSalts); + + JsonObject req = new JsonObject(); + req.put("since_timestamp", "2023-04-08T13:00:00"); + + send("v2", vertx, "v2" + "/identity/buckets", false, null, req, 200, respJson -> { + assertTrue(respJson.containsKey("body")); + assertFalse(respJson.containsKey("client_error")); + validateIdentityBuckets(respJson.getJsonArray("body"), modifiedSalts); + testContext.completeNow(); + }); + } + + @Test + void identityBucketsLimit(Vertx vertx, VertxTestContext testContext) { + final int clientSiteId = 201; + fakeAuth(clientSiteId, newClientCreationDateTime, Role.MAPPER); + List modifiedSalts = new ArrayList<>(); + for(int i = 0; i < 51; i++) { + modifiedSalts.add(new SaltEntry(i, "someId" + i, i, "someSalt")); + } + when(saltProviderSnapshot.getModifiedSince(any())).thenReturn(modifiedSalts); + + JsonObject req = new JsonObject(); + req.put("since_timestamp", "2023-04-08T13:00:00"); + + send("v2", vertx, "v2" + "/identity/buckets", false, null, req, 400, respJson -> { + assertFalse(respJson.containsKey("body")); + assertEquals("client_error", respJson.getString("status")); + assertEquals("provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map", respJson.getString("message")); + testContext.completeNow(); + }); + } + @ParameterizedTest @ValueSource(strings = {"v1", "v2"}) void identityMapNewClientNoPolicySpecified(String apiVersion, Vertx vertx, VertxTestContext testContext) { From d36c7021e167f7fff1a2cccc538e3a8423669028 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Mon, 15 Apr 2024 20:34:10 -0600 Subject: [PATCH 05/14] update v1 buckets --- .../operator/vertx/UIDOperatorVerticle.java | 62 ++++++++++++------- .../operator/UIDOperatorVerticleTest.java | 13 ++-- 2 files changed, 46 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java index 71523485b..d6b291c8b 100644 --- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java +++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -1096,17 +1096,12 @@ private void handleBucketsV1(RoutingContext rc) { return; } final List modified = this.idService.getModifiedBuckets(sinceTimestamp); - final JsonArray resp = new JsonArray(); if (modified != null) { - for (SaltEntry e : modified) { - final JsonObject o = new JsonObject(); - o.put("bucket_id", e.getHashedId()); - Instant lastUpdated = Instant.ofEpochMilli(e.getLastUpdated()); - - o.put("last_updated", APIDateTimeFormatter.format(lastUpdated)); - resp.add(o); + if (modified.size() > getMaxIdentityBucketsResponseEntries()) { + ResponseUtil.ClientError(rc, "provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map"); + return; } - ResponseUtil.Success(rc, resp); + transmitModifiedBucketsInChunks(rc, modified); } } else { ResponseUtil.ClientError(rc, "missing parameter since_timestamp"); @@ -1126,10 +1121,6 @@ private String makeSaltEntriesString(List entries, int startIndex, in return s.toString(); } - private static final String cipherScheme = "AES/GCM/NoPadding"; - public static final int GCM_AUTHTAG_LENGTH = 16; - public static final int GCM_IV_LENGTH = 12; - private void handleBucketsV2(RoutingContext rc) { final JsonObject req = (JsonObject) rc.data().get("request"); final String qp = req.getString("since_timestamp"); @@ -1150,7 +1141,7 @@ private void handleBucketsV2(RoutingContext rc) { return; } try { - transmitModifiedBucketsInChunks(rc, modified); + transmitModifiedBucketsInChunksEncrypted(rc, modified); } catch (InvalidKeyException | InvalidAlgorithmParameterException | NoSuchAlgorithmException | NoSuchPaddingException | IllegalBlockSizeException | BadPaddingException e) { ResponseUtil.Error(ResponseUtil.ResponseStatus.GenericError, 500, rc, ""); @@ -1161,10 +1152,34 @@ private void handleBucketsV2(RoutingContext rc) { } } - private void transmitModifiedBucketsInChunks(RoutingContext rc, List modified) throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { + private void transmitModifiedBucketsInChunks(RoutingContext rc, List modified) { + + HttpServerResponse response = rc.response(); + int chunkSize = getIdentityBucketsResponseChunkSize(); + + response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + response.write("{\"body\":["); + + for(int i =0; i < modified.size(); i+=chunkSize) { + String saltEntries = makeSaltEntriesString(modified, i, Math.min(i + chunkSize, modified.size())); + if(i + chunkSize >= modified.size()) { + saltEntries = saltEntries.substring(0, saltEntries.length() -1); + saltEntries += "], \"status\":\"success\"}"; + response.write(saltEntries); + } else { + response.write(saltEntries); + } + } + rc.response().end(); + } + + private void transmitModifiedBucketsInChunksEncrypted(RoutingContext rc, List modified) throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { V2RequestUtil.V2Request request = V2RequestUtil.parseRequest(rc.body().asString(), AuthMiddleware.getAuthClient(ClientKey.class, rc), new InstantClock()); HttpServerResponse response = rc.response(); + final String cipherScheme = "AES/GCM/NoPadding"; + final int GCM_AUTHTAG_LENGTH = 16; + final int GCM_IV_LENGTH = 12; final SecretKey k = new SecretKeySpec(request.encryptionKey, "AES"); final Cipher c = Cipher.getInstance(cipherScheme); final byte[] ivBytes = Random.getBytes(GCM_IV_LENGTH); @@ -1179,8 +1194,11 @@ private void transmitModifiedBucketsInChunks(RoutingContext rc, List b.appendBytes(request.nonce); b.appendBytes("{\"body\":[".getBytes()); b = Buffer.buffer(ivBytes).appendBytes(c.update(b.getBytes())); + for(int i =0; i < modified.size(); i+=chunkSize) { + String saltEntries = makeSaltEntriesString(modified, i, Math.min(i + chunkSize, modified.size())); + if(i + chunkSize >= modified.size()) { saltEntries = saltEntries.substring(0, saltEntries.length() -1); saltEntries += "], \"status\":\"success\"}"; @@ -1188,20 +1206,18 @@ private void transmitModifiedBucketsInChunks(RoutingContext rc, List } else { b.appendBytes(c.update(saltEntries.getBytes())); } + if (b.length() % 3 == 0 || b.length() < 3 || (i+chunkSize >= modified.size())) { response.write(Utils.toBase64String(b.getBytes())); b = Buffer.buffer(); } else if ((b.length()-1) % 3 == 0) { - byte[] start = Arrays.copyOfRange(b.getBytes(), 0, b.length()-1); - byte[] end = Arrays.copyOfRange(b.getBytes(), b.length()-1, b.length()); - response.write(Utils.toBase64String(start)); - b = Buffer.buffer(end); + response.write(Utils.toBase64String(Arrays.copyOfRange(b.getBytes(), 0, b.length()-1))); + b = Buffer.buffer(Arrays.copyOfRange(b.getBytes(), b.length()-1, b.length())); } else { - byte[] start = Arrays.copyOfRange(b.getBytes(), 0, b.length()-2); - byte[] end = Arrays.copyOfRange(b.getBytes(), b.length()-2, b.length()); - response.write(Utils.toBase64String(start)); - b = Buffer.buffer(end); + response.write(Utils.toBase64String(Arrays.copyOfRange(b.getBytes(), 0, b.length()-2))); + b = Buffer.buffer(Arrays.copyOfRange(b.getBytes(), b.length()-2, b.length())); } + } rc.response().end(); } diff --git a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java index 4cb306e11..2d8ea5874 100644 --- a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java +++ b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java @@ -822,8 +822,8 @@ private void validateIdentityBuckets(JsonArray expectedList, List act } @ParameterizedTest - @ValueSource(ints = {1, 11, 15, 20, 30, 35, 50}) - void identityBucketsChunked(int numModifiedSalts, Vertx vertx, VertxTestContext testContext) { + @CsvSource({"v1,1", "v2,1", "v1,11", "v2,11", "v1,15", "v2,15", "v1,20","v2,20", "v1,30", "v2,30", "v1,35", "v2,35", "v1,50", "v2,50"}) + void identityBucketsChunked(String apiVersion, int numModifiedSalts, Vertx vertx, VertxTestContext testContext) { final int clientSiteId = 201; fakeAuth(clientSiteId, newClientCreationDateTime, Role.MAPPER); List modifiedSalts = new ArrayList<>(); @@ -835,7 +835,7 @@ void identityBucketsChunked(int numModifiedSalts, Vertx vertx, VertxTestContext JsonObject req = new JsonObject(); req.put("since_timestamp", "2023-04-08T13:00:00"); - send("v2", vertx, "v2" + "/identity/buckets", false, null, req, 200, respJson -> { + send(apiVersion, vertx, apiVersion + "/identity/buckets", true, "since_timestamp=" +urlEncode("2023-04-08T13:00:00"), req, 200, respJson -> { assertTrue(respJson.containsKey("body")); assertFalse(respJson.containsKey("client_error")); validateIdentityBuckets(respJson.getJsonArray("body"), modifiedSalts); @@ -843,8 +843,9 @@ void identityBucketsChunked(int numModifiedSalts, Vertx vertx, VertxTestContext }); } - @Test - void identityBucketsLimit(Vertx vertx, VertxTestContext testContext) { + @ParameterizedTest + @ValueSource(strings = {"v1", "v2"}) + void identityBucketsLimit(String apiVersion, Vertx vertx, VertxTestContext testContext) { final int clientSiteId = 201; fakeAuth(clientSiteId, newClientCreationDateTime, Role.MAPPER); List modifiedSalts = new ArrayList<>(); @@ -856,7 +857,7 @@ void identityBucketsLimit(Vertx vertx, VertxTestContext testContext) { JsonObject req = new JsonObject(); req.put("since_timestamp", "2023-04-08T13:00:00"); - send("v2", vertx, "v2" + "/identity/buckets", false, null, req, 400, respJson -> { + send(apiVersion, vertx, apiVersion + "/identity/buckets", true, "since_timestamp=" + urlEncode("2023-04-08T13:00:00"), req, 400, respJson -> { assertFalse(respJson.containsKey("body")); assertEquals("client_error", respJson.getString("status")); assertEquals("provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map", respJson.getString("message")); From 8f518439dee1958a6f96e687571ba7a65ecc6158 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Mon, 15 Apr 2024 20:37:22 -0600 Subject: [PATCH 06/14] undo unnecessary whitespace change --- src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java index d51b8d374..e26e49f3f 100644 --- a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java +++ b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java @@ -150,12 +150,12 @@ public void handleTokenRefresh(RoutingContext rc, Handler apiHan rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "text/plain"); // Encrypt whole payload using key shared with client. byte[] encryptedResp = AesGcm.encrypt( - respJson.encode().getBytes(StandardCharsets.UTF_8), - request.encryptionKey); + respJson.encode().getBytes(StandardCharsets.UTF_8), + request.encryptionKey); rc.response().end(Utils.toBase64String(encryptedResp)); } else { rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(respJson.encode()); + .end(respJson.encode()); } } catch (Exception ex) { LOGGER.error("Failed to refresh token", ex); From 3962d985513f82800150de2de80b01e06949f1ae Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Mon, 15 Apr 2024 20:38:49 -0600 Subject: [PATCH 07/14] undo unnecessary whitespace change --- .../com/uid2/operator/vertx/V2PayloadHandler.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java index e26e49f3f..c90b20085 100644 --- a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java +++ b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java @@ -107,7 +107,8 @@ public void handleTokenGenerate(RoutingContext rc, Handler apiHa } writeResponse(rc, request.nonce, respJson, request.encryptionKey); - } catch (Exception ex) { + } + catch (Exception ex) { LOGGER.error("Failed to generate token", ex); ResponseUtil.Error(ResponseUtil.ResponseStatus.GenericError, 500, rc, ""); } @@ -129,7 +130,8 @@ public void handleTokenRefresh(RoutingContext rc, Handler apiHan return; } rc.data().put("request", request.payload); - } else { + } + else { rc.data().put("request", bodyString); } @@ -153,11 +155,13 @@ public void handleTokenRefresh(RoutingContext rc, Handler apiHan respJson.encode().getBytes(StandardCharsets.UTF_8), request.encryptionKey); rc.response().end(Utils.toBase64String(encryptedResp)); - } else { + } + else { rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json") .end(respJson.encode()); } - } catch (Exception ex) { + } + catch (Exception ex) { LOGGER.error("Failed to refresh token", ex); ResponseUtil.Error(ResponseUtil.ResponseStatus.GenericError, 500, rc, ""); } From 0e69b2f64391de46a2348823815bb01ce0918d0f Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Mon, 15 Apr 2024 20:40:14 -0600 Subject: [PATCH 08/14] undo unnecessary whitespace change --- src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java index c90b20085..c775a9b28 100644 --- a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java +++ b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java @@ -108,7 +108,7 @@ public void handleTokenGenerate(RoutingContext rc, Handler apiHa writeResponse(rc, request.nonce, respJson, request.encryptionKey); } - catch (Exception ex) { + catch (Exception ex){ LOGGER.error("Failed to generate token", ex); ResponseUtil.Error(ResponseUtil.ResponseStatus.GenericError, 500, rc, ""); } @@ -161,7 +161,7 @@ public void handleTokenRefresh(RoutingContext rc, Handler apiHan .end(respJson.encode()); } } - catch (Exception ex) { + catch (Exception ex){ LOGGER.error("Failed to refresh token", ex); ResponseUtil.Error(ResponseUtil.ResponseStatus.GenericError, 500, rc, ""); } From 75fbfcf7fcd4500b2b923513b1fb0f6f5e402edc Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Mon, 15 Apr 2024 20:41:46 -0600 Subject: [PATCH 09/14] undo unnecessary whitespace change --- src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java index c775a9b28..048543990 100644 --- a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java +++ b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java @@ -175,7 +175,7 @@ private void passThrough(RoutingContext rc, Handler apiHandler) } JsonObject respJson = (JsonObject) rc.data().get("response"); rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(respJson.encode()); + .end(respJson.encode()); } From 8e52064214193f4361392cae89fac8a085f6ddb9 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Tue, 16 Apr 2024 23:26:19 -0600 Subject: [PATCH 10/14] 3 part stream --- .../service/IModifiedBucketEncryptStream.java | 50 ++++ .../service/ModifiedBucketEncodeStream.java | 195 +++++++++++++++ .../service/ModifiedBucketEncryptStream.java | 224 ++++++++++++++++++ .../service/ModifiedBucketReadStream.java | 162 +++++++++++++ .../operator/vertx/UIDOperatorVerticle.java | 52 +--- .../uid2/operator/vertx/V2PayloadHandler.java | 4 +- .../operator/UIDOperatorVerticleTest.java | 18 +- 7 files changed, 654 insertions(+), 51 deletions(-) create mode 100644 src/main/java/com/uid2/operator/service/IModifiedBucketEncryptStream.java create mode 100644 src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java create mode 100644 src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java create mode 100644 src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java diff --git a/src/main/java/com/uid2/operator/service/IModifiedBucketEncryptStream.java b/src/main/java/com/uid2/operator/service/IModifiedBucketEncryptStream.java new file mode 100644 index 000000000..f34953916 --- /dev/null +++ b/src/main/java/com/uid2/operator/service/IModifiedBucketEncryptStream.java @@ -0,0 +1,50 @@ +package com.uid2.operator.service; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +public interface IModifiedBucketEncryptStream extends ReadStream, WriteStream { + IModifiedBucketEncryptStream exceptionHandler(Handler handler); + + Future write(Buffer buffer); + + void write(Buffer buffer, Handler> handler); + + void end(Handler> handler); + + WriteStream setWriteQueueMaxSize(int i); + + boolean writeQueueFull(); + + WriteStream drainHandler(Handler handler); + + ReadStream handler(Handler handler); + + ReadStream pause(); + + ReadStream resume(); + + ReadStream fetch(long l); + + ReadStream endHandler(Handler handler); + + @Override + default Pipe pipe() { + return ReadStream.super.pipe(); + } + + @Override + default Future pipeTo(WriteStream dst) { + return ReadStream.super.pipeTo(dst); + } + + @Override + default void pipeTo(WriteStream dst, Handler> handler) { + ReadStream.super.pipeTo(dst, handler); + } +} diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java new file mode 100644 index 000000000..f2b496e5a --- /dev/null +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java @@ -0,0 +1,195 @@ +package com.uid2.operator.service; + +import com.uid2.shared.Utils; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + + +public class ModifiedBucketEncodeStream implements IModifiedBucketEncryptStream { + private static final Logger LOGGER = LoggerFactory.getLogger(ModifiedBucketEncodeStream.class); + + private final Context context; + + private Handler exceptionHandler; + private Handler endHandler; + private Handler dataHandler; + private Handler drainHandler; + + private boolean readInProgress; + private boolean incomingStreamEnded = false; + private boolean outgoingStreamEnded = false; + private long maxBufferSizeBytes = 5242880; // 5 MB + private long demand = Long.MAX_VALUE; + + Buffer data; + + public ModifiedBucketEncodeStream(Context context) { + this.context = context; + this.data = Buffer.buffer(); + } + + @Override + public synchronized IModifiedBucketEncryptStream exceptionHandler(Handler handler) { + this.exceptionHandler = handler; + return this; + } + + @Override + public synchronized Future write(Buffer buffer) { + synchronized (this) { + data.appendBuffer(buffer); + } + return Future.succeededFuture(); + } + + @Override + public synchronized void write(Buffer buffer, Handler> handler) { + synchronized (this) { + data.appendBuffer(buffer); + } + succeededAsyncResult(handler); + } + + private void succeededAsyncResult(Handler> handler) { + handler.handle(new AsyncResult<>() { + @Override + public Void result() { + return null; + } + + @Override + public Throwable cause() { + return null; + } + + @Override + public boolean succeeded() { + return true; + } + + @Override + public boolean failed() { + return false; + } + }); + } + + @Override + public synchronized void end(Handler> handler) { + this.incomingStreamEnded = true; + succeededAsyncResult(handler); + } + + @Override + public synchronized WriteStream setWriteQueueMaxSize(int i) { + maxBufferSizeBytes = i; + return this; + } + + @Override + public synchronized boolean writeQueueFull() { + return data.length() > maxBufferSizeBytes; + } + + @Override + public synchronized WriteStream drainHandler(Handler handler) { + this.drainHandler = handler; + return this; + } + + // ReadStream methods + + @Override + public synchronized ReadStream handler(Handler handler) { + this.dataHandler = handler; + if (handler != null && demand > 0) { + read(); + } + return this; + } + + @Override + public synchronized ReadStream pause() { + this.demand = 0; + return this; + } + + @Override + public synchronized ReadStream resume() { + fetch(Long.MAX_VALUE); + return this; + } + + @Override + public synchronized ReadStream fetch(long amount) { + demand = Long.MAX_VALUE - amount >= demand ? demand + amount : Long.MAX_VALUE; + read(); + return this; + } + + @Override + public synchronized ReadStream endHandler(Handler handler) { + this.endHandler = handler; + return this; + } + + private void read() { + if (this.readInProgress) { + if (!incomingStreamEnded || !outgoingStreamEnded) { + this.context.runOnContext(v -> this.read()); + } + return; + } + + if (demand <= 0) { + return; + } + demand--; + + this.readInProgress = true; + + this.context.executeBlocking(() -> { + String chunk = ""; + if (data.length() == 0) { + return chunk; + } + + synchronized (this) { + if (data.length() % 3 == 0 || incomingStreamEnded) { + chunk = Utils.toBase64String(data.getBytes()); + data = Buffer.buffer(); + outgoingStreamEnded = true; + } else if ((data.length() - 1) % 3 == 0) { + chunk = Utils.toBase64String(Arrays.copyOfRange(data.getBytes(), 0, data.length() - 1)); + data = Buffer.buffer(Arrays.copyOfRange(data.getBytes(), data.length() - 1, data.length())); + } else { + chunk = Utils.toBase64String(Arrays.copyOfRange(data.getBytes(), 0, data.length() - 2)); + data = Buffer.buffer(Arrays.copyOfRange(data.getBytes(), data.length() - 2, data.length())); + } + return chunk; + } + }, asyncResult -> { + this.dataHandler.handle(Buffer.buffer(asyncResult.result())); + this.readInProgress = false; + scheduleNextRead(); + }); + } + + private synchronized void scheduleNextRead() { + if (demand > 0 && (!incomingStreamEnded || !outgoingStreamEnded)) { + context.runOnContext(unused -> read()); + } else if (outgoingStreamEnded && endHandler != null) { + endHandler.handle(null); + } + } +} diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java new file mode 100644 index 000000000..dda93f2cc --- /dev/null +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java @@ -0,0 +1,224 @@ +package com.uid2.operator.service; + +import com.uid2.shared.encryption.Random; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.crypto.Cipher; +import javax.crypto.NoSuchPaddingException; +import javax.crypto.SecretKey; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.security.InvalidAlgorithmParameterException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; + + +public class ModifiedBucketEncryptStream implements IModifiedBucketEncryptStream { + private static final Logger LOGGER = LoggerFactory.getLogger(ModifiedBucketEncryptStream.class); + + private final Context context; + + private Handler exceptionHandler; + private Handler endHandler; + private Handler dataHandler; + private Handler drainHandler; + + private boolean readInProgress; + private boolean wroteStart = false; + private boolean incomingStreamEnded = false; + private boolean outgoingStreamEnded = false; + private long maxBufferSizeBytes = 5242880; + private long demand = Long.MAX_VALUE; + + Buffer data; + + final String cipherScheme = "AES/GCM/NoPadding"; + final int GCM_AUTHTAG_LENGTH = 16; + final int GCM_IV_LENGTH = 12; + final SecretKey k; + final byte[] nonce; + final Cipher c = Cipher.getInstance(cipherScheme); + final byte[] ivBytes = Random.getBytes(GCM_IV_LENGTH); + GCMParameterSpec gcmParameterSpec; + + public ModifiedBucketEncryptStream(Context context, byte[] encryptionKey, byte[] nonce) throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException { + this.context = context; + this.data = Buffer.buffer(); + this.k = new SecretKeySpec(encryptionKey, "AES"); + this.nonce = nonce; + this.gcmParameterSpec = new GCMParameterSpec(GCM_AUTHTAG_LENGTH * 8, ivBytes); + c.init(Cipher.ENCRYPT_MODE, k, gcmParameterSpec); + } + + @Override + public synchronized IModifiedBucketEncryptStream exceptionHandler(Handler handler) { + this.exceptionHandler = handler; + return this; + } + + @Override + public synchronized Future write(Buffer buffer) { + synchronized (this) { + data.appendBuffer(buffer); + } + return Future.succeededFuture(); + } + + @Override + public synchronized void write(Buffer buffer, Handler> handler) { + synchronized (this) { + data.appendBuffer(buffer); + } + succeededAsyncResult(handler); + } + + private void succeededAsyncResult(Handler> handler) { + handler.handle(new AsyncResult() { + @Override + public Void result() { + return null; + } + + @Override + public Throwable cause() { + return null; + } + + @Override + public boolean succeeded() { + return true; + } + + @Override + public boolean failed() { + return false; + } + }); + } + + @Override + public synchronized void end(Handler> handler) { + this.incomingStreamEnded = true; + succeededAsyncResult(handler); + } + + @Override + public synchronized WriteStream setWriteQueueMaxSize(int i) { + maxBufferSizeBytes = i; + return this; + } + + @Override + public synchronized boolean writeQueueFull() { + return data.length() > maxBufferSizeBytes; + } + + @Override + public synchronized WriteStream drainHandler(Handler handler) { + this.drainHandler = handler; + return this; + } + + // ReadStream methods + + @Override + public synchronized ReadStream handler(Handler handler) { + this.dataHandler = handler; + if (handler != null && demand > 0) { + read(); + } + return this; + } + + @Override + public synchronized ReadStream pause() { + this.demand = 0; + return this; + } + + @Override + public synchronized ReadStream resume() { + fetch(Long.MAX_VALUE); + return this; + } + + @Override + public synchronized ReadStream fetch(long amount) { + demand = Long.MAX_VALUE - amount >= demand ? demand + amount : Long.MAX_VALUE; + read(); + return this; + } + + @Override + public synchronized ReadStream endHandler(Handler handler) { + this.endHandler = handler; + return this; + } + + private void read() { + if (this.readInProgress) { + if ((!incomingStreamEnded || !outgoingStreamEnded)) { + this.context.runOnContext(v -> this.read()); + } + return; + } + + if (demand <= 0) { + return; + } + demand--; + + this.readInProgress = true; + + this.context.executeBlocking(() -> { + Buffer chunk = Buffer.buffer(); + if (data.length() == 0) { + return chunk; + } + + if (!wroteStart) { + chunk.appendBytes(ivBytes); + Buffer b = Buffer.buffer(); + b.appendLong(EncodingUtils.NowUTCMillis().toEpochMilli()); + b.appendBytes(this.nonce); + chunk.appendBytes(c.update(b.getBytes())); + wroteStart = true; + } + + synchronized (this) { + if (!incomingStreamEnded) { + chunk.appendBytes(c.update(data.getBytes())); + LOGGER.info(data.toString()); + data = Buffer.buffer(); + } else { + chunk.appendBytes(c.doFinal(data.getBytes())); + data = Buffer.buffer(); + LOGGER.info(data.toString()); + outgoingStreamEnded = true; + } + return chunk; + } + }, asyncResult -> { + this.dataHandler.handle(asyncResult.result()); + this.readInProgress = false; + scheduleNextRead(); + }); + } + + private synchronized void scheduleNextRead() { + if (demand > 0 && (!incomingStreamEnded || !outgoingStreamEnded)) { + context.runOnContext(unused -> read()); + } else if (outgoingStreamEnded && endHandler != null) { + endHandler.handle(null); + } + } +} diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java new file mode 100644 index 000000000..f7237d913 --- /dev/null +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java @@ -0,0 +1,162 @@ +package com.uid2.operator.service; + +import com.uid2.shared.model.SaltEntry; +import io.vertx.core.*; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.concurrent.Callable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ModifiedBucketReadStream implements ReadStream { + private static final Logger LOGGER = LoggerFactory.getLogger(ModifiedBucketReadStream.class); + private static final DateTimeFormatter APIDateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC")); + + private final Context context; + + private Handler dataHandler; + private Handler endHandler; + private Handler exceptionHandler; + + private boolean readInProgress; + private boolean wroteStart = false; + private boolean streamEnded = false; + private long demand = Long.MAX_VALUE; + private final int chunkSize; + + private final List modified; + + public ModifiedBucketReadStream(Context context, List modified, int chunkSize) { + this.context = context; + this.modified = modified; + this.chunkSize = chunkSize; + } + + private String makeSaltEntriesString() { + StringBuilder s = new StringBuilder(); + for (int i = 0; i < chunkSize && i < modified.size(); i++) { + SaltEntry e = modified.remove(0); + s.append("{\"bucket_id\":\"") + .append(e.getHashedId()) + .append("\",\"last_updated\":\"") + .append(APIDateTimeFormatter.format(Instant.ofEpochMilli(e.getLastUpdated()))) + .append("\"},"); + } + return s.toString(); + } + + @Override + public synchronized ReadStream exceptionHandler(Handler handler) { + this.exceptionHandler = handler; + return this; + } + + @Override + public synchronized ReadStream handler(Handler handler) { + this.dataHandler = handler; + if (handler != null && demand > 0) { + read(); + } + return this; + } + + @Override + public synchronized ReadStream pause() { + this.demand = 0; + return this; + } + + @Override + public synchronized ReadStream resume() { + fetch(Long.MAX_VALUE); + return this; + } + + @Override + public synchronized ReadStream fetch(long amount) { + demand = Long.MAX_VALUE - amount >= demand ? demand + amount : Long.MAX_VALUE; + read(); + return this; + } + + @Override + public synchronized ReadStream endHandler(Handler handler) { + this.endHandler = handler; + return this; + } + + private void read() { + if (this.readInProgress) { + if (!streamEnded && !modified.isEmpty()) { + this.context.runOnContext(v -> this.read()); + } + return; + } + + if (demand <= 0) { + return; + } + demand--; + + this.readInProgress = true; + + this.context.executeBlocking(() -> { + StringBuilder salts = new StringBuilder(); + + if (!wroteStart) { + salts.append("{\"body\":["); + wroteStart = true; + } + + salts.append(makeSaltEntriesString()); + + if (modified.isEmpty()) { + salts.deleteCharAt(salts.length() - 1); // remove trailing comma + salts.append("],\"status\":\"success\"}"); + streamEnded = true; + } + return salts.toString(); + }, asyncResult -> { + this.dataHandler.handle(Buffer.buffer(asyncResult.result().getBytes())); + this.readInProgress = false; + scheduleNextRead(); + }); + } + + private synchronized void scheduleNextRead() { + if (demand > 0 && !streamEnded) { + context.runOnContext(unused -> read()); + } else if (streamEnded && endHandler != null) { + endHandler.handle(null); + } + } + + @Override + public Pipe pipe() { + return ReadStream.super.pipe(); + } + + @Override + public Future pipeTo(WriteStream dst) { + return ReadStream.super.pipeTo(dst); + } + + @Override + public void pipeTo(WriteStream dst, Handler> handler) { + ReadStream.super.pipeTo(dst, handler); + } +} diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java index d6b291c8b..4749b18ea 100644 --- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java +++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -1177,49 +1177,19 @@ private void transmitModifiedBucketsInChunksEncrypted(RoutingContext rc, List {response.end(Utils.toBase64String(b.getBytes()));}); +// encryptStream.handler((buf) -> b.appendBytes(buf.getBytes()) ); + response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "text/plain"); + readStream.pipe().endOnSuccess(true).to(encryptStream); + encryptStream.pipe().endOnSuccess(true).to(encodeStream); +// encryptStream.endHandler((ar) -> {response.end();}); + encodeStream.pipe().endOnSuccess(true).to(response); - int chunkSize = getIdentityBucketsResponseChunkSize(); - response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "application/json"); - - Buffer b = Buffer.buffer(); - b.appendLong(EncodingUtils.NowUTCMillis().toEpochMilli()); - b.appendBytes(request.nonce); - b.appendBytes("{\"body\":[".getBytes()); - b = Buffer.buffer(ivBytes).appendBytes(c.update(b.getBytes())); - - for(int i =0; i < modified.size(); i+=chunkSize) { - - String saltEntries = makeSaltEntriesString(modified, i, Math.min(i + chunkSize, modified.size())); - - if(i + chunkSize >= modified.size()) { - saltEntries = saltEntries.substring(0, saltEntries.length() -1); - saltEntries += "], \"status\":\"success\"}"; - b.appendBytes(c.doFinal(saltEntries.getBytes())); - } else { - b.appendBytes(c.update(saltEntries.getBytes())); - } - if (b.length() % 3 == 0 || b.length() < 3 || (i+chunkSize >= modified.size())) { - response.write(Utils.toBase64String(b.getBytes())); - b = Buffer.buffer(); - } else if ((b.length()-1) % 3 == 0) { - response.write(Utils.toBase64String(Arrays.copyOfRange(b.getBytes(), 0, b.length()-1))); - b = Buffer.buffer(Arrays.copyOfRange(b.getBytes(), b.length()-1, b.length())); - } else { - response.write(Utils.toBase64String(Arrays.copyOfRange(b.getBytes(), 0, b.length()-2))); - b = Buffer.buffer(Arrays.copyOfRange(b.getBytes(), b.length()-2, b.length())); - } - - } - rc.response().end(); } private void handleIdentityMapV1(RoutingContext rc) { diff --git a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java index 048543990..bb503ef74 100644 --- a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java +++ b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java @@ -170,7 +170,7 @@ public void handleTokenRefresh(RoutingContext rc, Handler apiHan private void passThrough(RoutingContext rc, Handler apiHandler) { rc.data().put("request", rc.body().asJsonObject()); apiHandler.handle(rc); - if (rc.response().getStatusCode() != 200 || rc.response().ended()) { + if (rc.response().getStatusCode() != 200 || rc.response().ended() || rc.request().path().contains("/identity/buckets")) { return; } JsonObject respJson = (JsonObject) rc.data().get("response"); @@ -180,7 +180,7 @@ private void passThrough(RoutingContext rc, Handler apiHandler) } private void writeResponse(RoutingContext rc, byte[] nonce, JsonObject resp, byte[] keyBytes) { - if (rc.response().ended()) { + if (rc.response().ended() || rc.request().path().contains("/identity/buckets")) { return; } Buffer buffer = Buffer.buffer(); diff --git a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java index 2d8ea5874..b97510d00 100644 --- a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java +++ b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java @@ -808,27 +808,29 @@ RefreshToken decodeRefreshToken(EncryptedTokenEncoder encoder, String refreshTok return decodeRefreshToken(encoder, refreshTokenString, IdentityType.Email); } - private void validateIdentityBuckets(JsonArray expectedList, List actualList) { + private void validateIdentityBuckets(List expectedList, JsonArray actualList) { final DateTimeFormatter APIDateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC")); assertEquals(expectedList.size(), actualList.size()); - for(int i = 0; i < expectedList.size(); i++) { - JsonObject expected = expectedList.getJsonObject(i); - SaltEntry actual = actualList.get(i); + for(int i = 0; i < actualList.size(); i++) { + JsonObject actual = actualList.getJsonObject(i); + SaltEntry expected = expectedList.get(i); assertAll("Salt Entry Matches", - () -> assertEquals(expected.getString("bucket_id"), actual.getHashedId()), - () -> assertEquals(expected.getString("last_updated"), APIDateTimeFormatter.format(Instant.ofEpochMilli(actual.getLastUpdated())))); + () -> assertEquals(expected.getHashedId(), actual.getString("bucket_id")), + () -> assertEquals(APIDateTimeFormatter.format(Instant.ofEpochMilli(expected.getLastUpdated())), actual.getString("last_updated"))); } } @ParameterizedTest - @CsvSource({"v1,1", "v2,1", "v1,11", "v2,11", "v1,15", "v2,15", "v1,20","v2,20", "v1,30", "v2,30", "v1,35", "v2,35", "v1,50", "v2,50"}) + @CsvSource({"v1,1", "v2,1", "v1,11", "v2,11", "v1,15", "v2,15", "v1,20","v2,20", "v1,30", "v2,30", "v1,35", "v2,35", "v1,45", "v2,45", "v1,50", "v2,50"}) void identityBucketsChunked(String apiVersion, int numModifiedSalts, Vertx vertx, VertxTestContext testContext) { final int clientSiteId = 201; fakeAuth(clientSiteId, newClientCreationDateTime, Role.MAPPER); List modifiedSalts = new ArrayList<>(); + List actualSalts = new ArrayList<>(); for(int i = 0; i < numModifiedSalts; i++) { modifiedSalts.add(new SaltEntry(i, "someId" + i, i, "someSalt")); + actualSalts.add(new SaltEntry(i, "someId" + i, i, "someSalt")); } when(saltProviderSnapshot.getModifiedSince(any())).thenReturn(modifiedSalts); @@ -838,7 +840,7 @@ void identityBucketsChunked(String apiVersion, int numModifiedSalts, Vertx vertx send(apiVersion, vertx, apiVersion + "/identity/buckets", true, "since_timestamp=" +urlEncode("2023-04-08T13:00:00"), req, 200, respJson -> { assertTrue(respJson.containsKey("body")); assertFalse(respJson.containsKey("client_error")); - validateIdentityBuckets(respJson.getJsonArray("body"), modifiedSalts); + validateIdentityBuckets(actualSalts, respJson.getJsonArray("body")); testContext.completeNow(); }); } From 400d7ad215f8ddc63119f203e5589a955042727f Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Tue, 16 Apr 2024 23:40:05 -0600 Subject: [PATCH 11/14] refactoring --- .../service/ModifiedBucketEncryptStream.java | 2 - .../operator/vertx/UIDOperatorVerticle.java | 39 +++---------------- 2 files changed, 5 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java index dda93f2cc..43c99c973 100644 --- a/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java @@ -197,12 +197,10 @@ private void read() { synchronized (this) { if (!incomingStreamEnded) { chunk.appendBytes(c.update(data.getBytes())); - LOGGER.info(data.toString()); data = Buffer.buffer(); } else { chunk.appendBytes(c.doFinal(data.getBytes())); data = Buffer.buffer(); - LOGGER.info(data.toString()); outgoingStreamEnded = true; } return chunk; diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java index 4749b18ea..480519873 100644 --- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java +++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -1108,19 +1108,6 @@ private void handleBucketsV1(RoutingContext rc) { } } - private String makeSaltEntriesString(List entries, int startIndex, int endIndexExclusive) { - StringBuilder s = new StringBuilder(); - for(int i = startIndex; i < endIndexExclusive; i++) { - SaltEntry e = entries.get(i); - s.append("{\"bucket_id\":\"") - .append(e.getHashedId()) - .append("\",\"last_updated\":\"") - .append(APIDateTimeFormatter.format(Instant.ofEpochMilli(e.getLastUpdated()))) - .append("\"},"); - } - return s.toString(); - } - private void handleBucketsV2(RoutingContext rc) { final JsonObject req = (JsonObject) rc.data().get("request"); final String qp = req.getString("since_timestamp"); @@ -1153,43 +1140,27 @@ private void handleBucketsV2(RoutingContext rc) { } private void transmitModifiedBucketsInChunks(RoutingContext rc, List modified) { - HttpServerResponse response = rc.response(); - int chunkSize = getIdentityBucketsResponseChunkSize(); + + ModifiedBucketReadStream readStream = new ModifiedBucketReadStream(this.vertx.getOrCreateContext(), modified, getIdentityBucketsResponseChunkSize()); response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "application/json"); - response.write("{\"body\":["); - - for(int i =0; i < modified.size(); i+=chunkSize) { - String saltEntries = makeSaltEntriesString(modified, i, Math.min(i + chunkSize, modified.size())); - if(i + chunkSize >= modified.size()) { - saltEntries = saltEntries.substring(0, saltEntries.length() -1); - saltEntries += "], \"status\":\"success\"}"; - response.write(saltEntries); - } else { - response.write(saltEntries); - } - } - rc.response().end(); + readStream.pipe().endOnSuccess(true).to(response); } private void transmitModifiedBucketsInChunksEncrypted(RoutingContext rc, List modified) throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { V2RequestUtil.V2Request request = V2RequestUtil.parseRequest(rc.body().asString(), AuthMiddleware.getAuthClient(ClientKey.class, rc), new InstantClock()); HttpServerResponse response = rc.response(); - ModifiedBucketReadStream readStream = new ModifiedBucketReadStream(this.vertx.getOrCreateContext(), modified, this.config.getInteger(Const.Config.IdentityBucketsResponseChunkSize)); + ModifiedBucketReadStream readStream = new ModifiedBucketReadStream(this.vertx.getOrCreateContext(), modified, getIdentityBucketsResponseChunkSize()); ModifiedBucketEncryptStream encryptStream = new ModifiedBucketEncryptStream(this.vertx.getOrCreateContext(), request.encryptionKey, request.nonce); ModifiedBucketEncodeStream encodeStream = new ModifiedBucketEncodeStream(this.vertx.getOrCreateContext()); -// Buffer b = Buffer.buffer(); -// encryptStream.endHandler((ar) -> {response.end(Utils.toBase64String(b.getBytes()));}); -// encryptStream.handler((buf) -> b.appendBytes(buf.getBytes()) ); + response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "text/plain"); readStream.pipe().endOnSuccess(true).to(encryptStream); encryptStream.pipe().endOnSuccess(true).to(encodeStream); -// encryptStream.endHandler((ar) -> {response.end();}); encodeStream.pipe().endOnSuccess(true).to(response); - } private void handleIdentityMapV1(RoutingContext rc) { From 4f0bbf7492138a5cc9c9f5c1be734df4f4e59107 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Wed, 17 Apr 2024 17:45:21 -0600 Subject: [PATCH 12/14] rename interface --- ...EncryptStream.java => IModifiedBucketReadWriteStream.java} | 4 ++-- .../com/uid2/operator/service/ModifiedBucketEncodeStream.java | 4 ++-- .../uid2/operator/service/ModifiedBucketEncryptStream.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) rename src/main/java/com/uid2/operator/service/{IModifiedBucketEncryptStream.java => IModifiedBucketReadWriteStream.java} (86%) diff --git a/src/main/java/com/uid2/operator/service/IModifiedBucketEncryptStream.java b/src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java similarity index 86% rename from src/main/java/com/uid2/operator/service/IModifiedBucketEncryptStream.java rename to src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java index f34953916..4c9fc5fc9 100644 --- a/src/main/java/com/uid2/operator/service/IModifiedBucketEncryptStream.java +++ b/src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java @@ -8,8 +8,8 @@ import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; -public interface IModifiedBucketEncryptStream extends ReadStream, WriteStream { - IModifiedBucketEncryptStream exceptionHandler(Handler handler); +public interface IModifiedBucketReadWriteStream extends ReadStream, WriteStream { + IModifiedBucketReadWriteStream exceptionHandler(Handler handler); Future write(Buffer buffer); diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java index f2b496e5a..597091bf7 100644 --- a/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java @@ -15,7 +15,7 @@ import java.util.Arrays; -public class ModifiedBucketEncodeStream implements IModifiedBucketEncryptStream { +public class ModifiedBucketEncodeStream implements IModifiedBucketReadWriteStream { private static final Logger LOGGER = LoggerFactory.getLogger(ModifiedBucketEncodeStream.class); private final Context context; @@ -39,7 +39,7 @@ public ModifiedBucketEncodeStream(Context context) { } @Override - public synchronized IModifiedBucketEncryptStream exceptionHandler(Handler handler) { + public synchronized IModifiedBucketReadWriteStream exceptionHandler(Handler handler) { this.exceptionHandler = handler; return this; } diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java index 43c99c973..02fbbc75b 100644 --- a/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java @@ -22,7 +22,7 @@ import java.security.NoSuchAlgorithmException; -public class ModifiedBucketEncryptStream implements IModifiedBucketEncryptStream { +public class ModifiedBucketEncryptStream implements IModifiedBucketReadWriteStream { private static final Logger LOGGER = LoggerFactory.getLogger(ModifiedBucketEncryptStream.class); private final Context context; @@ -60,7 +60,7 @@ public ModifiedBucketEncryptStream(Context context, byte[] encryptionKey, byte[] } @Override - public synchronized IModifiedBucketEncryptStream exceptionHandler(Handler handler) { + public synchronized IModifiedBucketReadWriteStream exceptionHandler(Handler handler) { this.exceptionHandler = handler; return this; } From a8e45069a51857cf355efa0630a9acc37bd0d5d7 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Thu, 18 Apr 2024 16:17:47 -0600 Subject: [PATCH 13/14] fixed concurrency bug --- .../operator/service/ModifiedBucketEncodeStream.java | 10 ++++++---- .../operator/service/ModifiedBucketEncryptStream.java | 2 +- .../operator/service/ModifiedBucketReadStream.java | 1 - .../com/uid2/operator/UIDOperatorVerticleTest.java | 5 +++++ 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java index 597091bf7..7b8230e83 100644 --- a/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java @@ -143,7 +143,7 @@ public synchronized ReadStream endHandler(Handler handler) { return this; } - private void read() { + private synchronized void read() { if (this.readInProgress) { if (!incomingStreamEnded || !outgoingStreamEnded) { this.context.runOnContext(v -> this.read()); @@ -163,12 +163,10 @@ private void read() { if (data.length() == 0) { return chunk; } - synchronized (this) { if (data.length() % 3 == 0 || incomingStreamEnded) { chunk = Utils.toBase64String(data.getBytes()); data = Buffer.buffer(); - outgoingStreamEnded = true; } else if ((data.length() - 1) % 3 == 0) { chunk = Utils.toBase64String(Arrays.copyOfRange(data.getBytes(), 0, data.length() - 1)); data = Buffer.buffer(Arrays.copyOfRange(data.getBytes(), data.length() - 1, data.length())); @@ -176,8 +174,12 @@ private void read() { chunk = Utils.toBase64String(Arrays.copyOfRange(data.getBytes(), 0, data.length() - 2)); data = Buffer.buffer(Arrays.copyOfRange(data.getBytes(), data.length() - 2, data.length())); } - return chunk; + + if(incomingStreamEnded) { + outgoingStreamEnded = true; + } } + return chunk; }, asyncResult -> { this.dataHandler.handle(Buffer.buffer(asyncResult.result())); this.readInProgress = false; diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java index 02fbbc75b..35853d30a 100644 --- a/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java @@ -166,7 +166,7 @@ public synchronized ReadStream endHandler(Handler handler) { private void read() { if (this.readInProgress) { - if ((!incomingStreamEnded || !outgoingStreamEnded)) { + if (!incomingStreamEnded || !outgoingStreamEnded) { this.context.runOnContext(v -> this.read()); } return; diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java index f7237d913..49134a081 100644 --- a/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java @@ -16,7 +16,6 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.List; -import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java index b97510d00..432d47f8c 100644 --- a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java +++ b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java @@ -80,6 +80,10 @@ @ExtendWith(VertxExtension.class) public class UIDOperatorVerticleTest { + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(UIDOperatorVerticleTest.class); + + private final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); private static final Instant legacyClientCreationDateTime = Instant.ofEpochSecond(OPT_OUT_CHECK_CUTOFF_DATE).minus(1, ChronoUnit.SECONDS); private static final Instant newClientCreationDateTime = Instant.ofEpochSecond(OPT_OUT_CHECK_CUTOFF_DATE).plus(1, ChronoUnit.SECONDS); @@ -214,6 +218,7 @@ private void send(String apiVersion, Vertx vertx, String endpoint, boolean isV1G assertEquals(expectedHttpCode, ar.result().statusCode()); if (ar.result().statusCode() == 200) { + LOGGER.info(ar.result().bodyAsString()); byte[] decrypted = AesGcm.decrypt(Utils.decodeBase64String(ar.result().bodyAsString()), 0, ck.getSecretBytes()); assertArrayEquals(Buffer.buffer().appendLong(nonce).getBytes(), Buffer.buffer(decrypted).slice(8, 16).getBytes()); From 73b06e7fac6dcf4bf60447b81aede12070b03388 Mon Sep 17 00:00:00 2001 From: "ian.nara" Date: Sun, 21 Apr 2024 12:28:13 -0600 Subject: [PATCH 14/14] refactoring, strengthening tests --- .../IModifiedBucketReadWriteStream.java | 24 ++++++++++++++++ .../service/ModifiedBucketEncodeStream.java | 28 +------------------ .../service/ModifiedBucketEncryptStream.java | 28 +------------------ .../service/ModifiedBucketReadStream.java | 17 ----------- .../operator/vertx/UIDOperatorVerticle.java | 2 +- .../operator/UIDOperatorVerticleTest.java | 9 +++--- 6 files changed, 32 insertions(+), 76 deletions(-) diff --git a/src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java b/src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java index 4c9fc5fc9..6720e49d4 100644 --- a/src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java +++ b/src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java @@ -47,4 +47,28 @@ default Future pipeTo(WriteStream dst) { default void pipeTo(WriteStream dst, Handler> handler) { ReadStream.super.pipeTo(dst, handler); } + + default void succeededAsyncResult(Handler> handler) { + handler.handle(new AsyncResult<>() { + @Override + public Void result() { + return null; + } + + @Override + public Throwable cause() { + return null; + } + + @Override + public boolean succeeded() { + return true; + } + + @Override + public boolean failed() { + return false; + } + }); + } } diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java index 7b8230e83..70028046e 100644 --- a/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java @@ -20,10 +20,9 @@ public class ModifiedBucketEncodeStream implements IModifiedBucketReadWriteStrea private final Context context; - private Handler exceptionHandler; private Handler endHandler; private Handler dataHandler; - private Handler drainHandler; + private Handler drainHandler; // used by pipe private boolean readInProgress; private boolean incomingStreamEnded = false; @@ -40,7 +39,6 @@ public ModifiedBucketEncodeStream(Context context) { @Override public synchronized IModifiedBucketReadWriteStream exceptionHandler(Handler handler) { - this.exceptionHandler = handler; return this; } @@ -60,30 +58,6 @@ public synchronized void write(Buffer buffer, Handler> handler succeededAsyncResult(handler); } - private void succeededAsyncResult(Handler> handler) { - handler.handle(new AsyncResult<>() { - @Override - public Void result() { - return null; - } - - @Override - public Throwable cause() { - return null; - } - - @Override - public boolean succeeded() { - return true; - } - - @Override - public boolean failed() { - return false; - } - }); - } - @Override public synchronized void end(Handler> handler) { this.incomingStreamEnded = true; diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java index 35853d30a..0d1235382 100644 --- a/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketEncryptStream.java @@ -27,10 +27,9 @@ public class ModifiedBucketEncryptStream implements IModifiedBucketReadWriteStre private final Context context; - private Handler exceptionHandler; private Handler endHandler; private Handler dataHandler; - private Handler drainHandler; + private Handler drainHandler; // used by pipe private boolean readInProgress; private boolean wroteStart = false; @@ -61,7 +60,6 @@ public ModifiedBucketEncryptStream(Context context, byte[] encryptionKey, byte[] @Override public synchronized IModifiedBucketReadWriteStream exceptionHandler(Handler handler) { - this.exceptionHandler = handler; return this; } @@ -81,30 +79,6 @@ public synchronized void write(Buffer buffer, Handler> handler succeededAsyncResult(handler); } - private void succeededAsyncResult(Handler> handler) { - handler.handle(new AsyncResult() { - @Override - public Void result() { - return null; - } - - @Override - public Throwable cause() { - return null; - } - - @Override - public boolean succeeded() { - return true; - } - - @Override - public boolean failed() { - return false; - } - }); - } - @Override public synchronized void end(Handler> handler) { this.incomingStreamEnded = true; diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java index 49134a081..ff94048d7 100644 --- a/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java +++ b/src/main/java/com/uid2/operator/service/ModifiedBucketReadStream.java @@ -29,7 +29,6 @@ public class ModifiedBucketReadStream implements ReadStream { private Handler dataHandler; private Handler endHandler; - private Handler exceptionHandler; private boolean readInProgress; private boolean wroteStart = false; @@ -60,7 +59,6 @@ private String makeSaltEntriesString() { @Override public synchronized ReadStream exceptionHandler(Handler handler) { - this.exceptionHandler = handler; return this; } @@ -143,19 +141,4 @@ private synchronized void scheduleNextRead() { endHandler.handle(null); } } - - @Override - public Pipe pipe() { - return ReadStream.super.pipe(); - } - - @Override - public Future pipeTo(WriteStream dst) { - return ReadStream.super.pipeTo(dst); - } - - @Override - public void pipeTo(WriteStream dst, Handler> handler) { - ReadStream.super.pipeTo(dst, handler); - } } diff --git a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java index 480519873..c57fb3493 100644 --- a/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java +++ b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -1159,7 +1159,7 @@ private void transmitModifiedBucketsInChunksEncrypted(RoutingContext rc, List expectedList, JsonArray act } @ParameterizedTest - @CsvSource({"v1,1", "v2,1", "v1,11", "v2,11", "v1,15", "v2,15", "v1,20","v2,20", "v1,30", "v2,30", "v1,35", "v2,35", "v1,45", "v2,45", "v1,50", "v2,50"}) - void identityBucketsChunked(String apiVersion, int numModifiedSalts, Vertx vertx, VertxTestContext testContext) { + @CsvSource({"v1,1", "v2,1", "v1,11", "v2,11", "v1,15", "v2,15", "v1,20","v2,20", "v1,30", "v2,30", "v1,35", "v2,35", "v1,45", "v2,45", "v1,50", "v2,50", "v1,10001", "v2,10001", "v1,255000", "v2,255000", "v1,500000", "v2,500000"}) + void identityBucketsChunked(String apiVersion, int numModifiedSalts, Vertx vertx, VertxTestContext testContext) throws InterruptedException { final int clientSiteId = 201; fakeAuth(clientSiteId, newClientCreationDateTime, Role.MAPPER); List modifiedSalts = new ArrayList<>();