diff --git a/pom.xml b/pom.xml index 5f71072..3d4fd6d 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ statisticsnorway/dapla-dlp-pseudo-core - 1.2.8 + 1.3.0-SNAPSHOT 32.0.0-jre 1.5.1 1.4.6 diff --git a/src/main/java/no/ssb/dlp/pseudo/core/StreamProcessor.java b/src/main/java/no/ssb/dlp/pseudo/core/StreamProcessor.java index 3757026..a105081 100644 --- a/src/main/java/no/ssb/dlp/pseudo/core/StreamProcessor.java +++ b/src/main/java/no/ssb/dlp/pseudo/core/StreamProcessor.java @@ -8,7 +8,7 @@ import java.util.Map; public interface StreamProcessor { - Completable init(InputStream is, RecordMapSerializer serializer); + Completable init(InputStream is); Flowable process(InputStream is, RecordMapSerializer serializer); @FunctionalInterface public interface ItemProcessor { diff --git a/src/main/java/no/ssb/dlp/pseudo/core/csv/CsvStreamProcessor.java b/src/main/java/no/ssb/dlp/pseudo/core/csv/CsvStreamProcessor.java index 2de6d20..69eacc1 100644 --- a/src/main/java/no/ssb/dlp/pseudo/core/csv/CsvStreamProcessor.java +++ b/src/main/java/no/ssb/dlp/pseudo/core/csv/CsvStreamProcessor.java @@ -12,6 +12,7 @@ import no.ssb.dlp.pseudo.core.StreamProcessor; import no.ssb.dlp.pseudo.core.map.RecordMapProcessor; import no.ssb.dlp.pseudo.core.map.RecordMapSerializer; +import no.ssb.dlp.pseudo.core.map.RecordMapSerializerFactory; import java.io.InputStream; import java.util.LinkedHashMap; @@ -22,12 +23,12 @@ @Slf4j public class CsvStreamProcessor implements StreamProcessor { - private final RecordMapProcessor recordMapProcessor; + private final RecordMapProcessor recordMapProcessor; @Override - public Completable init(InputStream is, RecordMapSerializer serializer) { + public Completable init(InputStream is) { if (recordMapProcessor.hasPreprocessors()) { - return Completable.fromPublisher(processStream(is, serializer, (map) -> recordMapProcessor.init(map))); + return Completable.fromPublisher(processStream(is, RecordMapSerializerFactory.emptySerializer(), recordMapProcessor::init)); } else { return Completable.complete(); } @@ -35,7 +36,7 @@ public Completable init(InputStream is, RecordMapSerializer serializer) { @Override public Flowable process(InputStream is, RecordMapSerializer serializer) { - return processStream(is, serializer, (map) -> recordMapProcessor.process(map)); + return processStream(is, serializer, recordMapProcessor::process); } CsvProcessorContext initCsvProcessorContext(InputStream is, RecordMapSerializer serializer) { diff --git a/src/main/java/no/ssb/dlp/pseudo/core/csv/CsvStreamPseudonymizer.java b/src/main/java/no/ssb/dlp/pseudo/core/csv/CsvStreamPseudonymizer.java deleted file mode 100644 index 1dc3447..0000000 --- a/src/main/java/no/ssb/dlp/pseudo/core/csv/CsvStreamPseudonymizer.java +++ /dev/null @@ -1,81 +0,0 @@ -package no.ssb.dlp.pseudo.core.csv; - -import com.univocity.parsers.common.record.Record; -import com.univocity.parsers.csv.CsvParser; -import com.univocity.parsers.csv.CsvParserSettings; -import io.reactivex.Emitter; -import io.reactivex.Flowable; -import lombok.RequiredArgsConstructor; -import lombok.Value; -import lombok.extern.slf4j.Slf4j; -import no.ssb.dlp.pseudo.core.PseudoOperation; -import no.ssb.dlp.pseudo.core.StreamPseudonymizer; -import no.ssb.dlp.pseudo.core.map.RecordMapPseudonymizer; -import no.ssb.dlp.pseudo.core.map.RecordMapSerializer; - -import java.io.IOException; -import java.io.InputStream; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @deprecated Use {@link CsvStreamProcessor} instead - */ -@RequiredArgsConstructor -@Slf4j -@Deprecated -public class CsvStreamPseudonymizer implements StreamPseudonymizer { - - private final RecordMapPseudonymizer recordPseudonymizer; - - @Override - public Flowable pseudonymize(InputStream is, RecordMapSerializer serializer) { - return processStream(PseudoOperation.PSEUDONYMIZE, is, serializer); - } - - @Override - public Flowable depseudonymize(InputStream is, RecordMapSerializer serializer) { - return processStream(PseudoOperation.DEPSEUDONYMIZE, is, serializer); - } - - CsvProcessorContext initCsvProcessorContext(PseudoOperation operation, InputStream is, RecordMapSerializer serializer) throws IOException { - CsvParserSettings settings = new CsvParserSettings(); - settings.detectFormatAutomatically(); - settings.setHeaderExtractionEnabled(true); - final CsvParser csvParser = new CsvParser(settings); - csvParser.beginParsing(is); - return new CsvProcessorContext<>(operation, csvParser, serializer); - } - - private Flowable processStream(PseudoOperation operation, InputStream is, RecordMapSerializer serializer) { - return Flowable.generate( - () -> initCsvProcessorContext(operation, is, serializer), - (ctx, emitter) -> {this.processItem(ctx, emitter);} - ); - } - - private void processItem(CsvProcessorContext ctx, Emitter emitter) { - Record r = ctx.csvParser.parseNextRecord(); - if (r != null) { - int position = ctx.currentPosition.getAndIncrement(); - Map recordMap = r.fillFieldObjectMap(new LinkedHashMap<>()); - Map processedRecord = ctx.operation == PseudoOperation.PSEUDONYMIZE - ? recordPseudonymizer.pseudonymize(recordMap) - : recordPseudonymizer.depseudonymize(recordMap); - emitter.onNext(ctx.getSerializer().serialize(processedRecord, position)); - } - else { - emitter.onComplete(); - } - } - - @Value - static class CsvProcessorContext { - private final PseudoOperation operation; - private final CsvParser csvParser; - private final RecordMapSerializer serializer; - private final AtomicInteger currentPosition = new AtomicInteger(); - } - -} diff --git a/src/main/java/no/ssb/dlp/pseudo/core/field/FieldPseudonymizer.java b/src/main/java/no/ssb/dlp/pseudo/core/field/FieldPseudonymizer.java index 683c89c..1ea1742 100644 --- a/src/main/java/no/ssb/dlp/pseudo/core/field/FieldPseudonymizer.java +++ b/src/main/java/no/ssb/dlp/pseudo/core/field/FieldPseudonymizer.java @@ -17,6 +17,7 @@ import static no.ssb.dlp.pseudo.core.PseudoOperation.DEPSEUDONYMIZE; import static no.ssb.dlp.pseudo.core.PseudoOperation.PSEUDONYMIZE; +@Deprecated(forRemoval = true) // Should use PseudoFuncs instead public class FieldPseudonymizer { private final PseudoFuncs pseudoFuncs; @@ -25,11 +26,11 @@ private FieldPseudonymizer(PseudoFuncs pseudoFuncs) { this.pseudoFuncs = pseudoFuncs; } - public String pseudonymize(FieldDescriptor field, String varValue) { + public PseudoFuncOutput pseudonymize(FieldDescriptor field, String varValue) { return process(PSEUDONYMIZE, field, varValue); } - public String depseudonymize(FieldDescriptor field, String varValue) { + public PseudoFuncOutput depseudonymize(FieldDescriptor field, String varValue) { return process(DEPSEUDONYMIZE, field, varValue); } @@ -37,31 +38,29 @@ public Optional match(FieldDescriptor field) { return pseudoFuncs.findPseudoFunc(field); } - public String init(FieldDescriptor field, String varValue) { + public void init(FieldDescriptor field, String varValue) { Optional match = pseudoFuncs.findPseudoFunc(field); if (match.isPresent()) { match.get().getFunc().init(PseudoFuncInput.of(varValue)); } - return varValue; } - private String process(PseudoOperation operation, FieldDescriptor field, String varValue) { + private PseudoFuncOutput process(PseudoOperation operation, FieldDescriptor field, String varValue) { // TODO: This check is function type specific (e.g. only applies for FPE?) if (varValue == null || varValue.length() <= 2) { - return varValue; + return PseudoFuncOutput.of(varValue); } PseudoFuncRuleMatch match = pseudoFuncs.findPseudoFunc(field).orElse(null); try { if (match == null) { - return varValue; + return PseudoFuncOutput.of(varValue); } - PseudoFuncOutput res = (operation == PSEUDONYMIZE) + return (operation == PSEUDONYMIZE) ? match.getFunc().apply(PseudoFuncInput.of(varValue)) : match.getFunc().restore(PseudoFuncInput.of(varValue)); - return (String) res.getFirstValue(); } catch (Exception e) { throw new PseudoException(operation + " error - field='" + field.getPath() + "', originalValue='" + varValue + "'", e); @@ -74,6 +73,8 @@ public static class Builder { private Collection keysets; + private String correlationId; + public Builder secrets(Collection secrets) { this.secrets = secrets; return this; @@ -89,10 +90,15 @@ public Builder keysets(Collection keysets) { return this; } + public Builder correlationId(String correlationId) { + this.correlationId = correlationId; + return this; + } + public FieldPseudonymizer build() { Objects.requireNonNull(secrets, "PseudoSecrets can't be null"); Objects.requireNonNull(rules, "PseudoFuncRule collection can't be null"); - return new FieldPseudonymizer(new PseudoFuncs(rules, secrets, keysets)); + return new FieldPseudonymizer(new PseudoFuncs(rules, secrets, keysets, correlationId)); } } } diff --git a/src/main/java/no/ssb/dlp/pseudo/core/func/PseudoFuncs.java b/src/main/java/no/ssb/dlp/pseudo/core/func/PseudoFuncs.java index 0cbcc29..54700d4 100644 --- a/src/main/java/no/ssb/dlp/pseudo/core/func/PseudoFuncs.java +++ b/src/main/java/no/ssb/dlp/pseudo/core/func/PseudoFuncs.java @@ -30,14 +30,19 @@ public class PseudoFuncs { private final Map ruleToFuncMap = new LinkedHashMap<>(); + //TODO: Validate that all required secrets are available - public PseudoFuncs(Collection rules, Collection pseudoSecrets, Collection keysets) { - Map ruleToPseudoFuncConfigs = initPseudoFuncConfigs(rules, pseudoSecrets, keysets); + public PseudoFuncs(Collection rules, Collection pseudoSecrets, + Collection keysets, String correlationId) { + Map ruleToPseudoFuncConfigs = initPseudoFuncConfigs(rules, pseudoSecrets, keysets, correlationId); rules.forEach(rule -> ruleToFuncMap.put(rule, PseudoFuncFactory.create(ruleToPseudoFuncConfigs.get(rule)))); } // TODO: Move these init functions elsewhere? - static Map initPseudoFuncConfigs(Collection pseudoRules, Collection pseudoSecrets, Collection pseudoKeysets) { + static Map initPseudoFuncConfigs(Collection pseudoRules, + Collection pseudoSecrets, + Collection pseudoKeysets, + String correlationId) { Map pseudoSecretsMap = pseudoSecrets.stream().collect( Collectors.toMap(PseudoSecret::getName, Function.identity())); @@ -49,6 +54,7 @@ static Map initPseudoFuncConfigs(Collection { PseudoFuncConfig funcConfig = PseudoFuncConfigFactory.get(rule.getFunc()); + funcConfig.add(Param.CORRELATION_ID, correlationId); if (FpeFunc.class.getName().equals(funcConfig.getFuncImpl())) { enrichLegacyFpeFuncConfig(funcConfig, pseudoSecretsMap); @@ -168,4 +174,7 @@ public PseudoFuncConfigException(String message, Exception e) { } } + public static final class Param { + public static final String CORRELATION_ID = "correlationId"; + } } diff --git a/src/main/java/no/ssb/dlp/pseudo/core/json/JsonStreamProcessor.java b/src/main/java/no/ssb/dlp/pseudo/core/json/JsonStreamProcessor.java index 5cec504..ca5155a 100644 --- a/src/main/java/no/ssb/dlp/pseudo/core/json/JsonStreamProcessor.java +++ b/src/main/java/no/ssb/dlp/pseudo/core/json/JsonStreamProcessor.java @@ -15,6 +15,7 @@ import no.ssb.dlp.pseudo.core.map.RecordMap; import no.ssb.dlp.pseudo.core.map.RecordMapProcessor; import no.ssb.dlp.pseudo.core.map.RecordMapSerializer; +import no.ssb.dlp.pseudo.core.map.RecordMapSerializerFactory; import java.io.IOException; import java.io.InputStream; @@ -36,9 +37,9 @@ public class JsonStreamProcessor implements StreamProcessor { private final RecordMapProcessor recordMapProcessor; @Override - public Completable init(InputStream is, RecordMapSerializer serializer) { + public Completable init(InputStream is) { if (recordMapProcessor.hasPreprocessors()) { - return Completable.fromPublisher(processStream(is, serializer, (map) -> recordMapProcessor.init(map))); + return Completable.fromPublisher(processStream(is, RecordMapSerializerFactory.emptySerializer(), recordMapProcessor::init)); } else { return Completable.complete(); } diff --git a/src/main/java/no/ssb/dlp/pseudo/core/json/JsonStreamPseudonymizer.java b/src/main/java/no/ssb/dlp/pseudo/core/json/JsonStreamPseudonymizer.java deleted file mode 100644 index b3f7620..0000000 --- a/src/main/java/no/ssb/dlp/pseudo/core/json/JsonStreamPseudonymizer.java +++ /dev/null @@ -1,97 +0,0 @@ -package no.ssb.dlp.pseudo.core.json; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import io.reactivex.Emitter; -import io.reactivex.Flowable; -import lombok.RequiredArgsConstructor; -import lombok.Value; -import lombok.extern.slf4j.Slf4j; -import no.ssb.dlp.pseudo.core.PseudoOperation; -import no.ssb.dlp.pseudo.core.StreamPseudonymizer; -import no.ssb.dlp.pseudo.core.map.RecordMap; -import no.ssb.dlp.pseudo.core.map.RecordMapPseudonymizer; -import no.ssb.dlp.pseudo.core.map.RecordMapSerializer; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @deprecated Use {@link JsonStreamProcessor} instead - */ -@RequiredArgsConstructor -@Slf4j -@Deprecated -public class JsonStreamPseudonymizer implements StreamPseudonymizer { - - private static final ObjectMapper OBJECT_MAPPER; - - static { - OBJECT_MAPPER = new ObjectMapper(); - OBJECT_MAPPER.registerModule(new JavaTimeModule()); - OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - } - - private final RecordMapPseudonymizer recordPseudonymizer; - - @Override - public Flowable pseudonymize(InputStream is, RecordMapSerializer serializer) { - return processStream(PseudoOperation.PSEUDONYMIZE, is, serializer); - } - - @Override - public Flowable depseudonymize(InputStream is, RecordMapSerializer serializer) { - return processStream(PseudoOperation.DEPSEUDONYMIZE, is, serializer); - } - - JsonProcessorContext initJsonProcessorContext(PseudoOperation operation, InputStream is, RecordMapSerializer serializer) throws IOException { - final JsonParser jsonParser = OBJECT_MAPPER.getFactory().createParser(is); - return new JsonProcessorContext<>(operation, jsonParser, serializer); - } - - private Flowable processStream(PseudoOperation operation, InputStream is, RecordMapSerializer serializer) { - return Flowable.generate( - () -> initJsonProcessorContext(operation, is, serializer), - (ctx, emitter) -> {this.processItem(ctx, emitter);}, - JsonProcessorContext::close - ); - } - - private void processItem(JsonProcessorContext ctx, Emitter emitter) throws IOException { - JsonParser jsonParser = ctx.getJsonParser(); - JsonToken jsonToken = jsonParser.nextToken(); - while (jsonToken == JsonToken.START_ARRAY || jsonToken == JsonToken.END_ARRAY) { - jsonToken = jsonParser.nextToken(); - } - - if (jsonToken != null) { - int position = ctx.currentPosition.getAndIncrement(); - Map r = OBJECT_MAPPER.readValue(jsonParser, RecordMap.class); - Map processedRecord = ctx.operation == PseudoOperation.PSEUDONYMIZE - ? recordPseudonymizer.pseudonymize(r) - : recordPseudonymizer.depseudonymize(r); - emitter.onNext(ctx.getSerializer().serialize(processedRecord, position)); - } - else { - emitter.onComplete(); - } - } - - @Value - static class JsonProcessorContext { - private final PseudoOperation operation; - private final JsonParser jsonParser; - private final RecordMapSerializer serializer; - private final AtomicInteger currentPosition = new AtomicInteger(); - - public void close() throws IOException { - jsonParser.close(); - } - } - -} diff --git a/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapProcessor.java b/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapProcessor.java index e06045e..1ac3a0d 100644 --- a/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapProcessor.java +++ b/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapProcessor.java @@ -1,14 +1,22 @@ package no.ssb.dlp.pseudo.core.map; +import io.reactivex.processors.FlowableProcessor; +import lombok.Getter; import lombok.RequiredArgsConstructor; import no.ssb.dlp.pseudo.core.field.ValueInterceptorChain; import java.util.Map; @RequiredArgsConstructor -public class RecordMapProcessor { +public class RecordMapProcessor { private final ValueInterceptorChain valueInterceptorChain; - + @Getter + private final MetadataProcessor metadataProcessor; + @FunctionalInterface + public interface MetadataProcessor { + // The MetadataProcessor is used to publish/subscribe to events related to the processing of each RecordMap + FlowableProcessor toFlowableProcessor(); + } public Map init(Map r) { return MapTraverser.traverse(r, valueInterceptorChain::init); } diff --git a/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapPseudonymizer.java b/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapPseudonymizer.java deleted file mode 100644 index 500ac57..0000000 --- a/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapPseudonymizer.java +++ /dev/null @@ -1,19 +0,0 @@ -package no.ssb.dlp.pseudo.core.map; - -import lombok.RequiredArgsConstructor; -import no.ssb.dlp.pseudo.core.field.FieldPseudonymizer; - -import java.util.Map; - -@RequiredArgsConstructor -public class RecordMapPseudonymizer { - private final FieldPseudonymizer fieldPseudonymizer; - - public Map pseudonymize(Map r) { - return MapTraverser.traverse(r, fieldPseudonymizer::pseudonymize); - } - - public Map depseudonymize(Map r) { - return MapTraverser.traverse(r, fieldPseudonymizer::depseudonymize); - } -} diff --git a/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapSerializerFactory.java b/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapSerializerFactory.java index 8468fb5..dc3d78c 100644 --- a/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapSerializerFactory.java +++ b/src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapSerializerFactory.java @@ -1,10 +1,13 @@ package no.ssb.dlp.pseudo.core.map; import io.micronaut.http.MediaType; +import io.reactivex.Flowable; import no.ssb.dlp.pseudo.core.csv.CsvRecordMapSerializer; import no.ssb.dlp.pseudo.core.file.MoreMediaTypes; import no.ssb.dlp.pseudo.core.json.JsonRecordMapSerializer; +import java.util.Map; + public class RecordMapSerializerFactory { private RecordMapSerializerFactory() {} @@ -21,4 +24,19 @@ public static RecordMapSerializer newFromMediaType(MediaType mediaType) } } + public static RecordMapSerializer emptySerializer() { + return new RecordMapSerializer<>() { + @Override + public String serialize(Map r, int position) { + return ""; + } + + @Override + public Flowable serialize(Flowable> recordStream) { + return Flowable.empty(); + } + }; + } + + } diff --git a/src/test/java/no/ssb/dlp/pseudo/core/func/MapFuncTest.java b/src/test/java/no/ssb/dlp/pseudo/core/func/MapFuncTest.java index 53cdedf..994a16d 100644 --- a/src/test/java/no/ssb/dlp/pseudo/core/func/MapFuncTest.java +++ b/src/test/java/no/ssb/dlp/pseudo/core/func/MapFuncTest.java @@ -33,6 +33,6 @@ void mapFuncWithTimestamp() { verify(mockMapper).setConfig(argumentsCaptured.capture()); assert argumentsCaptured.getValue().containsKey("snapshotDate"); // Check that the init method was called - verify(mockMapper).init(eq("50607080901")); + verify(mockMapper).init(eq(PseudoFuncInput.of("50607080901"))); } } diff --git a/src/test/java/no/ssb/dlp/pseudo/core/func/PseudoFuncConfigPresetTest.java b/src/test/java/no/ssb/dlp/pseudo/core/func/PseudoFuncConfigPresetTest.java index d6c1a5e..3db2465 100644 --- a/src/test/java/no/ssb/dlp/pseudo/core/func/PseudoFuncConfigPresetTest.java +++ b/src/test/java/no/ssb/dlp/pseudo/core/func/PseudoFuncConfigPresetTest.java @@ -22,6 +22,11 @@ public DummyPseudoFunc(PseudoFuncConfig genericConfig) { super(genericConfig.getFuncDecl()); } + @Override + public String getAlgorithm() { + return null; + } + @Override public PseudoFuncOutput apply(PseudoFuncInput input) { return new PseudoFuncOutput();