Skip to content

Commit

Permalink
Use hashing rather than RNG for VID sampling of synthetic events. (#1463
Browse files Browse the repository at this point in the history
)

This allows implementations to be deterministic and language-agnostic.
  • Loading branch information
SanjayVas authored Feb 7, 2024
1 parent 02c447e commit a22d079
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ kt_jvm_library(
deps = [
":labeled_event",
"//src/main/proto/wfa/measurement/api/v2alpha/event_group_metadata/testing:simulator_synthetic_data_spec_kt_jvm_proto",
"@wfa_common_jvm//imports/java/com/google/common:guava",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package org.wfanet.measurement.loadtest.dataprovider

import com.google.common.hash.Hashing
import com.google.protobuf.Descriptors.FieldDescriptor
import com.google.protobuf.Message
import com.google.protobuf.kotlin.toByteStringUtf8
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.time.ZoneOffset
import java.util.Random
import kotlin.math.max
import kotlin.math.min
import kotlin.random.Random as KotlinRandom
import kotlin.random.asKotlinRandom
import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.FieldValue
import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.SimulatorSyntheticDataSpec
import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.SyntheticEventGroupSpec
Expand All @@ -36,11 +37,14 @@ import org.wfanet.measurement.common.rangeTo
import org.wfanet.measurement.common.toLocalDate

object SyntheticDataGeneration {
private val VID_SAMPLING_FINGERPRINT_FUNCTION = Hashing.farmHashFingerprint64()
private const val FINGERPRINT_BUFFER_SIZE_BYTES = 512

/**
* Generates a sequence of [EventQuery.LabeledEvent].
* Generates a sequence of [LabeledEvent].
*
* Consumption of [Sequence] throws
* * [IllegalArgumentException] when [SimulatorSyntheticDataSpec] is invalid, or incompatible
* * [IllegalStateException] when [SimulatorSyntheticDataSpec] is invalid, or incompatible
* * with [T].
*
* @param messageInstance an instance of the event message type [T]
Expand All @@ -52,28 +56,6 @@ object SyntheticDataGeneration {
populationSpec: SyntheticPopulationSpec,
syntheticEventGroupSpec: SyntheticEventGroupSpec,
): Sequence<LabeledEvent<T>> {
var samplingRequired = false
val vidRangeSpecs =
syntheticEventGroupSpec.dateSpecsList
.flatMap { it.frequencySpecsList }
.flatMap { it.vidRangeSpecsList }

for (vidRangeSpec in vidRangeSpecs) {
val vidRangeWidth = vidRangeSpec.vidRange.endExclusive - vidRangeSpec.vidRange.start
check(vidRangeWidth >= vidRangeSpec.sampleSize) {
"all vidRange widths should be larger than sampleSizes"
}
if (vidRangeSpec.sampleSize > 0) {
samplingRequired = true
}
}

if (samplingRequired) {
check(syntheticEventGroupSpec.rngType == SyntheticEventGroupSpec.RngType.JAVA_UTIL_RANDOM) {
"Expecting JAVA_UTIL_RANDOM rng type, got ${syntheticEventGroupSpec.rngType}"
}
}

val subPopulations = populationSpec.subPopulationsList

return sequence {
Expand All @@ -84,34 +66,46 @@ object SyntheticDataGeneration {
check(!frequencySpec.hasOverlaps()) { "The VID ranges should be non-overlapping." }

for (vidRangeSpec: VidRangeSpec in frequencySpec.vidRangeSpecsList) {
val random = Random(vidRangeSpec.randomSeed).asKotlinRandom()
val subPopulation: SubPopulation =
vidRangeSpec.vidRange.findSubPopulation(subPopulations)
?: throw IllegalArgumentException()
?: error("Sub-population not found")
check(vidRangeSpec.samplingRate in 0.0..1.0) { "Invalid sampling_rate" }
if (vidRangeSpec.sampled) {
check(syntheticEventGroupSpec.samplingNonce != 0L) {
"sampling_nonce is required for VID sampling"
}
}

val builder: Message.Builder = messageInstance.newBuilderForType()

populationSpec.populationFieldsList.forEach {
val subPopulationFieldValue: FieldValue =
subPopulation.populationFieldsValuesMap.getValue(it)
val fieldPath = it.split('.')
builder.setField(fieldPath, subPopulationFieldValue)
try {
builder.setField(fieldPath, subPopulationFieldValue)
} catch (e: IllegalArgumentException) {
throw IllegalStateException(e)
}
}

populationSpec.nonPopulationFieldsList.forEach {
val nonPopulationFieldValue: FieldValue =
vidRangeSpec.nonPopulationFieldValuesMap.getValue(it)
val fieldPath = it.split('.')
builder.setField(fieldPath, nonPopulationFieldValue)
try {
builder.setField(fieldPath, nonPopulationFieldValue)
} catch (e: IllegalArgumentException) {
throw IllegalStateException(e)
}
}

@Suppress("UNCHECKED_CAST") // Safe per protobuf API.
val message = builder.build() as T

for (date in dateProgression) {
for (i in 0 until frequencySpec.frequency) {
val sampledVids = sampleVids(vidRangeSpec, random)
for (vid in sampledVids) {
for (i in 1..frequencySpec.frequency) {
for (vid in vidRangeSpec.sampledVids(syntheticEventGroupSpec.samplingNonce)) {
yield(LabeledEvent(date.atStartOfDay().toInstant(ZoneOffset.UTC), vid, message))
}
}
Expand All @@ -122,19 +116,35 @@ object SyntheticDataGeneration {
}
}

/**
* Returns the sampled Vids from [vidRangeSpec]. Given the same [vidRangeSpec] and [randomSeed],
* returns the same vids. Returns all of the vids if sample size is 0.
*/
private fun sampleVids(vidRangeSpec: VidRangeSpec, random: KotlinRandom): Sequence<Long> {
val vidRangeSequence =
(vidRangeSpec.vidRange.start until vidRangeSpec.vidRange.endExclusive).asSequence()
if (vidRangeSpec.sampleSize == 0) {
return vidRangeSequence
/** Returns the VIDs which are in the sample for this [VidRangeSpec]. */
private fun VidRangeSpec.sampledVids(samplingNonce: Long): Sequence<Long> {
return (vidRange.start until vidRange.endExclusive).asSequence().filter {
inSample(it, samplingNonce)
}
return vidRangeSequence.shuffled(random).take(vidRangeSpec.sampleSize)
}

/** Returns whether [vid] is in the sample specified by this [VidRangeSpec]. */
private fun VidRangeSpec.inSample(vid: Long, samplingNonce: Long): Boolean {
if (!sampled) {
return true
}

val buffer =
ByteBuffer.allocate(FINGERPRINT_BUFFER_SIZE_BYTES)
.order(ByteOrder.LITTLE_ENDIAN)
.putLong(vid)
.putLong(samplingNonce)
.putFieldValueMap(nonPopulationFieldValuesMap)
.flip()
val fingerprint = VID_SAMPLING_FINGERPRINT_FUNCTION.hashBytes(buffer).asLong()
val rangeValue: Double = fingerprint.toDouble() / Long.MAX_VALUE
return rangeValue in -samplingRate..samplingRate
}

/** Whether the [VidRange] in this [VidRangeSpec] should be sampled. */
private val VidRangeSpec.sampled: Boolean
get() = samplingRate > 0.0 && samplingRate < 1.0

/**
* Returns the [SubPopulation] from a list of [SubPopulation] that contains the [VidRange] in its
* range.
Expand Down Expand Up @@ -211,3 +221,34 @@ private fun SyntheticEventGroupSpec.FrequencySpec.hasOverlaps() =

private fun VidRange.overlaps(other: VidRange) =
max(start, other.start) < min(endExclusive, other.endExclusive)

private fun ByteBuffer.putFieldValueMap(fieldValueMap: Map<String, FieldValue>): ByteBuffer {
for ((key, value) in fieldValueMap.entries.sortedBy { it.key }) {
putStringUtf8(key)
@Suppress("WHEN_ENUM_CAN_BE_NULL_IN_JAVA") // Protobuf enum accessors cannot return null.
when (value.valueCase) {
FieldValue.ValueCase.STRING_VALUE -> putStringUtf8(value.stringValue)
FieldValue.ValueCase.BOOL_VALUE -> putBoolean(value.boolValue)
FieldValue.ValueCase.DOUBLE_VALUE -> putDouble(value.doubleValue)
FieldValue.ValueCase.FLOAT_VALUE -> putFloat(value.floatValue)
FieldValue.ValueCase.ENUM_VALUE,
FieldValue.ValueCase.INT32_VALUE -> putInt(value.enumValue)
FieldValue.ValueCase.INT64_VALUE -> putLong(value.int64Value)
FieldValue.ValueCase.DURATION_VALUE ->
putLong(value.durationValue.seconds).putInt(value.durationValue.nanos)
FieldValue.ValueCase.TIMESTAMP_VALUE ->
putLong(value.timestampValue.seconds).putInt(value.timestampValue.nanos)
FieldValue.ValueCase.VALUE_NOT_SET -> throw IllegalArgumentException("value not set")
}
}
return this // For chaining.
}

private fun ByteBuffer.putBoolean(value: Boolean): ByteBuffer {
val byte: Byte = if (value) 1 else 0
return put(byte)
}

private fun ByteBuffer.putStringUtf8(value: String): ByteBuffer {
return put(value.toByteStringUtf8().asReadOnlyByteBuffer())
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,26 @@ message SyntheticEventGroupSpec {
// their values.
map<string, FieldValue> non_population_field_values = 2;

// Number of vids sampled uniformly without replacement from vid_range.
// If this is 0, no sampling is done and all the vids in range are taken.
int32 sample_size = 3;

// Random seed to be fed into the random number generator to sample vids.
// Required if this VidRangeSpec specifies a sample_size.
int64 random_seed = 4;
// Rate in the range (0, 1] at which VIDs sampled from `vid_range`.
//
// If not specified or if the rate is 1, no sampling is done and all VIDs
// in range are taken.
//
// The sampling process is as follows, with all values using little endian
// byte order:
// 1. Concatenate the VID, `sampling_nonce`, and
// `non_population_field_values`.
// 2. Take the FarmHash Fingerprint64 of (1) as a signed 64-bit integer.
// 3. Divide (2) by 2^63 - 1 to map it to the range [-1, 1].
// 4. Check if (4) is in the range [-`sampling_rate`, `sampling_rate`]. If
// so, the VID is included in the sample.
//
// `non_population_field_values` is encoded by concatenating the key and
// the value of each entry ordered by key lexicographically. The values
// are encoded by concatenating the scalar subfields in depth-first search
// order. Fields of type `string` are encoded as UTF-8, and fields of type
// `boolean` are encoded as a single byte containing `0` or `1`.
double sampling_rate = 3;
}
// The VID ranges should be non-overlapping sub-ranges of SubPopulations.
repeated VidRangeSpec vid_range_specs = 2;
Expand All @@ -139,15 +152,7 @@ message SyntheticEventGroupSpec {
// `DateSpec`s should describe non-overlapping date ranges.
repeated DateSpec date_specs = 2;

// Type of random number generator to sample vids.
enum RngType {
// Default value used if the rng type is omitted.
RNG_TYPE_UNSPECIFIED = 0;
// Signals java.util.Random should be used for sampling.
JAVA_UTIL_RANDOM = 1;
}

// Random Number Generator type for this `SyntheticEventGroupSpec`.
// Required if any VidRangeSpec specifies a sample_size.
RngType rng_type = 4;
// Random nonce value used for sampling. Required if `sampling_rate` is
// specified in any `VidRangeSpec`.
int64 sampling_nonce = 3;
}
Loading

0 comments on commit a22d079

Please sign in to comment.