Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor SQLMetadataSegmentManager; Change contract of REST methods in DataSourcesResource #7653

Merged
merged 20 commits into from
Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .idea/inspectionProfiles/Druid.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.druid.server.coordinator;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.helper.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.helper.CompactionSegmentSearchPolicy;
Expand All @@ -42,6 +44,7 @@
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -102,14 +105,10 @@ public void setup()
);
}

dataSources = new HashMap<>();
List<DataSegment> segments = new ArrayList<>();
for (int i = 0; i < numDataSources; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;

VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER
);

final int startYear = ThreadLocalRandom.current().nextInt(2000, 2040);
DateTime date = DateTimes.of(startYear, 1, 1, 0, 0);

Expand All @@ -127,12 +126,11 @@ public void setup()
0,
segmentSizeBytes
);
timeline.add(segment.getInterval(), segment.getVersion(), shardSpec.createChunk(segment));
segments.add(segment);
}
}

dataSources.put(dataSource, timeline);
}
dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of()).getUsedSegmentsTimelinesPerDataSource();
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public void error(Throwable t, String message, Object... formatArgs)
log.error(StringUtils.nonStrictFormat(message, formatArgs), t);
}

/**
 * Logs an error tagged with an "ASSERTION_ERROR: " prefix, for invariant violations that indicate a bug
 * but should not interrupt execution by throwing.
 *
 * NOTE(review): unlike {@code error(Throwable, String, Object...)} above, which pre-formats the message
 * via StringUtils.nonStrictFormat, this method passes {@code formatArgs} straight through to the delegate
 * logger — presumably the delegate performs the formatting; confirm, or messages with placeholders will
 * be emitted unformatted.
 *
 * @param message    format string for the error message (placeholders resolved by the delegate logger)
 * @param formatArgs arguments forwarded to the delegate logger
 */
public void assertionError(String message, Object... formatArgs)
{
log.error("ASSERTION_ERROR: " + message, formatArgs);
}

public void wtf(String message, Object... formatArgs)
{
log.error(StringUtils.nonStrictFormat("WTF?!: " + message, formatArgs), new Exception());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.service.AlertBuilder;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.io.StringWriter;

Expand Down Expand Up @@ -61,15 +62,22 @@ public AlertBuilder makeAlert(String message, Object... objects)
return makeAlert(null, message, objects);
}

public AlertBuilder makeAlert(Throwable t, String message, Object... objects)
public AlertBuilder makeAlert(@Nullable Throwable t, String message, Object... objects)
{
if (emitter == null) {
final String errorMessage = StringUtils.format(
"Emitter not initialized! Cannot alert. Please make sure to call %s.registerEmitter()", this.getClass()
"Emitter not initialized! Cannot alert. Please make sure to call %s.registerEmitter()\n"
+ "Message: %s",
this.getClass(),
StringUtils.nonStrictFormat(message, objects)
);

error(errorMessage);
throw new ISE(errorMessage);
ISE e = new ISE(errorMessage);
if (t != null) {
e.addSuppressed(t);
}
throw e;
}

final AlertBuilder retVal = new EmittingAlertBuilder(t, StringUtils.format(message, objects), emitter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.List;


Expand Down Expand Up @@ -50,5 +51,5 @@ public interface TimelineLookup<VersionType, ObjectType>
*/
List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval);

PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);
@Nullable PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.timeline.partition.ImmutablePartitionHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -44,7 +46,9 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.StreamSupport;

/**
* VersionedIntervalTimeline is a data structure that manages objects on a specific timeline.
Expand Down Expand Up @@ -75,12 +79,11 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
Comparators.intervalsByStartThenEnd()
);
private final Map<Interval, TreeMap<VersionType, TimelineEntry>> allTimelineEntries = new HashMap<>();
private final AtomicInteger numObjects = new AtomicInteger();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VersionedIntervalTimeline is not thread-safe. Should this be just an integer? I'm not sure how you're planning to use VersionedIntervalTimeline.

Copy link
Contributor

@jihoonson jihoonson Jul 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, this variable is used only in iterateAllObjects. Should it return a collection? Or can we change the return type to Stream or Iterator? With this change, we don't have to track the number of objects in the timeline.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VersionedIntervalTimeline actually has to be thread-safe: see #7285.

Copy link
Member Author

@leventov leventov Jul 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more convenient for users of the class if the method returns a Collection rather than Iterable or Stream: it makes possible both for-each iteration and a stream pipeline. Although currently this method is used in just one place with forEach() call, I don't see much value of over-specializing for this case now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As aforementioned, we don't have to have numObjects if iterateAllObjects() returns an Iterable or Stream. As you noted, the only user is calling forEach() on the result, it doesn't have to be a collection. As far as I remember, we agreed on simplifying unnecessarily generalized code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually discovered a use of numObjects in #7306, from where this PR is partially extracted: https://github.com/apache/incubator-druid/pull/7306/files#diff-b3571a010cbbb3cb648676a89936a51fR277


private final Comparator<? super VersionType> versionComparator;

public VersionedIntervalTimeline(
Comparator<? super VersionType> versionComparator
)
public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparator)
{
this.versionComparator = versionComparator;
}
Expand All @@ -92,7 +95,8 @@ public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterabl

public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterator<DataSegment> segments)
{
final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
final VersionedIntervalTimeline<String, DataSegment> timeline =
new VersionedIntervalTimeline<>(Comparator.naturalOrder());
addSegments(timeline, segments);
return timeline;
}
Expand All @@ -115,6 +119,28 @@ public Map<Interval, TreeMap<VersionType, TimelineEntry>> getAllTimelineEntries(
return allTimelineEntries;
}

/**
 * Exposes every object stored in this VersionedIntervalTimeline (overshadowed ones included, see
 * {@link #findOvershadowed}) as a lazy collection, suitable either for for-each iteration or for a
 * {@link Collection#stream()} pipeline. No particular ordering of the elements is guaranteed.
 *
 * Note: each traversal of the returned collection re-walks the internal entry maps, so iterating it is
 * not as trivially cheap as iterating, say, an ArrayList. Where reasonably possible, structure calling
 * code to make a single pass over the result rather than several.
 */
