Skip to content

Commit

Permalink
Update pseudo funcs and remove unnecessary abstractions (#32)
Browse files Browse the repository at this point in the history
* Remove RecordMapPseudonymizer since pseudonymization can be done using PseudoFuncs directly. Simplified StreamProcessor.
  • Loading branch information
bjornandre authored Feb 6, 2024
1 parent fbc5ec2 commit d1097b5
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 220 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<github.repository>statisticsnorway/dapla-dlp-pseudo-core</github.repository>

<!-- Dependency versions -->
<dapla-dlp-pseudo-func.version>1.2.8</dapla-dlp-pseudo-func.version>
<dapla-dlp-pseudo-func.version>1.3.0-SNAPSHOT</dapla-dlp-pseudo-func.version>
<guava.version>32.0.0-jre</guava.version>
<jsonassert.version>1.5.1</jsonassert.version>
<logback.version>1.4.6</logback.version>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/no/ssb/dlp/pseudo/core/StreamProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.Map;

public interface StreamProcessor {
<T> Completable init(InputStream is, RecordMapSerializer<T> serializer);
<T> Completable init(InputStream is);
<T> Flowable<T> process(InputStream is, RecordMapSerializer<T> serializer);
@FunctionalInterface
public interface ItemProcessor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import no.ssb.dlp.pseudo.core.StreamProcessor;
import no.ssb.dlp.pseudo.core.map.RecordMapProcessor;
import no.ssb.dlp.pseudo.core.map.RecordMapSerializer;
import no.ssb.dlp.pseudo.core.map.RecordMapSerializerFactory;

import java.io.InputStream;
import java.util.LinkedHashMap;
Expand All @@ -22,20 +23,20 @@
@Slf4j
public class CsvStreamProcessor implements StreamProcessor {

private final RecordMapProcessor recordMapProcessor;
private final RecordMapProcessor<String> recordMapProcessor;

@Override
public <T> Completable init(InputStream is, RecordMapSerializer<T> serializer) {
public <T> Completable init(InputStream is) {
if (recordMapProcessor.hasPreprocessors()) {
return Completable.fromPublisher(processStream(is, serializer, (map) -> recordMapProcessor.init(map)));
return Completable.fromPublisher(processStream(is, RecordMapSerializerFactory.emptySerializer(), recordMapProcessor::init));
} else {
return Completable.complete();
}
}

@Override
public <T> Flowable<T> process(InputStream is, RecordMapSerializer<T> serializer) {
return processStream(is, serializer, (map) -> recordMapProcessor.process(map));
return processStream(is, serializer, recordMapProcessor::process);
}

<T> CsvProcessorContext<T> initCsvProcessorContext(InputStream is, RecordMapSerializer<T> serializer) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static no.ssb.dlp.pseudo.core.PseudoOperation.DEPSEUDONYMIZE;
import static no.ssb.dlp.pseudo.core.PseudoOperation.PSEUDONYMIZE;

@Deprecated(forRemoval = true) // Should use PseudoFuncs instead
public class FieldPseudonymizer {

private final PseudoFuncs pseudoFuncs;
Expand All @@ -25,43 +26,41 @@ private FieldPseudonymizer(PseudoFuncs pseudoFuncs) {
this.pseudoFuncs = pseudoFuncs;
}

public String pseudonymize(FieldDescriptor field, String varValue) {
public PseudoFuncOutput pseudonymize(FieldDescriptor field, String varValue) {
return process(PSEUDONYMIZE, field, varValue);
}

public String depseudonymize(FieldDescriptor field, String varValue) {
public PseudoFuncOutput depseudonymize(FieldDescriptor field, String varValue) {
return process(DEPSEUDONYMIZE, field, varValue);
}

public Optional<PseudoFuncRuleMatch> match(FieldDescriptor field) {
return pseudoFuncs.findPseudoFunc(field);
}

public String init(FieldDescriptor field, String varValue) {
public void init(FieldDescriptor field, String varValue) {
Optional<PseudoFuncRuleMatch> match = pseudoFuncs.findPseudoFunc(field);
if (match.isPresent()) {
match.get().getFunc().init(PseudoFuncInput.of(varValue));
}
return varValue;
}

private String process(PseudoOperation operation, FieldDescriptor field, String varValue) {
private PseudoFuncOutput process(PseudoOperation operation, FieldDescriptor field, String varValue) {

// TODO: This check is function type specific (e.g. only applies for FPE?)
if (varValue == null || varValue.length() <= 2) {
return varValue;
return PseudoFuncOutput.of(varValue);
}

PseudoFuncRuleMatch match = pseudoFuncs.findPseudoFunc(field).orElse(null);
try {
if (match == null) {
return varValue;
return PseudoFuncOutput.of(varValue);
}

PseudoFuncOutput res = (operation == PSEUDONYMIZE)
return (operation == PSEUDONYMIZE)
? match.getFunc().apply(PseudoFuncInput.of(varValue))
: match.getFunc().restore(PseudoFuncInput.of(varValue));
return (String) res.getFirstValue();
}
catch (Exception e) {
throw new PseudoException(operation + " error - field='" + field.getPath() + "', originalValue='" + varValue + "'", e);
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/no/ssb/dlp/pseudo/core/func/PseudoFuncs.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@ public class PseudoFuncs {

private final Map<PseudoFuncRule, PseudoFunc> ruleToFuncMap = new LinkedHashMap<>();


//TODO: Validate that all required secrets are available
public PseudoFuncs(Collection<PseudoFuncRule> rules, Collection<PseudoSecret> pseudoSecrets, Collection<PseudoKeyset> keysets) {
public PseudoFuncs(Collection<PseudoFuncRule> rules, Collection<PseudoSecret> pseudoSecrets,
Collection<PseudoKeyset> keysets) {
Map<PseudoFuncRule, PseudoFuncConfig> ruleToPseudoFuncConfigs = initPseudoFuncConfigs(rules, pseudoSecrets, keysets);
rules.forEach(rule -> ruleToFuncMap.put(rule, PseudoFuncFactory.create(ruleToPseudoFuncConfigs.get(rule))));
}

// TODO: Move these init functions elsewhere?
static Map<PseudoFuncRule, PseudoFuncConfig> initPseudoFuncConfigs(Collection<PseudoFuncRule> pseudoRules, Collection<PseudoSecret> pseudoSecrets, Collection<PseudoKeyset> pseudoKeysets) {
static Map<PseudoFuncRule, PseudoFuncConfig> initPseudoFuncConfigs(Collection<PseudoFuncRule> pseudoRules,
Collection<PseudoSecret> pseudoSecrets,
Collection<PseudoKeyset> pseudoKeysets) {

Map<String, PseudoSecret> pseudoSecretsMap = pseudoSecrets.stream().collect(
Collectors.toMap(PseudoSecret::getName, Function.identity()));
Expand Down Expand Up @@ -167,5 +171,4 @@ public PseudoFuncConfigException(String message, Exception e) {
super(message, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import no.ssb.dlp.pseudo.core.map.RecordMap;
import no.ssb.dlp.pseudo.core.map.RecordMapProcessor;
import no.ssb.dlp.pseudo.core.map.RecordMapSerializer;
import no.ssb.dlp.pseudo.core.map.RecordMapSerializerFactory;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -36,9 +37,9 @@ public class JsonStreamProcessor implements StreamProcessor {
private final RecordMapProcessor recordMapProcessor;

@Override
public <T> Completable init(InputStream is, RecordMapSerializer<T> serializer) {
public <T> Completable init(InputStream is) {
if (recordMapProcessor.hasPreprocessors()) {
return Completable.fromPublisher(processStream(is, serializer, (map) -> recordMapProcessor.init(map)));
return Completable.fromPublisher(processStream(is, RecordMapSerializerFactory.emptySerializer(), recordMapProcessor::init));
} else {
return Completable.complete();
}
Expand Down

This file was deleted.

12 changes: 10 additions & 2 deletions src/main/java/no/ssb/dlp/pseudo/core/map/RecordMapProcessor.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package no.ssb.dlp.pseudo.core.map;

import io.reactivex.processors.FlowableProcessor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import no.ssb.dlp.pseudo.core.field.ValueInterceptorChain;

import java.util.Map;

@RequiredArgsConstructor
public class RecordMapProcessor {
public class RecordMapProcessor<T> {
private final ValueInterceptorChain valueInterceptorChain;

@Getter
private final MetadataProcessor<T> metadataProcessor;
@FunctionalInterface
public interface MetadataProcessor<T> {
// The MetadataProcessor is used to publish/subscribe to events related to the processing of each RecordMap
FlowableProcessor<T> toFlowableProcessor();
}
public Map<String, Object> init(Map<String, Object> r) {
return MapTraverser.traverse(r, valueInterceptorChain::init);
}
Expand Down

This file was deleted.

Loading

0 comments on commit d1097b5

Please sign in to comment.