Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15215: KIP-954, fully customizable default DSL store type #1

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.streams.processor.TimestampExtractor;

import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.state.DSLStoreProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
Expand Down Expand Up @@ -216,6 +217,11 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
}
}

public DSLStoreProvider storeProvider() {
// TODO(KIP-954): get this from StreamsConfig (or TopologyConfig overrides)
return null;
}

public Materialized.StoreType parseStoreType() {
if (storeType.equals(IN_MEMORY)) {
return Materialized.StoreType.IN_MEMORY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DSLStoreProvider;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class Materialized<K, V, S extends StateStore> {
protected Map<String, String> topicConfig = new HashMap<>();
protected Duration retention;
public StoreType storeType;
public DSLStoreProvider storeProvider;

// the built-in state store types
public enum StoreType {
Expand Down Expand Up @@ -107,7 +109,7 @@ protected Materialized(final Materialized<K, V, S> materialized) {
* @param <K> key type of the store
* @param <V> value type of the store
* @param <S> type of the {@link StateStore}
* @return a new {@link Materialized} instance with the given storeName
* @return a new {@link Materialized} instance with the given storeType
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType) {
Objects.requireNonNull(storeType, "store type can't be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
Expand Down Expand Up @@ -46,15 +47,19 @@ public KeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<
public StoreBuilder<?> materialize() {
KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemoryKeyValueStore(materialized.storeName());
break;
case ROCKS_DB:
supplier = Stores.persistentTimestampedKeyValueStore(materialized.storeName());
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
if (materialized.storeProvider() != null) {
supplier = materialized.storeProvider().timestampedKeyValueStore(materialized.storeName());
} else {
switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemoryKeyValueStore(materialized.storeName());
break;
case ROCKS_DB:
supplier = Stores.persistentTimestampedKeyValueStore(materialized.storeName());
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.state.DSLStoreProvider;
import org.apache.kafka.streams.state.StoreSupplier;

import java.time.Duration;
Expand Down Expand Up @@ -48,6 +49,7 @@ public MaterializedInternal(final Materialized<K, V, S> materialized,
// if store type is not configured during creating Materialized, then try to get the topologyConfigs from nameProvider
// otherwise, set to default rocksDB
if (storeType == null) {
// TODO(KIP-954): get DSLStoreProvider from TopologyConfigs here, if set
storeType = StoreType.ROCKS_DB;
if (nameProvider instanceof InternalStreamsBuilder) {
final TopologyConfig topologyConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologyConfigs();
Expand All @@ -73,6 +75,10 @@ public StoreType storeType() {
return storeType;
}

public DSLStoreProvider storeProvider() {
return storeProvider;
}

public StoreSupplier<S> storeSupplier() {
return storeSupplier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,27 @@ private StoreBuilder<SessionStore<K, V>> materialize(final MaterializedInternal
+ " retention=[" + retentionPeriod + "]");
}

switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemorySessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod)
);
break;
case ROCKS_DB:
supplier = Stores.persistentSessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod)
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
if (materialized.storeProvider() != null) {
supplier = materialized.storeProvider().sessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod));
} else {
switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemorySessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod)
);
break;
case ROCKS_DB:
supplier = Stores.persistentSessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod)
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,27 +253,34 @@ private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInt
+ " grace=[" + windows.gracePeriodMs() + "],"
+ " retention=[" + retentionPeriod + "]");
}

switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemorySessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod)
);
break;
case ROCKS_DB:
supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
new RocksDbTimeOrderedSessionBytesStoreSupplier(
materialized.storeName(),
retentionPeriod,
true) :
Stores.persistentSessionStore(
if (materialized.storeProvider() != null) {
supplier = materialized.storeProvider().sessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod));
} else {
switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemorySessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agavra see this for example

);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
break;
case ROCKS_DB:
// TODO(KIP-954): should the DSLStoreProvider include an API for this or is
// it a specific optimization that only applies to rocksdb?
supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
new RocksDbTimeOrderedSessionBytesStoreSupplier(
materialized.storeName(),
retentionPeriod,
true) :
Stores.persistentSessionStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod)
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,26 +113,33 @@ private StoreBuilder<TimestampedWindowStore<K, V>> materialize(final Materialize
+ " retention=[" + retentionPeriod
+ "]");
}

switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemoryWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false
);
break;
case ROCKS_DB:
supplier = Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
if (materialized.storeProvider() != null) {
supplier = materialized.storeProvider().timestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false);
} else {
switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemoryWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false
);
break;
case ROCKS_DB:
supplier = Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,34 +219,43 @@ private <VR> StoreBuilder<TimestampedWindowStore<K, VR>> materialize(final Mater
+ " grace=[" + windows.gracePeriodMs() + "],"
+ " retention=[" + retentionPeriod + "]");
}

switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemoryWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false
);
break;
case ROCKS_DB:
supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ?
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false,
true
) :
Stores.persistentTimestampedWindowStore(
if (materialized.storeProvider() != null) {
supplier = materialized.storeProvider().timestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false);
} else {
switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemoryWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
break;
case ROCKS_DB:
// TODO(KIP-954): should the DSLStoreProvider include an API for this or is
// it a specific optimization that only applies to rocksdb?
supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ?
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false,
true
) :
Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.timeDifferenceMs()),
false
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,33 @@ private StoreBuilder<TimestampedWindowStore<K, V>> materialize(
+ " retention=[" + retentionPeriod
+ "]");
}

switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemoryWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false
);
break;
case ROCKS_DB:
supplier = Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
if (materialized.storeProvider() != null) {
supplier = materialized.storeProvider().timestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false);
} else {
switch (materialized.storeType()) {
case IN_MEMORY:
supplier = Stores.inMemoryWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false
);
break;
case ROCKS_DB:
supplier = Stores.persistentTimestampedWindowStore(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
Duration.ofMillis(windows.size()),
false
);
break;
default:
throw new IllegalStateException("Unknown store type: " + materialized.storeType());
}
}
}

Expand Down
Loading