public Collection<ObjectType> iterateAllObjects()
{
  return CollectionUtils.createLazyCollectionFromStream(
      () -> allTimelineEntries
          .values()
          .stream()
          .map(TreeMap::values)
          .flatMap(Collection::stream)
          .flatMap(timelineEntry -> StreamSupport.stream(timelineEntry.getPartitionHolder().spliterator(), false))
          .map(PartitionChunk::getObject),
      numObjects.get()
  );
}

public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
{
addAll(Iterators.singletonIterator(object), o -> interval, o -> version);
Expand Down Expand Up @@ -143,15 +169,19 @@ private void addAll(
TreeMap<VersionType, TimelineEntry> versionEntry = new TreeMap<>(versionComparator);
versionEntry.put(version, entry);
allTimelineEntries.put(interval, versionEntry);
numObjects.incrementAndGet();
} else {
entry = exists.get(version);

if (entry == null) {
entry = new TimelineEntry(interval, version, new PartitionHolder<>(object));
exists.put(version, entry);
numObjects.incrementAndGet();
} else {
PartitionHolder<ObjectType> partitionHolder = entry.getPartitionHolder();
partitionHolder.add(object);
if (partitionHolder.add(object)) {
numObjects.incrementAndGet();
}
}
}

Expand All @@ -174,6 +204,7 @@ private void addAll(
}
}

@Nullable
public PartitionChunk<ObjectType> remove(Interval interval, VersionType version, PartitionChunk<ObjectType> chunk)
{
try {
Expand All @@ -189,7 +220,11 @@ public PartitionChunk<ObjectType> remove(Interval interval, VersionType version,
return null;
}

PartitionChunk<ObjectType> retVal = entry.getPartitionHolder().remove(chunk);
PartitionChunk<ObjectType> removedChunk = entry.getPartitionHolder().remove(chunk);
if (removedChunk == null) {
return null;
}
numObjects.decrementAndGet();
if (entry.getPartitionHolder().isEmpty()) {
versionEntries.remove(version);
if (versionEntries.isEmpty()) {
Expand All @@ -201,25 +236,23 @@ public PartitionChunk<ObjectType> remove(Interval interval, VersionType version,

remove(completePartitionsTimeline, interval, entry, false);

return retVal;
return removedChunk;
}
finally {
lock.writeLock().unlock();
}
}

@Override
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version)
public @Nullable PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version)
{
try {
lock.readLock().lock();
for (Map.Entry<Interval, TreeMap<VersionType, TimelineEntry>> entry : allTimelineEntries.entrySet()) {
if (entry.getKey().equals(interval) || entry.getKey().contains(interval)) {
TimelineEntry foundEntry = entry.getValue().get(version);
if (foundEntry != null) {
return new ImmutablePartitionHolder<ObjectType>(
foundEntry.getPartitionHolder()
);
return new ImmutablePartitionHolder<>(foundEntry.getPartitionHolder());
}
}
}
Expand Down Expand Up @@ -307,6 +340,10 @@ private TimelineObjectHolder<VersionType, ObjectType> timelineEntryToObjectHolde
);
}

/**
* This method should be deduplicated with DataSourcesSnapshot.determineOvershadowedSegments(): see
* https://github.com/apache/incubator-druid/issues/8070.
*/
public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()
{
try {
Expand All @@ -315,8 +352,8 @@ public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()

Map<Interval, Map<VersionType, TimelineEntry>> overShadowed = new HashMap<>();
for (Map.Entry<Interval, TreeMap<VersionType, TimelineEntry>> versionEntry : allTimelineEntries.entrySet()) {
Map<VersionType, TimelineEntry> versionCopy = new HashMap<>();
versionCopy.putAll(versionEntry.getValue());
@SuppressWarnings("unchecked")
Map<VersionType, TimelineEntry> versionCopy = (TreeMap) versionEntry.getValue().clone();
overShadowed.put(versionEntry.getKey(), versionCopy);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*/
public class ImmutablePartitionHolder<T> extends PartitionHolder<T>
{
public ImmutablePartitionHolder(PartitionHolder partitionHolder)
public ImmutablePartitionHolder(PartitionHolder<T> partitionHolder)
{
super(partitionHolder);
}
Expand All @@ -35,7 +35,7 @@ public PartitionChunk<T> remove(PartitionChunk<T> tPartitionChunk)
}

@Override
public void add(PartitionChunk<T> tPartitionChunk)
public boolean add(PartitionChunk<T> tPartitionChunk)
{
throw new UnsupportedOperationException();
}
Expand Down
Loading