Skip to content

Commit

Permalink
feat: Implement latest_by_offset() UDAF (#4782)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Mar 17, 2020
1 parent a54f4d8 commit 0c13bb0
Show file tree
Hide file tree
Showing 11 changed files with 929 additions and 1 deletion.
11 changes: 11 additions & 0 deletions docs-md/developer-guide/ksqldb-reference/aggregate-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ first considering all the records from the first window, then the
late-arriving record, then the records from the second window in
the order they were originally processed.

LATEST_BY_OFFSET
----------------

`LATEST_BY_OFFSET(col1)`

Stream

Returns the latest value for a given column. "Latest" here means the value from the record
with the greatest offset in the partition.
Note: rows where `col1` is null are ignored.

MAX
---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class UdafLoader {

void loadUdafFromClass(final Class<?> theClass, final String path) {
final UdafDescription udafAnnotation = theClass.getAnnotation(UdafDescription.class);

final List<UdafFactoryInvoker> invokers = new ArrayList<>();
for (final Method method : theClass.getMethods()) {
if (method.getAnnotation(UdafFactory.class) != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udaf.latest;

import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
import io.confluent.ksql.function.udaf.UdafFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

/**
 * Factory methods for the {@code LATEST_BY_OFFSET} aggregate function.
 *
 * <p>The intermediate aggregate is a {@link Struct} with two fields: {@link #SEQ_FIELD}, a
 * number taken from a static counter at the moment the value was aggregated, and
 * {@link #VAL_FIELD}, the value itself. When two aggregates are merged, the one carrying the
 * greater sequence number is treated as the latest.
 *
 * <p>Null input values are ignored, so the aggregate stays {@code null} until the first
 * non-null value is seen.
 */
@UdafDescription(
    name = "LATEST_BY_OFFSET",
    description = LatestByOffset.DESCRIPTION
)
public final class LatestByOffset {

  static final String DESCRIPTION =
      "This function returns the most recent value for the column, computed by offset.";

  // Static factory holder - not meant to be instantiated.
  private LatestByOffset() {
  }

  /** Name of the aggregate struct field holding the sequence number. */
  static final String SEQ_FIELD = "SEQ";
  /** Name of the aggregate struct field holding the latest value. */
  static final String VAL_FIELD = "VAL";

  public static final Schema STRUCT_INTEGER = SchemaBuilder.struct().optional()
      .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
      .field(VAL_FIELD, Schema.OPTIONAL_INT32_SCHEMA)
      .build();

  public static final Schema STRUCT_LONG = SchemaBuilder.struct().optional()
      .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
      .field(VAL_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
      .build();

  public static final Schema STRUCT_DOUBLE = SchemaBuilder.struct().optional()
      .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
      .field(VAL_FIELD, Schema.OPTIONAL_FLOAT64_SCHEMA)
      .build();

  public static final Schema STRUCT_BOOLEAN = SchemaBuilder.struct().optional()
      .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
      .field(VAL_FIELD, Schema.OPTIONAL_BOOLEAN_SCHEMA)
      .build();

  public static final Schema STRUCT_STRING = SchemaBuilder.struct().optional()
      .field(SEQ_FIELD, Schema.OPTIONAL_INT64_SCHEMA)
      .field(VAL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
      .build();

  // Source of the sequence numbers stamped on every aggregate struct. It is static, so it is
  // shared by every Udaf created through this class. Package-private so tests can set it.
  static AtomicLong sequence = new AtomicLong();

  @UdafFactory(description = "return the latest value of an integer column",
      aggregateSchema = "STRUCT<SEQ BIGINT, VAL INT>")
  public static Udaf<Integer, Struct, Integer> latestInteger() {
    return latest(STRUCT_INTEGER);
  }

  @UdafFactory(description = "return the latest value of an big integer column",
      aggregateSchema = "STRUCT<SEQ BIGINT, VAL BIGINT>")
  public static Udaf<Long, Struct, Long> latestLong() {
    return latest(STRUCT_LONG);
  }

  @UdafFactory(description = "return the latest value of a double column",
      aggregateSchema = "STRUCT<SEQ BIGINT, VAL DOUBLE>")
  public static Udaf<Double, Struct, Double> latestDouble() {
    return latest(STRUCT_DOUBLE);
  }

  @UdafFactory(description = "return the latest value of a boolean column",
      aggregateSchema = "STRUCT<SEQ BIGINT, VAL BOOLEAN>")
  public static Udaf<Boolean, Struct, Boolean> latestBoolean() {
    return latest(STRUCT_BOOLEAN);
  }

  @UdafFactory(description = "return the latest value of a string column",
      aggregateSchema = "STRUCT<SEQ BIGINT, VAL STRING>")
  public static Udaf<String, Struct, String> latestString() {
    return latest(STRUCT_STRING);
  }

  /**
   * Builds an aggregate struct that wraps {@code val} and stamps it with the next sequence
   * number.
   *
   * @param schema one of the {@code STRUCT_*} schemas; its VAL field type must match {@code val}
   * @param val the value to store in {@link #VAL_FIELD}
   * @param <T> the Java type of the value
   * @return a new struct with both fields populated
   */
  static <T> Struct createStruct(final Schema schema, final T val) {
    final Struct struct = new Struct(schema);
    struct.put(SEQ_FIELD, generateSequence());
    struct.put(VAL_FIELD, val);
    return struct;
  }

  /** Returns the current counter value and advances the counter by one. */
  private static long generateSequence() {
    return sequence.getAndIncrement();
  }

  /**
   * Orders two aggregate structs by their sequence numbers.
   *
   * @return a positive number if {@code struct1} is the later one, a negative number if
   *     {@code struct2} is the later one, and zero if both carry the same sequence number
   */
  private static int compareStructs(final Struct struct1, final Struct struct2) {
    // Deal with overflow - we assume if one is positive and the other negative then the sequence
    // has overflowed - in which case the latest is the one with the smallest sequence
    final long sequence1 = struct1.getInt64(SEQ_FIELD);
    final long sequence2 = struct2.getInt64(SEQ_FIELD);
    if (sequence1 < 0 && sequence2 >= 0) {
      return 1;
    } else if (sequence2 < 0 && sequence1 >= 0) {
      return -1;
    } else {
      return Long.compare(sequence1, sequence2);
    }
  }

  /**
   * Creates the aggregate function shared by all the typed factory methods.
   *
   * @param structSchema the schema used for the intermediate aggregate struct
   * @param <T> the type of both the input column and the result
   * @return a Udaf that tracks the most recently aggregated non-null value
   */
  @UdafFactory(description = "Latest by offset")
  static <T> Udaf<T, Struct, T> latest(final Schema structSchema) {
    return new Udaf<T, Struct, T>() {

      @Override
      public Struct initialize() {
        // No value has been seen yet.
        return null;
      }

      @Override
      public Struct aggregate(final T current, final Struct aggregate) {
        if (current == null) {
          // Null inputs are ignored: keep whatever was aggregated before.
          return aggregate;
        } else {
          return createStruct(structSchema, current);
        }
      }

      @Override
      public Struct merge(final Struct aggOne, final Struct aggTwo) {
        // Either side is still null if it has not seen a non-null value yet, because
        // initialize() returns null. In that case the other side wins (it may be null too).
        if (aggOne == null) {
          return aggTwo;
        }
        if (aggTwo == null) {
          return aggOne;
        }

        // When merging we need some way of evaluating the "latest" one.
        // We do this by keeping track of the sequence of when it was originally processed
        if (compareStructs(aggOne, aggTwo) >= 0) {
          return aggOne;
        } else {
          return aggTwo;
        }
      }

      @Override
      @SuppressWarnings("unchecked")
      public T map(final Struct agg) {
        // The aggregate is null when every input so far was null; there is no latest value.
        if (agg == null) {
          return null;
        }
        // Unchecked cast: the VAL field type is fixed by the schema chosen in the factory method.
        return (T) agg.get(VAL_FIELD);
      }
    };
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udaf.latest;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import io.confluent.ksql.function.udaf.Udaf;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;

/**
 * Unit tests for the {@code LATEST_BY_OFFSET} aggregate function factories in
 * {@link LatestByOffset}.
 */
public class LatestByOffsetUdafTest {

  @Test
  public void shouldInitializeToNull() {
    // Given:
    // The schema matches the Integer type parameters of the Udaf under test.
    final Udaf<Integer, Struct, Integer> udaf = LatestByOffset
        .latest(LatestByOffset.STRUCT_INTEGER);

    // When:
    final Struct init = udaf.initialize();

    // Then:
    assertThat(init, is(nullValue()));
  }

  @Test
  public void shouldComputeLatestInteger() {
    // Given:
    final Udaf<Integer, Struct, Integer> udaf = LatestByOffset.latestInteger();

    // When:
    final Struct res = udaf
        .aggregate(123, LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321));

    // Then:
    assertThat(res.get(LatestByOffset.VAL_FIELD), is(123));
  }

  @Test
  public void shouldMerge() {
    // Given:
    final Udaf<Integer, Struct, Integer> udaf = LatestByOffset.latestInteger();

    final Struct agg1 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 123);
    final Struct agg2 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321);

    // When:
    final Struct merged1 = udaf.merge(agg1, agg2);
    final Struct merged2 = udaf.merge(agg2, agg1);

    // Then:
    // agg2 was created second, so it wins regardless of argument order.
    assertThat(merged1, is(agg2));
    assertThat(merged2, is(agg2));
  }

  @Test
  public void shouldMergeWithOverflow() {
    // Given:
    final Udaf<Integer, Struct, Integer> udaf = LatestByOffset.latestInteger();

    // The sequence counter is static and shared; remember its value so this test does not
    // leave it in the overflowed state for tests that run afterwards.
    final long originalSequence = LatestByOffset.sequence.get();
    LatestByOffset.sequence.set(Long.MAX_VALUE);

    try {
      final Struct agg1 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 123);
      final Struct agg2 = LatestByOffset.createStruct(LatestByOffset.STRUCT_INTEGER, 321);

      // When:
      final Struct merged1 = udaf.merge(agg1, agg2);
      final Struct merged2 = udaf.merge(agg2, agg1);

      // Then:
      // The counter wrapped between the two structs, yet the second one must still win.
      assertThat(agg1.getInt64(LatestByOffset.SEQ_FIELD), is(Long.MAX_VALUE));
      assertThat(agg2.getInt64(LatestByOffset.SEQ_FIELD), is(Long.MIN_VALUE));
      assertThat(merged1, is(agg2));
      assertThat(merged2, is(agg2));
    } finally {
      LatestByOffset.sequence.set(originalSequence);
    }
  }

  @Test
  public void shouldComputeLatestLong() {
    // Given:
    final Udaf<Long, Struct, Long> udaf = LatestByOffset.latestLong();

    // When:
    final Struct res = udaf
        .aggregate(123L, LatestByOffset.createStruct(LatestByOffset.STRUCT_LONG, 321L));

    // Then:
    assertThat(res.getInt64(LatestByOffset.VAL_FIELD), is(123L));
  }

  @Test
  public void shouldComputeLatestDouble() {
    // Given:
    final Udaf<Double, Struct, Double> udaf = LatestByOffset.latestDouble();

    // When:
    final Struct res = udaf
        .aggregate(1.1d, LatestByOffset.createStruct(LatestByOffset.STRUCT_DOUBLE, 2.2d));

    // Then:
    assertThat(res.getFloat64(LatestByOffset.VAL_FIELD), is(1.1d));
  }

  @Test
  public void shouldComputeLatestBoolean() {
    // Given:
    final Udaf<Boolean, Struct, Boolean> udaf = LatestByOffset.latestBoolean();

    // When:
    final Struct res = udaf
        .aggregate(true, LatestByOffset.createStruct(LatestByOffset.STRUCT_BOOLEAN, false));

    // Then:
    assertThat(res.getBoolean(LatestByOffset.VAL_FIELD), is(true));
  }

  @Test
  public void shouldComputeLatestString() {
    // Given:
    final Udaf<String, Struct, String> udaf = LatestByOffset.latestString();

    // When:
    final Struct res = udaf
        .aggregate("foo", LatestByOffset.createStruct(LatestByOffset.STRUCT_STRING, "bar"));

    // Then:
    assertThat(res.getString(LatestByOffset.VAL_FIELD), is("foo"));
  }

}
Loading

0 comments on commit 0c13bb0

Please sign in to comment.