
equals method in SegmentId looks buggy #8882

Closed
ArvinZheng opened this issue Nov 16, 2019 · 7 comments

Comments

@ArvinZheng
Contributor

ArvinZheng commented Nov 16, 2019

The Coordinator fails to distribute segments, and I suspect the equals method in SegmentId caused this issue.

  private int computeHashCode()
  {
    // Start with partitionNum and version hash codes, because they are often little sequential numbers. If they are
    // added in the end of the chain, resulting hashCode of SegmentId could have worse distribution.
    int hashCode = partitionNum;
    // 1000003 is a constant used in Google AutoValue, provides a little better distribution than 31
    hashCode = hashCode * 1000003 + version.hashCode();

    hashCode = hashCode * 1000003 + dataSource.hashCode();
    hashCode = hashCode * 1000003 + Long.hashCode(intervalStartMillis);
    hashCode = hashCode * 1000003 + Long.hashCode(intervalEndMillis);
    hashCode = hashCode * 1000003 + Objects.hashCode(intervalChronology);
    return hashCode;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SegmentId)) {
      return false;
    }
    SegmentId that = (SegmentId) o;
    // Compare hashCode instead of partitionNum: break the chain quicker if the objects are not equal. If the hashCodes
    // are equal as well as all other fields used to compute them, the partitionNums are also guaranteed to be equal.
    return hashCode == that.hashCode &&
           dataSource.equals(that.dataSource) &&
           intervalStartMillis == that.intervalStartMillis &&
           intervalEndMillis == that.intervalEndMillis &&
           Objects.equals(intervalChronology, that.intervalChronology) &&
           version.equals(that.version);
  }

Affected Version

0.16.0

Description

We got the error exceptionMessage=Can't add chunk[org.apache.druid.timeline.partition.LinearPartitionChunk@1cb644f] to a full atomicUpdateGroup[AtomicUpdateGroup{chunks=[org.apache.druid.timeline.partition.LinearPartitionChunk@9a644f]}] from the Coordinator when we were trying to upgrade our cluster from 0.13.0 to 0.16.0; refer to the Stack trace section below for details.
To give more context: we use our own indexer, which reads data from Kafka and builds segments. We create segments with LinearShardSpec and use internal logic to generate the partition number based on our data distribution strategy. The generated partition numbers per interval are not consecutive; for example, the following partition numbers were generated for the same interval, and we may produce 2000 to 4000 segments per interval in production.
10112217, 100412240, 110112589, 120212564, 20110920, 30212707, 40112686, 50312650, 60212319, 70412079, 80212020, 90312664

  • Cluster size
    We found this on our dev cluster which has only 3 historical nodes, 1 query broker and 1 coordinator.

  • Steps to reproduce the problem
    Not able to reproduce the problem since we found this issue when we were trying to upgrade our DEV cluster to 0.16.0 and the segments may have been cleaned up.

  • Any debugging that you have already done

  1. Went through the VersionedIntervalTimeline, OvershadowableManager, AtomicUpdateGroup and RootPartitionRange classes. For the same interval and version, we should not have multiple LinearShardSpec segments belonging to the same AtomicUpdateGroup, since the RootPartitionRange for LinearShardSpec is always (partitionNum, partitionNum + 1), and we should never have the same partition number for the same interval (guaranteed by a DB table constraint).
  2. In the doPoll() method of SQLMetadataSegmentManager, every time we query the DB, we try to replace a segment with an existing one from the previous snapshot if we can find it by SegmentId. If a collision can occur between two different partition numbers (with all other fields the same) in the computeHashCode() method of SegmentId, our segment poll is broken.
  3. Unfortunately we couldn't reproduce the issue, but I still don't feel comfortable using a hash in the equals method of a Java object, especially when it's used for key lookups. In my opinion, comparing partitionNum there would be better (weighing correctness against performance, and I am not sure how much performance we gain by comparing the hash).
  • Stack trace
2019-11-13T01:43:23,526 ERROR [org.apache.druid.metadata.SQLMetadataSegmentManager-Exec--0] org.apache.druid.metadata.SQLMetadataSegmentManager - Uncaught exception in class org.apache.druid.metadata.SQLMetadataSegmentManager's polling thread: {class=org.apache.druid.metadata.SQLMetadataSegmentManager, exceptionType=class org.apache.druid.java.util.common.ISE, exceptionMessage=Can't add chunk[org.apache.druid.timeline.partition.LinearPartitionChunk@1cb644f] to a full atomicUpdateGroup[AtomicUpdateGroup{chunks=[org.apache.druid.timeline.partition.LinearPartitionChunk@9a644f]}]}
org.apache.druid.java.util.common.ISE: Can't add chunk[org.apache.druid.timeline.partition.LinearPartitionChunk@1cb644f] to a full atomicUpdateGroup[AtomicUpdateGroup{chunks=[org.apache.druid.timeline.partition.LinearPartitionChunk@9a644f]}]
    at org.apache.druid.timeline.partition.OvershadowableManager.addChunk(OvershadowableManager.java:656) ~[druid-core-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.timeline.partition.PartitionHolder.add(PartitionHolder.java:61) ~[druid-core-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.timeline.VersionedIntervalTimeline.addAll(VersionedIntervalTimeline.java:188) ~[druid-core-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.timeline.VersionedIntervalTimeline.addSegments(VersionedIntervalTimeline.java:116) ~[druid-core-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.timeline.VersionedIntervalTimeline.forSegments(VersionedIntervalTimeline.java:107) ~[druid-core-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.timeline.VersionedIntervalTimeline.forSegments(VersionedIntervalTimeline.java:100) ~[druid-core-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.client.DataSourcesSnapshot.lambda$new$3(DataSourcesSnapshot.java:84) ~[druid-server-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.utils.CollectionUtils.lambda$mapValues$0(CollectionUtils.java:96) ~[druid-core-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_181]
    at org.apache.druid.utils.CollectionUtils.mapValues(CollectionUtils.java:96) ~[druid-core-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.client.DataSourcesSnapshot.<init>(DataSourcesSnapshot.java:82) ~[druid-server-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.client.DataSourcesSnapshot.fromUsedSegments(DataSourcesSnapshot.java:54) ~[druid-server-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.metadata.SQLMetadataSegmentManager.doPoll(SQLMetadataSegmentManager.java:973) ~[druid-server-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.metadata.SQLMetadataSegmentManager.poll(SQLMetadataSegmentManager.java:905) ~[druid-server-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at org.apache.druid.metadata.SQLMetadataSegmentManager.lambda$createPollTaskForStartOrder$0(SQLMetadataSegmentManager.java:321) ~[druid-server-0.16.0-incubating-iap3.jar:0.16.0-incubating-iap3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_181]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_181]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]

cc: @michaelschiff

@leventov
Member

Unfortunately we couldn't reproduce the issue, but I still don't feel comfortable using a hash in the equals method of a Java object, especially when it's used for key lookups. In my opinion, comparing partitionNum there would be better (weighing correctness against performance, and I am not sure how much performance we gain by comparing the hash).

If you think this issue is down specifically to SegmentId implementation, could you try to construct a unit test (with your LinearShardSpec) which demonstrates the inconsistency of equals() and hashCode(), or that hashCode() is buggy?
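
A minimal sketch of the kind of unit test being asked for, assuming the SegmentId.of(dataSource, interval, version, partitionNum) factory and the Intervals utility from druid-core (treat the exact signatures as assumptions); the sparse partition numbers mirror the ones reported above:

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Test;

public class SegmentIdSparsePartitionNumTest
{
  @Test
  public void testSparsePartitionNumsAreDistinguished()
  {
    // Sparse partition numbers taken from the report above; all other fields are identical.
    int[] partitionNums = {10112217, 100412240, 110112589, 20110920, 30212707};
    for (int i = 0; i < partitionNums.length; i++) {
      for (int j = i + 1; j < partitionNums.length; j++) {
        SegmentId a = SegmentId.of("ds", Intervals.of("2019-11-16/2019-11-17"), "v1", partitionNums[i]);
        SegmentId b = SegmentId.of("ds", Intervals.of("2019-11-16/2019-11-17"), "v1", partitionNums[j]);
        // equals() must distinguish SegmentIds that differ only in partitionNum.
        Assert.assertNotEquals(a, b);
      }
    }

    // Two SegmentIds built from identical fields must be equal and share a hash code.
    SegmentId x = SegmentId.of("ds", Intervals.of("2019-11-16/2019-11-17"), "v1", 10112217);
    SegmentId y = SegmentId.of("ds", Intervals.of("2019-11-16/2019-11-17"), "v1", 10112217);
    Assert.assertEquals(x, y);
    Assert.assertEquals(x.hashCode(), y.hashCode());
  }
}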

@michaelschiff
Contributor

I'm also not certain that the equals method is correct as written.

The hashcode is calculated via: (comments removed for readability)

int hashCode = partitionNum;
hashCode = hashCode * 1000003 + version.hashCode();
hashCode = hashCode * 1000003 + dataSource.hashCode();
hashCode = hashCode * 1000003 + Long.hashCode(intervalStartMillis);
hashCode = hashCode * 1000003 + Long.hashCode(intervalEndMillis);
hashCode = hashCode * 1000003 + Objects.hashCode(intervalChronology);

and the comment from equals:

// Compare hashCode instead of partitionNum: break the chain quicker if the objects are not equal. If the hashCodes
// are equal as well as all other fields used to compute them, the partitionNums are also guaranteed to be equal.

That logic, "If the hashCodes are equal as well as all other fields used to compute them, the partitionNums are also guaranteed to be equal", holds over the unbounded integers, but under fixed-width arithmetic (like int, where everything wraps mod 2^32) it seems possible to start with two different partition numbers, run them through the same sequence of multiplications and additions, and arrive at the same answer mod 2^32.

Take as a simple example partitions x and y, where x != y, there is only one other "field" used to compute the hash code (held constant in this example), and arithmetic is done mod 8:

x = 1
y = 3

x * 4 Mod 8 = 4

y * 4 Mod 8 = 4
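
A tiny runnable version of this toy example, just to make the collision concrete (nothing here is Druid code):

public class ToyCollision
{
  public static void main(String[] args)
  {
    int x = 1;
    int y = 3;
    // The multiplier 4 shares a factor with the modulus 8, so distinct inputs can collide.
    System.out.println((x * 4) % 8); // prints 4
    System.out.println((y * 4) % 8); // also prints 4, even though x != y
  }
}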

While a full-size counterexample is still being worked out, I want to point out that this discussion could be obviated by adding an additional comparison to the equals method (checking the partition number too). The hashcode comparison can be left in for a quick exit (if the hashcodes differ, the objects are certainly different), but in the case of a hashcode collision all fields need to be checked.

@michaelschiff
Contributor

I'm no longer convinced that the logic is incorrect.

a ≡ b (mod m) iff m | (a - b)

Taking a simple example with only a single other field, whose hashcode is k, we have

((a * 1000003) + k) ≡ ((b * 1000003) + k) (mod 2^32)

using the above, this becomes

2^32 | ((a * 1000003) + k) - ((b * 1000003) + k)

2^32 | (a * 1000003) - (b * 1000003)

2^32 | 1000003 * (a - b)

Since 1000003 and 2^32 are coprime, 2^32 must divide (a - b). That is impossible when a and b are distinct ints, because then 0 < |a - b| < 2^32.
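
For what it's worth, the argument can also be checked empirically. A minimal sketch (not Druid code): because 1000003 is odd and therefore coprime to 2^32, and Java int multiplication is implicitly mod 2^32, multiplying by 1000003 is a bijection on int, so distinct inputs never collide at that step:

import java.util.concurrent.ThreadLocalRandom;

public class OddMultiplierInjectivityCheck
{
  public static void main(String[] args)
  {
    ThreadLocalRandom rnd = ThreadLocalRandom.current();
    for (int i = 0; i < 10_000_000; i++) {
      int a = rnd.nextInt();
      int b = rnd.nextInt();
      // Java int arithmetic wraps mod 2^32; an odd multiplier cannot map distinct ints to the same value.
      if (a != b && a * 1000003 == b * 1000003) {
        throw new AssertionError("collision: " + a + " and " + b);
      }
    }
    System.out.println("no collisions found among sampled pairs");
  }
}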

All else equal, I still think it makes sense to include the partition number comparison. Thinking through this level of modular arithmetic to prove an equals() method correct feels too clever to me.

@ArvinZheng
Contributor Author

ArvinZheng commented Nov 19, 2019

@michaelschiff thanks for looking into this. I did the math as well and confirmed this should not be an issue.

@leventov
I am able to reproduce the issue now: we got the exception and could not start our Coordinator. The root cause is that when constructing RootPartitionRange, our partition numbers are cast from int to short, hence the collision (our partition numbers are not consecutive and may have large gaps between them). I checked NumberedPartitionChunk, NumberedOverwritingPartitionChunk, LinearPartitionChunk and IntegerPartitionChunk; the partition numbers there are defined as int. Should we consider making startPartitionId and endPartitionId int and updating the corresponding code?

Code block for reference

    private final short startPartitionId;
    private final short endPartitionId;

    @VisibleForTesting
    static RootPartitionRange of(int startPartitionId, int endPartitionId)
    {
      return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
    }

    private static <T extends Overshadowable<T>> RootPartitionRange of(PartitionChunk<T> chunk)
    {
      return of(chunk.getObject().getStartRootPartitionId(), chunk.getObject().getEndRootPartitionId());
    }
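
A minimal sketch (not Druid code, and the second id is hypothetical) of why the narrowing cast above loses information: any two partition ids that differ by a multiple of 65536 (2^16) map to the same short, so two distinct segments can end up with the same RootPartitionRange:

public class ShortCastCollision
{
  public static void main(String[] args)
  {
    int partitionIdA = 10112217;                  // one of the sparse partition numbers above
    int partitionIdB = partitionIdA + (1 << 16);  // hypothetical second id, 65536 apart
    // Both ids share the same low 16 bits, so the narrowing cast makes them indistinguishable.
    System.out.println((short) partitionIdA == (short) partitionIdB); // prints true
  }
}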

@jihoonson
Contributor

Hi @ArvinZheng, thank you for the report. In #7491, a new assumption has been added that partitionId is never greater than Short.MAX_VALUE to reduce the memory usage in the coordinator and the broker. This assumption makes sense to me since partitionId is assumed to be consecutive in most cases and having such a large number of segments in a time chunk generally doesn't happen in practice. I guess this assumption doesn't hold for your use case. What kind of ingestion system are you using? And why are partitionIds very sparse?

@ArvinZheng
Contributor Author

ArvinZheng commented Nov 19, 2019

Hi @jihoonson, thank you for looking into this. Yeah, I just realized this limitation was introduced by minor compaction. No, the assumption doesn't hold for our use case; as I described, we created our own indexer based on the core Druid libraries.
Our indexer pulls events from Kafka and splits them into small slices to further improve parallelism, e.g.

  1. Our service coordinator pulls 6M events from Kafka per partition per batch, and splits each batch into 6 small slices.
  2. The coordinator then sends these 6 slices to the downstream service, which is our indexer, and each indexer instance creates segments independently.
  3. To avoid collisions and locking between our indexer instances, we created a partition number generator that derives the partition number from the Kafka partition number, slice number and Kafka batch number (unique and maintained by our coordinator), with a rule like the following (a sketch follows below):
Kafka Partition Number * PARTITION_NUMBER_MULTIPLIER + Slice Number * SLICE_NUMBER_MULTIPLIER + Kafka Batch Number % SLICE_NUMBER_MULTIPLIER

Hence we have large numeric values for our partition numbers even though we may not have that many segments per interval.
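
A minimal sketch of the generation rule described above; the multiplier constants are hypothetical placeholders, since the real values are internal to our indexer:

public class PartitionNumberGenerator
{
  private static final int PARTITION_NUMBER_MULTIPLIER = 10_000_000; // hypothetical value
  private static final int SLICE_NUMBER_MULTIPLIER = 100_000;        // hypothetical value

  static int generate(int kafkaPartition, int sliceNumber, long kafkaBatchNumber)
  {
    return kafkaPartition * PARTITION_NUMBER_MULTIPLIER
           + sliceNumber * SLICE_NUMBER_MULTIPLIER
           + (int) (kafkaBatchNumber % SLICE_NUMBER_MULTIPLIER);
  }

  public static void main(String[] args)
  {
    // e.g. Kafka partition 1, slice 0, batch 12217 -> 10012217, far beyond Short.MAX_VALUE
    System.out.println(generate(1, 0, 12217));
  }
}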

I understand it's not trivial to modify the current minor compaction, but we should state this limitation clearly in the release notes - https://github.com/apache/incubator-druid/releases/tag/druid-0.16.0-incubating - so that people can do some pre-checks on their systems. Happy to discuss anything. :)

@jihoonson
Contributor

Thanks for the suggestion. It sounds like a good idea. I will update the release notes soon.
