Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building #4

Closed · wants to merge 114 commits
Commits
Show all changes
114 commits
Select commit Hold shift + click to select a range
294556d
move smc to coordinator
findingrish Aug 27, 2023
d1c0dca
refactor CoordinatorServerView
findingrish Aug 28, 2023
a48d2fe
minor change
findingrish Aug 28, 2023
5e7756f
Revert "refactor CoordinatorServerView"
findingrish Aug 28, 2023
a9ff640
Draft changes for the coordinator to conditionally build smc, and ref…
findingrish Sep 4, 2023
105bda9
Move schema querying logic to BrokerSegmentMetadataCache
findingrish Sep 6, 2023
3aad095
Fix dataSource schema on coordinator and minor renaming
findingrish Sep 6, 2023
14baf19
cleanup and fix some tests
findingrish Sep 7, 2023
857f056
Port tests and test build failure
findingrish Sep 7, 2023
95389b1
Fix unit tests and add test for getAllUsedSegments
findingrish Sep 8, 2023
fb72888
Merge remote-tracking branch 'origin/master' into coordinator_builds_…
findingrish Sep 8, 2023
bc8396c
minor change
findingrish Sep 8, 2023
fbab4c8
Remove logic to refactor sys segments table building logic
findingrish Sep 8, 2023
151b0b1
undo changes in SegmentsMetadataManager
findingrish Sep 9, 2023
8dbea5b
Minor code changes and add multiple tests
findingrish Sep 9, 2023
e630b9b
Add test for QueryableCoordinatorServerViewTest
findingrish Sep 11, 2023
17c3514
Add test for BrokerSegmentMetadataCache
findingrish Sep 11, 2023
f0baf33
minor code changes and fix checkstyle issues
findingrish Sep 11, 2023
f18c060
Fix intellij inspections
findingrish Sep 11, 2023
03383e6
Fix QueryableCoordinatorServerView test
findingrish Sep 11, 2023
dc6aa6e
Merge remote-tracking branch 'origin/master' into coordinator_builds_…
findingrish Sep 11, 2023
e5c4b39
Complete tests for SMC to verify DataSourceInformation
findingrish Sep 11, 2023
45378d5
Add comments
findingrish Sep 11, 2023
2327b12
Refactor SegmentMetadataCacheTest and BrokerSegmentMetadataCacheTest
findingrish Sep 11, 2023
d4ece6a
Test fetching ds schema from coordinator in BrokerSegmentMetadataCach…
findingrish Sep 12, 2023
eb1771f
fix checkstyle issue
findingrish Sep 12, 2023
1440dac
Add test for QueryableCoordinatorServerView
findingrish Sep 12, 2023
10068b6
Fix SegmentStatusInClusterTest
findingrish Sep 12, 2023
032734a
Address intellij inspection
findingrish Sep 12, 2023
9a04173
Merge remote-tracking branch 'origin/master' into coordinator_builds_…
findingrish Sep 12, 2023
80f4424
Add undeclared dependency in server module
findingrish Sep 12, 2023
33a8dd5
Remove enabled field from SegmentMetadataCacheConfig
findingrish Sep 12, 2023
7a7ca55
Add class to manage druid table information in SegmentMetadataCache, …
findingrish Sep 13, 2023
eb6a145
Merge remote-tracking branch 'origin/master' into coordinator_builds_…
findingrish Sep 13, 2023
b9fb83d
Minor refactoring in SegmentMetadataCache
findingrish Sep 13, 2023
aa2bfe7
Make SegmentMetadataCache generic
findingrish Sep 13, 2023
e97dcda
Add a generic abstract class for segment metadata cache
findingrish Sep 13, 2023
7badce1
Rename SegmentMetadataCache to CoordinatorSegmentMetadataCache
findingrish Sep 13, 2023
25cdce6
Rename PhysicalDataSourceMetadataBuilder to PhysicalDataSourceMetadat…
findingrish Sep 13, 2023
5f5ad18
Fix json property key name in DataSourceInformation
findingrish Sep 13, 2023
08e949e
Add validation in MetadataResource#getAllUsedSegments, update javadocs
findingrish Sep 14, 2023
80fc09d
Minor changes
findingrish Sep 14, 2023
4217cd8
Minor change
findingrish Sep 14, 2023
8b7e483
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 14, 2023
d6ac350
Update base property name for query config classes in Coordinator
findingrish Sep 14, 2023
533236b
Log ds schema change when polling from coordinator
findingrish Sep 15, 2023
70f0888
update the logic to determine is_active status in segments table for …
findingrish Sep 15, 2023
a176bfe
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 15, 2023
b32dfd6
Update the logic to set numRows in the sys segments table, add comments
findingrish Sep 15, 2023
17417b5
Rename config druid.coordinator.segmentMetadataCache.enabled to druid…
findingrish Sep 15, 2023
6a395a9
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 18, 2023
907ace3
Report cache init time irrespective of the awaitInitializationOnStart…
findingrish Sep 20, 2023
cf68c38
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 20, 2023
441f37a
Report metric for fetching schema from coordinator
findingrish Sep 20, 2023
bd5b048
Add auth check in api to return dataSourceInformation, report metrics…
findingrish Sep 21, 2023
933d8d1
Fix bug in Coordinator api to return dataSourceInformation
findingrish Sep 21, 2023
9e7e364
Minor change
findingrish Sep 21, 2023
e7356ce
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 22, 2023
5d16148
Address comments around docs, minor renaming
findingrish Sep 23, 2023
d8884be
Remove null check from MetadataResource#getDataSourceInformation
findingrish Sep 23, 2023
0f0805a
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 23, 2023
e129d3e
Install cache module in Coordinator, if feature is enabled and beOver…
findingrish Sep 25, 2023
b4042c6
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 29, 2023
01c27c9
Minor change in QueryableCoordinatorServerView
findingrish Sep 29, 2023
87c9873
Remove QueryableCoordinatorServerView, add a new QuerySegmentWalker i…
findingrish Oct 14, 2023
971b347
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 14, 2023
89d3845
fix build
findingrish Oct 14, 2023
270dbd5
fix build
findingrish Oct 14, 2023
2da23b8
Fix spelling, intellij-inspection, codeql bug
findingrish Oct 14, 2023
6f568a6
undo some changes in CachingClusteredClientTest
findingrish Oct 14, 2023
fe229c0
minor changes
findingrish Oct 14, 2023
473b25c
Fix typo in metric name
findingrish Oct 14, 2023
39fb248
temporarily enable feature on ITs
findingrish Oct 15, 2023
cac695a
fix checkstyle issue
findingrish Oct 15, 2023
eb3e3c1
Changes in CliCoordinator to conditionally add segment metadata cache…
findingrish Oct 15, 2023
30438f4
temporary changes to debug IT failure
findingrish Oct 16, 2023
e88ad00
revert temporary changes in gha
findingrish Oct 16, 2023
61d130b
revert temporary changes to run ITs with this feature
findingrish Oct 16, 2023
2e4c45b
update docs with the config for enabling feature
findingrish Oct 16, 2023
255cf2c
update docs with the config for enabling feature
findingrish Oct 16, 2023
1a6dfc5
Add IT for the feature
findingrish Oct 17, 2023
a961501
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 17, 2023
2e65726
Merge branch 'coordinator_builds_ds_schema' of github.com:findingrish…
findingrish Oct 17, 2023
4b51c42
Changes in BrokerSegmentMetadataCache to poll schema for all the loca…
findingrish Oct 20, 2023
a4e2097
Address review comments
findingrish Oct 26, 2023
3ca03c9
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 26, 2023
902abd3
Run DruidSchemaInternRowSignatureBenchmark using BrokerSegmentMetadat…
findingrish Oct 26, 2023
bcab458
Address feedback
findingrish Oct 26, 2023
32a4065
Simplify logic for setting isRealtime in sys segments table
findingrish Oct 26, 2023
6b04ee7
Remove forbidden api invocation
findingrish Oct 26, 2023
cb93e43
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 26, 2023
152c480
Debug log when coordinator poll fails
findingrish Oct 26, 2023
80c6d26
Fix CoordinatorSegmentMetadataCacheTest
findingrish Oct 27, 2023
9cca98e
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 28, 2023
bb69cde
Minor changes
findingrish Oct 28, 2023
a604e65
Initial class for persisting and retrieving segment schema
findingrish Nov 6, 2023
f2d8a5e
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Nov 6, 2023
37ac4a7
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Dec 12, 2023
629f3ad
fix conflicts
findingrish Dec 12, 2023
af34c34
Revert changes in SegmentLoadInfo
findingrish Dec 12, 2023
4a4f1b0
Remove segmentSchemaMapping table
findingrish Dec 12, 2023
b142b29
Initial commit
findingrish Dec 15, 2023
035a884
Outline
findingrish Jan 10, 2024
7935349
Revert some changes
findingrish Jan 10, 2024
26ee22f
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Jan 10, 2024
1555592
minor changes
findingrish Jan 10, 2024
ccd2c42
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Jan 10, 2024
900dad5
Minor changes
findingrish Jan 10, 2024
45423fc
Add kill task, update schema persist code
findingrish Jan 14, 2024
33cfb75
Initial task side changes to publish schema
findingrish Jan 15, 2024
6c7e0d5
Changes to publish schema for streaming task
findingrish Jan 21, 2024
85594a7
Update schema table
findingrish Jan 22, 2024
72f671c
Fix build
findingrish Jan 23, 2024
ecc8fc4
Changes to publish realtime segment schema, poll, cache and use it fo…
findingrish Jan 31, 2024
Files changed
@@ -8,3 +8,4 @@
 -Djava.io.tmpdir=var/tmp
 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
 -Dderby.stream.error.file=var/druid/derby.log
+-agentlib:jdwp=transport=dt_socket,server=y,address=5005,suspend=n
@@ -31,3 +31,5 @@ druid.coordinator.asOverlord.overlordService=druid/overlord
 druid.indexer.queue.startDelay=PT5S
 
 druid.indexer.storage.type=metadata
+
+druid.centralizedDatasourceSchema.enabled=true
@@ -25,7 +25,7 @@ druid.worker.capacity=3
 druid.worker.baseTaskDirs=[\"var/druid/task\"]
 
 # Task launch parameters
-druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
+druid.indexer.runner.javaOptsArray=["-server","-agentlib:jdwp=transport=dt_socket,server=y,address=5007,suspend=n","-Xms1g","-Xmx1g","-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
 
 # HTTP server threads
 druid.server.http.numThreads=50
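Both JVM-option changes above add a JDWP debug agent: the first listens on port 5005 in the service's jvm.config, the second on port 5007 for spawned task JVMs. server=y,suspend=n means each JVM waits for a debugger to attach without pausing startup. Given the later "temporary changes to debug IT failure" commits, these read as local debugging aids rather than settings intended to ship.
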
@@ -1538,9 +1538,9 @@ private static TaskAction<SegmentPublishResult> createAppendAction(
 )
 {
 if (taskLockType.equals(TaskLockType.APPEND)) {
-return SegmentTransactionalAppendAction.forSegments(segments);
+return SegmentTransactionalAppendAction.forSegments(segments, Collections.emptyMap());
 } else if (taskLockType.equals(TaskLockType.SHARED)) {
-return SegmentTransactionalInsertAction.appendAction(segments, null, null);
+return SegmentTransactionalInsertAction.appendAction(segments, null, null, Collections.emptyMap());
 } else {
 throw DruidException.defensive("Invalid lock type [%s] received for append action", taskLockType);
 }
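A pattern that repeats throughout this diff: every publish path now threads a Map<String, SegmentSchemaMetadata> through to the task actions, and call sites with no schema to publish pass an empty map. A minimal sketch of the new call shape, assuming the map is keyed by segment id (the diff shows only the map's type, not the key convention), with a hypothetical helper class not present in the PR:

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.segment.column.SegmentSchemaMetadata;
import org.apache.druid.timeline.DataSegment;

// Illustrative helper (not from the PR): pick the append action for a lock type,
// forwarding whatever schema the task computed.
class AppendActionSketch
{
  static TaskAction<SegmentPublishResult> appendActionFor(
      TaskLockType taskLockType,
      Set<DataSegment> segments,
      Map<String, SegmentSchemaMetadata> schemaMetadataMap  // assumed keyed by segment id
  )
  {
    if (taskLockType == TaskLockType.APPEND) {
      return SegmentTransactionalAppendAction.forSegments(segments, schemaMetadataMap);
    }
    return SegmentTransactionalInsertAction.appendAction(segments, null, null, schemaMetadataMap);
  }

  // Callers without schema information pass an empty map, as in the hunk above.
  static TaskAction<SegmentPublishResult> withoutSchema(TaskLockType taskLockType, Set<DataSegment> segments)
  {
    return appendActionFor(taskLockType, segments, Collections.emptyMap());
  }
}
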
@@ -86,6 +86,7 @@ public String getPassword()
 //by the code using this
 public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
 {
+// todo should we set segments table here
 return new MetadataStorageTablesConfig(
 null,
 null,
@@ -98,6 +99,7 @@ public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
 null,
 null,
 null,
+null,
 null
 );
 }
@@ -74,6 +74,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * Stuff that may be needed by a Task in order to conduct its business.
@@ -348,7 +350,7 @@ public void publishSegments(Iterable<DataSegment> segments) throws IOException
 for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
 getTaskActionClient().submit(
 SegmentTransactionalInsertAction.appendAction(
-ImmutableSet.copyOf(segmentCollection), null, null
+ImmutableSet.copyOf(segmentCollection), null, null, Collections.emptyMap()
 )
 );
 }
@@ -22,11 +22,14 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.column.SegmentSchemaMetadata;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -38,12 +41,16 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
 {
 private final Set<DataSegment> segments;
 
+private final Map<String, SegmentSchemaMetadata> schemaMetadataMap;
+
 @JsonCreator
 public SegmentInsertAction(
-@JsonProperty("segments") Set<DataSegment> segments
+@JsonProperty("segments") Set<DataSegment> segments,
+@JsonProperty("schemaMetadataMap") Map<String, SegmentSchemaMetadata> schemaMetadataMap
 )
 {
 this.segments = ImmutableSet.copyOf(segments);
+this.schemaMetadataMap = ImmutableMap.copyOf(schemaMetadataMap);
 }
 
 @JsonProperty
@@ -68,7 +75,7 @@ public TypeReference<Set<DataSegment>> getReturnTypeReference()
 @Override
 public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)
 {
-return SegmentTransactionalInsertAction.appendAction(segments, null, null).perform(task, toolbox).getSegments();
+return SegmentTransactionalInsertAction.appendAction(segments, null, null, schemaMetadataMap).perform(task, toolbox).getSegments();
 }
 
 @Override
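One detail worth flagging in the constructor above: ImmutableMap.copyOf throws a NullPointerException on null input, so unlike segments the new schemaMetadataMap argument is effectively mandatory; a previously serialized SegmentInsertAction without that JSON field would deserialize it as null and fail here. A hypothetical caller with no schema simply supplies an empty map:

import java.util.Collections;

import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.timeline.DataSegment;

class SegmentInsertSketch
{
  // Illustrative only: wrap a single segment in the action with no schema attached.
  static SegmentInsertAction insertWithoutSchema(DataSegment segment)
  {
    // Collections.emptyMap() stands in for "no schema"; null would throw inside
    // the constructor's ImmutableMap.copyOf call.
    return new SegmentInsertAction(ImmutableSet.of(segment), Collections.emptyMap());
  }
}
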
@@ -32,6 +32,7 @@
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.column.SegmentSchemaMetadata;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
@@ -51,26 +52,29 @@ public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
 private final DataSourceMetadata startMetadata;
 @Nullable
 private final DataSourceMetadata endMetadata;
+private final Map<String, SegmentSchemaMetadata> segmentsToPublishSchema;
 
-public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments)
+public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments, Map<String, SegmentSchemaMetadata> segmentsToPublishSchema)
 {
-return new SegmentTransactionalAppendAction(segments, null, null);
+return new SegmentTransactionalAppendAction(segments, null, null, segmentsToPublishSchema);
 }
 
 public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
 Set<DataSegment> segments,
 DataSourceMetadata startMetadata,
-DataSourceMetadata endMetadata
+DataSourceMetadata endMetadata,
+Map<String, SegmentSchemaMetadata> segmentsToPublishSchema
 )
 {
-return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata);
+return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata, segmentsToPublishSchema);
 }
 
 @JsonCreator
 private SegmentTransactionalAppendAction(
 @JsonProperty("segments") Set<DataSegment> segments,
 @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
-@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
+@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
+@JsonProperty("schemaMap") @Nullable Map<String, SegmentSchemaMetadata> schemaMetadataMap
 )
 {
 this.segments = segments;
@@ -81,6 +85,7 @@ private SegmentTransactionalAppendAction(
 || (startMetadata != null && endMetadata == null)) {
 throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
 }
+this.segmentsToPublishSchema = schemaMetadataMap;
 }
 
 @JsonProperty
@@ -135,14 +140,16 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
 if (startMetadata == null) {
 publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
 segments,
-segmentToReplaceLock
+segmentToReplaceLock,
+segmentsToPublishSchema
 );
 } else {
 publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
 segments,
 segmentToReplaceLock,
 startMetadata,
-endMetadata
+endMetadata,
+segmentsToPublishSchema
 );
 }
 
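For streaming tasks the same action is built with offset metadata. A sketch of how a caller such as SequenceMetadata (later in this diff) might choose between the two factory methods, with placeholder names for the offset objects:

import java.util.Map;
import java.util.Set;

import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.segment.column.SegmentSchemaMetadata;
import org.apache.druid.timeline.DataSegment;

class StreamingAppendSketch
{
  // Illustrative: publish segments under an APPEND lock, attaching stream offsets
  // when the task tracks them, plus the per-segment schema map.
  static SegmentTransactionalAppendAction buildAction(
      Set<DataSegment> segmentsToPush,
      DataSourceMetadata startOffsets,   // null when no metadata commit is needed
      DataSourceMetadata endOffsets,
      Map<String, SegmentSchemaMetadata> segmentsToPublishSchema
  )
  {
    return startOffsets == null
           ? SegmentTransactionalAppendAction.forSegments(segmentsToPush, segmentsToPublishSchema)
           : SegmentTransactionalAppendAction.forSegmentsAndMetadata(
               segmentsToPush, startOffsets, endOffsets, segmentsToPublishSchema
           );
  }
}

Note the Jackson key in the creator above is "schemaMap", unlike the "schemaMetadataMap" key used by SegmentTransactionalInsertAction below.
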
@@ -34,11 +34,13 @@
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.column.SegmentSchemaMetadata;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -62,6 +64,8 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPublishResult>
 */
 private final Set<DataSegment> segments;
 
+private final Map<String, SegmentSchemaMetadata> schemaMetadataMap;
+
 @Nullable
 private final DataSourceMetadata startMetadata;
 @Nullable
@@ -74,16 +78,17 @@ public static SegmentTransactionalInsertAction overwriteAction(
 Set<DataSegment> segmentsToPublish
 )
 {
-return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null);
+return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null, Collections.emptyMap());
 }
 
 public static SegmentTransactionalInsertAction appendAction(
 Set<DataSegment> segments,
 @Nullable DataSourceMetadata startMetadata,
-@Nullable DataSourceMetadata endMetadata
+@Nullable DataSourceMetadata endMetadata,
+Map<String, SegmentSchemaMetadata> schemaMetadataMap
 )
 {
-return new SegmentTransactionalInsertAction(null, segments, startMetadata, endMetadata, null);
+return new SegmentTransactionalInsertAction(null, segments, startMetadata, endMetadata, null, schemaMetadataMap);
 }
 
 public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
@@ -92,7 +97,7 @@ public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
 DataSourceMetadata endMetadata
 )
 {
-return new SegmentTransactionalInsertAction(null, null, startMetadata, endMetadata, dataSource);
+return new SegmentTransactionalInsertAction(null, null, startMetadata, endMetadata, dataSource, Collections.emptyMap());
 }
 
 @JsonCreator
@@ -101,14 +106,16 @@ private SegmentTransactionalInsertAction(
 @JsonProperty("segments") @Nullable Set<DataSegment> segments,
 @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
 @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
-@JsonProperty("dataSource") @Nullable String dataSource
+@JsonProperty("dataSource") @Nullable String dataSource,
+@JsonProperty("schemaMetadataMap") @Nullable Map<String, SegmentSchemaMetadata> schemaMetadataMap
 )
 {
 this.segmentsToBeOverwritten = segmentsToBeOverwritten;
 this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments);
 this.startMetadata = startMetadata;
 this.endMetadata = endMetadata;
 this.dataSource = dataSource;
+this.schemaMetadataMap = schemaMetadataMap;
 }
 
 @JsonProperty
@@ -145,6 +152,13 @@ public String getDataSource()
 return dataSource;
 }
 
+@JsonProperty
+@Nullable
+public Map<String, SegmentSchemaMetadata> getSchemaMetadataMap()
+{
+return schemaMetadataMap;
+}
+
 @Override
 public TypeReference<SegmentPublishResult> getReturnTypeReference()
 {
@@ -201,7 +215,8 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
 () -> toolbox.getIndexerMetadataStorageCoordinator().commitSegmentsAndMetadata(
 segments,
 startMetadata,
-endMetadata
+endMetadata,
+schemaMetadataMap
 )
 )
 .onInvalidLocks(
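Of the three factory methods above, only appendAction accepts a caller-supplied schema map; overwriteAction and commitMetadataOnlyAction pin it to Collections.emptyMap(). In other words, as of this diff, segment schema publishing flows only through the append paths, consistent with the commit history's focus on streaming and realtime schema publishing.
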
@@ -416,7 +416,7 @@ protected TaskAction<SegmentPublishResult> buildPublishAction(
 case REPLACE:
 return SegmentTransactionalReplaceAction.create(segmentsToPublish);
 case APPEND:
-return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
+return SegmentTransactionalAppendAction.forSegments(segmentsToPublish, Collections.emptyMap());
 default:
 return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish);
 }
@@ -351,7 +351,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
 int sequenceNumber = 0;
 String sequenceName = makeSequenceName(getId(), sequenceNumber);
 
-final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata) -> {
+final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata, map) -> {
 if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
 throw new ISE(
 "Stream ingestion task unexpectedly attempted to overwrite segments: %s",
@@ -361,7 +361,8 @@ public TaskStatus runTask(final TaskToolbox toolbox)
 final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction(
 segments,
 null,
-null
+null,
+Collections.emptyMap()
 );
 return toolbox.getTaskActionClient().submit(action);
 };
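TransactionalSegmentPublisher's functional method gains a fourth parameter here (the schema map), which is why every publisher lambda in the remaining hunks grows a trailing argument. A sketch of the new shape, with the body kept to what this hunk shows and the interface location taken from the SequenceMetadata import later in this diff:

import java.util.Collections;

import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;

class PublisherSketch
{
  // Illustrative: this task ignores the incoming schema map ("map") and
  // publishes with an empty one, exactly as the hunk above does.
  static TransactionalSegmentPublisher makePublisher(TaskToolbox toolbox)
  {
    return (mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata, map) ->
        toolbox.getTaskActionClient().submit(
            SegmentTransactionalInsertAction.appendAction(segments, null, null, Collections.emptyMap())
        );
  }
}
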
@@ -918,7 +918,7 @@ private TaskStatus generateAndPublishSegments(
 
 final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
 final TransactionalSegmentPublisher publisher =
-(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit(
+(segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> toolbox.getTaskActionClient().submit(
 buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType)
 );
 
@@ -1035,13 +1035,13 @@ private TaskStatus generateAndPublishSegments(
 buildSegmentsMeters.getUnparseable(),
 buildSegmentsMeters.getThrownAway()
 );
-log.info("Published [%s] segments", published.getSegments().size());
+log.info("Published [%s] segments", published.getSegmentSchema().size());
 
 // publish metrics:
 emitMetric(toolbox.getEmitter(), "ingest/tombstones/count", tombStones.size());
 // segments count metric is documented to include tombstones
 emitMetric(toolbox.getEmitter(), "ingest/segments/count",
-published.getSegments().size() + tombStones.size()
+published.getSegmentSchema().size() + tombStones.size()
 );
 
 log.debugSegments(published.getSegments(), "Published segments");
@@ -1170,13 +1170,13 @@ private void publishSegments(
 
 final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse();
 final TransactionalSegmentPublisher publisher =
-(segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit(
+(segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> toolbox.getTaskActionClient().submit(
 buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType)
 );
 
 final boolean published =
 newSegments.isEmpty()
-|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null).isSuccess();
+|| publisher.publishSegments(oldSegments, newSegments, annotateFunction, null, Collections.emptyMap()).isSuccess();
 
 if (published) {
 LOG.info("Published [%d] segments", newSegments.size());
@@ -462,7 +462,7 @@ private Set<DataSegment> generateAndPushSegments(
 // which makes the size of segments smaller.
 final SegmentsAndCommitMetadata pushed = driver.pushAllAndClear(pushTimeout);
 pushedSegments.addAll(pushed.getSegments());
-LOG.info("Pushed [%s] segments", pushed.getSegments().size());
+LOG.info("Pushed [%s] segments", pushed.getSegmentSchema().size());
 LOG.infoSegments(pushed.getSegments(), "Pushed segments");
 }
 } else {
@@ -37,6 +37,7 @@
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.column.SegmentSchemaMetadata;
 import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.timeline.DataSegment;
 
@@ -219,9 +220,9 @@ boolean canHandle(
 }
 
 if (runner.isEndOffsetExclusive()) {
-ret &= recordOffset.compareTo(partitionEndOffset) < 0;
+ret = ret && recordOffset.compareTo(partitionEndOffset) < 0;
 } else {
-ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
+ret = ret && recordOffset.compareTo(partitionEndOffset) <= 0;
 }
 
 return ret;
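The ret &= rewrite above is worth a note: on booleans, &= uses the non-short-circuiting & operator, so the right-hand comparison is evaluated even when ret is already false (possibly what a static-analysis fix in the commit history was cleaning up); the && form skips it. A self-contained illustration:

class ShortCircuitDemo
{
  static boolean expensiveCheck()
  {
    System.out.println("evaluated");
    return true;
  }

  public static void main(String[] args)
  {
    boolean ret = false;
    ret &= expensiveCheck();        // prints "evaluated": &= never short-circuits
    ret = ret && expensiveCheck();  // prints nothing: && short-circuits on false
  }
}
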
@@ -351,7 +352,8 @@ public SequenceMetadataTransactionalSegmentPublisher(
 public SegmentPublishResult publishAnnotatedSegments(
 @Nullable Set<DataSegment> mustBeNullOrEmptyOverwriteSegments,
 Set<DataSegment> segmentsToPush,
-@Nullable Object commitMetadata
+@Nullable Object commitMetadata,
+Map<String, SegmentSchemaMetadata> segmentsToPublishSchema
 ) throws IOException
 {
 if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
@@ -417,12 +419,12 @@ public SegmentPublishResult publishAnnotatedSegments(
 );
 final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions);
 action = taskLockType == TaskLockType.APPEND
-? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata)
-: SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata);
+? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata, segmentsToPublishSchema)
+: SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata, segmentsToPublishSchema);
 } else {
 action = taskLockType == TaskLockType.APPEND
-? SegmentTransactionalAppendAction.forSegments(segmentsToPush)
-: SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null);
+? SegmentTransactionalAppendAction.forSegments(segmentsToPush, segmentsToPublishSchema)
+: SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null, segmentsToPublishSchema);
 }
 
 return toolbox.getTaskActionClient().submit(action);
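Taken together with the commit history, the flow this diff establishes appears to be: an ingestion task computes a Map<String, SegmentSchemaMetadata> for the segments it is about to publish and hands it to publishAnnotatedSegments, which wraps it in SegmentTransactionalAppendAction or SegmentTransactionalInsertAction; the action's perform() passes the map to the IndexerMetadataStorageCoordinator (commitAppendSegments, commitAppendSegmentsAndMetadata, or commitSegmentsAndMetadata), which persists it in the metadata store. From there, per the PR title and commits, the Coordinator's CoordinatorSegmentMetadataCache builds datasource schemas and Brokers poll them through BrokerSegmentMetadataCache instead of querying each segment themselves, gated by druid.centralizedDatasourceSchema.enabled.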