
update insert pending segments logic to synchronous #6336

Merged
merged 5 commits into apache:master on Oct 23, 2018

Conversation

FaxianZhao
Contributor

@FaxianZhao FaxianZhao commented Sep 17, 2018

I deleted the branch for #6283 by mistake, so I can't modify that pull request.

Here are some notes on the changes:

  1. MySQL's default transaction isolation is REPEATABLE_READ; treating it as READ_COMMITTED reduces insert ID conflicts.
  2. When the segments table gets too big, selecting the overlapping segments is slow. Adding an index on 'dataSource used end' works well for most scenarios: it speeds up fetching recent segments for select and insert.
    Adding an index on 'dataSource used start' would also be a good idea, but I didn't include it in this patch.
  3. I think 'select and insert' doesn't need to run within a transaction. (Fixes the same issue as phase 1.)
  4. Synchronous allocation performs better than concurrent retries in my workload.

…s READ_COMMITTED will reduce insert id conflict.

2. Add an index to 'dataSource used end' is work well for the most of scenarios(get recently segments), and it will speed up sync add pending segments in DB.
3. 'select and insert' is not need within transaction.
@jihoonson
Contributor

Hi @FaxianZhao, thanks for the PR! It looks reasonable. Would you please fix the conflicts?

skipSegmentLineageCheck
);
SegmentIdentifier identifier;
synchronized (lockResult.getTaskLock()) {

TaskLock is shared by tasks only if they have the same groupId (see TaskLockPosse). Maybe this works for kafka indexing service because all kafkaIndexTasks for the same dataSource have the same groupId, but it's not always true for other task types.

I think it makes sense to call toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment() inside of TaskLockBox.doInCriticalSection(). It's designed to perform some action in a critical section, so that any other tasks can't revoke the lock of the current task. Also, it guarantees that only one task can perform the action at the same time. Please check SegmentTransactionalInsertAction as an example. What do you think?
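
For reference, here is a minimal sketch of that pattern, assuming the TaskActionToolbox, CriticalAction, and allocatePendingSegment APIs discussed in this thread; the wrapping class, parameter names, and exact signatures are illustrative, not the actual patch:

import java.util.Collections;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.joda.time.Interval;

// Illustrative sketch only: wrap pending-segment allocation in
// TaskLockbox.doInCriticalSection so other tasks cannot revoke the lock while
// the allocation runs.
public class AllocateInCriticalSectionSketch
{
  static SegmentIdentifier allocate(
      TaskActionToolbox toolbox,
      Task task,
      String dataSource,
      String sequenceName,
      String previousSegmentId,
      Interval interval,
      String maxVersion,
      boolean skipSegmentLineageCheck
  ) throws Exception
  {
    return toolbox.getTaskLockbox().doInCriticalSection(
        task,
        Collections.singletonList(interval),
        CriticalAction.<SegmentIdentifier>builder()
            .onValidLocks(
                // Runs only while the task still holds valid (non-revoked) locks.
                () -> toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
                    dataSource,
                    sequenceName,
                    previousSegmentId,
                    interval,
                    maxVersion,
                    skipSegmentLineageCheck
                )
            )
            // If the locks were revoked in the meantime, give up and return null.
            .onInvalidLocks(() -> null)
            .build()
    );
  }
}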

@FaxianZhao
Contributor Author

FaxianZhao commented Oct 8, 2018

Hi @jihoonson, I fixed the conflicts.
Your suggestion is better, so I applied it; please take a look.

Should I squash these 3 commits into one?

@jihoonson
Contributor

jihoonson commented Oct 18, 2018

@FaxianZhao thanks for the fix! You don't have to rebase commits in most cases; once they're pushed, it's fine to leave them as they are.

The failing unit tests look legit. Please check them.

java.lang.RuntimeException: java.lang.NullPointerException: actionOnInvalidLocks
	at org.apache.druid.indexing.common.actions.SegmentAllocateAction.tryAllocate(SegmentAllocateAction.java:292)
	at org.apache.druid.indexing.common.actions.SegmentAllocateAction.tryAllocateSubsequentSegment(SegmentAllocateAction.java:248)
	at org.apache.druid.indexing.common.actions.SegmentAllocateAction.perform(SegmentAllocateAction.java:171)
	at org.apache.druid.indexing.common.actions.SegmentAllocateActionTest.allocate(SegmentAllocateActionTest.java:720)
	at org.apache.druid.indexing.common.actions.SegmentAllocateActionTest.testCannotAddToExistingSingleDimensionShardSpecs(SegmentAllocateActionTest.java:670)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:367)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:274)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:161)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:290)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:242)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:121)
Caused by: java.lang.NullPointerException: actionOnInvalidLocks
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:229)
	at org.apache.druid.indexing.overlord.CriticalAction.<init>(CriticalAction.java:48)
	at org.apache.druid.indexing.overlord.CriticalAction.<init>(CriticalAction.java:40)
	at org.apache.druid.indexing.overlord.CriticalAction$Builder.build(CriticalAction.java:80)
	at org.apache.druid.indexing.common.actions.SegmentAllocateAction.tryAllocate(SegmentAllocateAction.java:288)
	... 32 more

@FaxianZhao
Contributor Author

@jihoonson Sorry for that typo; I passed 'null' and have corrected it to '() -> null'.


@jihoonson jihoonson left a comment


LGTM. Thanks @FaxianZhao!

@jihoonson jihoonson merged commit c5bf4e7 into apache:master Oct 23, 2018
@FaxianZhao FaxianZhao deleted the sync_insert_pendingSegments branch October 23, 2018 14:16
gianm pushed a commit to implydata/druid-public that referenced this pull request Nov 16, 2018
* 1. Mysql default transaction isolation is REPEATABLE_READ, treat it as READ_COMMITTED will reduce insert id conflict.
2. Add an index to 'dataSource used end' is work well for the most of scenarios(get recently segments), and it will speed up sync add pending segments in DB.
3. 'select and insert' is not need within transaction.

* Use TaskLockbox.doInCriticalSection instead of synchronized syntax to speed up insert pending segments.

* fix typo for NullPointerException
@QiuMM
Member

QiuMM commented Dec 11, 2018

@FaxianZhao @jihoonson I think this PR should be reverted. After merging it, the overlord console takes a very long time to open. In my scenario there are 1200 peon tasks, and we use MySQL as the metadata store. This PR uses TaskLockbox#doInCriticalSection to allocate pending segments, which seems to block other things. After reverting it, my overlord works fine again.

[screenshot: lark20181211-215648]

@QiuMM
Member

QiuMM commented Dec 30, 2018

@jihoonson @FaxianZhao I think I know why the overlord console loads slowly after this PR was merged: every index task asks the overlord to perform a segment allocate action, and each of those requests occupies an HTTP handler thread that blocks in TaskLockbox#doInCriticalSection, so no idle threads or resources are left to respond to other requests. The larger the cluster, the slower it gets. I tried increasing druid.server.http.numThreads and allocating more cores to the overlord, but it didn't help. In my view, this PR loses more than it gains; taking minutes to open the overlord console is painful.

@jihoonson
Contributor

@QiuMM thank you for reporting! I think TaskLockbox potentially has this problem for all of its methods because they are coordinated by a single giant lock. I guess this becomes significant in large clusters when allocatePendingSegment is executed inside doInCriticalSection, because it's a heavy method.

I think this PR is still useful because it avoids the slow pending segment allocation caused by the race in allocatePendingSegment, but the solution may need to be improved. How about improving the lock granularity of taskLockbox? There's no need to use a single lock; it could use a more granular lock per dataSource and interval.
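
As a rough illustration of that idea (not Druid code; the class, key encoding, and method names are assumptions), a per-(dataSource, interval) lock could look like:

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.joda.time.Interval;

// Sketch of per-(dataSource, interval) locking instead of a single giant lock.
public class GranularLockSketch
{
  private final ConcurrentHashMap<String, Object> locks = new ConcurrentHashMap<>();

  public <T> T doPerDataSourceAndInterval(String dataSource, Interval interval, Callable<T> action)
      throws Exception
  {
    // One lock object per (dataSource, interval) key; unrelated allocations don't block each other.
    final Object lock = locks.computeIfAbsent(dataSource + "/" + interval, k -> new Object());
    synchronized (lock) {
      return action.call();
    }
  }
}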

@jihoonson
Contributor

BTW, I guess your pendingSegments table might be very large if allocatePendingSegment takes too long. Does cleaning up old entries in the pendingSegments table help?

@QiuMM
Member

QiuMM commented Dec 31, 2018

How about improving the lock granularity for taskLockbox? There's no need to use a single lock, but it can use more granular lock per dataSource per interval.

@jihoonson I have tried something like the code below:

/**
 * Perform the given action while synchronizing on the same (dataSource, interval) pair.
 * This method only checks that the (dataSource, interval) pair is valid; it does not check locks
 * for the given task and interval, so callers must take care of the locks themselves.
 *
 * @param task     task performing a {@link SyncDataSourceAndIntervalAction}
 * @param interval interval
 * @param action   action to be performed
 */
public <T> T doInSyncDataSourceAndInterval(
    Task task,
    Interval interval,
    SyncDataSourceAndIntervalAction<T> action
) throws Exception
{
  // Look up the registered (dataSource, interval) pair and synchronize on it if present.
  Pair<String, Interval> pair = dataSourceIntervals
      .getOrDefault(task.getDataSource(), new HashMap<>())
      .get(interval);
  if (pair != null) {
    synchronized (pair) {
      return action.perform();
    }
  } else {
    log.warn("Invalid dataSource and interval pair: (%s, %s), can not sync on it!", task.getDataSource(), interval);
    return null;
  }
}

However, the overlord console still loads very slowly right after I restart the overlord, since all newly created tasks post a segment allocate action. About an hour later, I didn't see any delay when opening the console, except at the moments when all tasks start to allocate new segments. I think it's still bad, because every time I restart the overlord I want to open the console to check its status, and it's hard to bear if it loads slowly. Maybe we should not restart all tasks at the same moment; how about adding some delay?

Besides, after I started using such a granular lock, some errors occurred:

2018-12-29 18:19:10,252 ERROR [qtp685428529-136] com.sun.jersey.spi.container.ContainerResponse - The RuntimeException could not be mapped to a response, re-throwing to the HTTP container
io.druid.java.util.common.ISE: Segments not covered by locks for task: index_kafka_huoshan_all_map_3c8768ae0f0f563_ihpmdnek
        at io.druid.indexing.common.actions.TaskActionPreconditions.checkLockCoversSegments(TaskActionPreconditions.java:45) ~[druid-indexing-service-0.12.2.jar:0.12.2]
        at io.druid.indexing.common.actions.SegmentTransactionalInsertAction.perform(SegmentTransactionalInsertAction.java:107) ~[druid-indexing-service-0.12.2.jar:0.12.2]
        at io.druid.indexing.common.actions.SegmentTransactionalInsertAction.perform(SegmentTransactionalInsertAction.java:47) ~[druid-indexing-service-0.12.2.jar:0.12.2]
        at io.druid.indexing.common.actions.LocalTaskActionClient.submit(LocalTaskActionClient.java:64) ~[druid-indexing-service-0.12.2.jar:0.12.2]
        at io.druid.indexing.overlord.http.OverlordResource$3.apply(OverlordResource.java:368) ~[druid-indexing-service-0.12.2.jar:0.12.2]
        at io.druid.indexing.overlord.http.OverlordResource$3.apply(OverlordResource.java:357) ~[druid-indexing-service-0.12.2.jar:0.12.2]
        at io.druid.indexing.overlord.http.OverlordResource.asLeaderWith(OverlordResource.java:683) ~[druid-indexing-service-0.12.2.jar:0.12.2]
        at io.druid.indexing.overlord.http.OverlordResource.doAction(OverlordResource.java:354) ~[druid-indexing-service-0.12.2.jar:0.12.2]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]

This error occurs after I restart the overlord, and all newly created tasks stay pending for some minutes, which is weird and confuses me (I have never modified SegmentTransactionalInsertAction). But after all tasks are running normally, I never see the error again.

BTW, I guess your pendingSegments table might be very large if allocatePendingSegment takes too long. Does cleaning up old entries in pendingSegments table help?

I have already set druid.coordinator.kill.pendingSegments.on to true, so old entries would be cleaned up.

@jihoonson
Contributor

@QiuMM thank you for trying it out! I understand how it feels when the UI is very slow. We need to fix this.

It sounds like too many concurrent segmentAllocateActions are causing this problem, but I don't understand why it's still slow with the granular locking. A segmentAllocateAction calls two methods of taskLockbox:

  1. It calls tryLock to get a taskLock. This method is coordinated by a single giant lock.
  2. Once it gets a taskLock, it calls allocatePendingSegmentAction in doInCriticalSection (or doInSyncDataSourceAndInterval).

Before this PR, step 2 was executed outside of doInCriticalSection, so there was no coordination. Now, with doInSyncDataSourceAndInterval, there should be no lock contention between tasks unless they are replica tasks. So I think this should work as it did before this PR (or with a little lock contention between replicas). Do you know which method exactly is the bottleneck? Checking the overlord's stack trace would probably be helpful.

Regarding the io.druid.java.util.common.ISE: Segments not covered by locks for task exception, this happens when a task tries to publish new segments and update the metadata store without a valid taskLock. Since the taskLock must be acquired before calling allocatePendingSegmentAction in doInSyncDataSourceAndInterval, I don't think this exception is related to the granular locking. Is it possible that some tasks revoked other tasks' locks by any chance?

@QiuMM
Member

QiuMM commented Dec 31, 2018

there should be no lock contention between tasks unless they are replica tasks

@jihoonson Not only replica tasks: all tasks created by a supervisor share the same TaskLock#groupId, and in my scenario there are up to 600 tasks for a single dataSource, so many tasks still share the same lock.

Since the taskLock must be acquired before calling allocatePendingSegmentAction in doInSyncDataSourceAndInterval, I don't think this exception is related to the granular locking.

I'm not sure about this, but I have also observed this exception even when I didn't use doInSyncDataSourceAndInterval.

@jihoonson
Contributor

I see. Since lock contention can happen between tasks whenever they write into the same interval of the same dataSource, 600 tasks running at the same time can cause this issue. If that is the case, then before this PR, such a large number of tasks could cause frequent pending segment allocation failures due to the allowed race in allocatePendingSegments. Those failed actions are retried, but that makes segment allocation slow. Were you able to see this kind of problem before the upgrade?

Maybe we need to use separate HTTP thread pools for the API and the UI. Also, maybe #6348 and #6356 would help, though you said your pendingSegments table is small.

@QiuMM
Member

QiuMM commented Jan 1, 2019

Those failed actions will be retried, but make segment allocation slow. Were you able to see this kind of problem before upgrade?

Yeah, I'm going to look into this problem; I'll let you know if I find something. And I think I know why the io.druid.java.util.common.ISE: Segments not covered by locks for task exception occurred: I used HeapMemoryTaskStorage, so when I restarted the overlord, all lock information was lost.
For the table indexes, I had already merged that PR before.

Maybe we need to use separate http thread pools for API and UI.

This is a good solution, but it doesn't seem easy to do with the Jetty server.

@jihoonson
Contributor

Thanks. I've checked some Jetty configurations, and it looks like Jetty's QoS filter (https://www.eclipse.org/jetty/documentation/9.4.x/qos-filter.html) might help. We're already using this filter for the task chat handler and the lookups, and I think we can use it for the overlord too. The maximum number of concurrent requests for internal API calls would be limited by some new configurations like maxRequests, so that some HTTP threads remain available for UI requests. What do you think?
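
For example, here is a hypothetical sketch of applying Jetty's QoSFilter to the overlord's task-action endpoint; the path, handler wiring, and the value of maxRequests are illustrative assumptions, not actual Druid configuration:

import java.util.EnumSet;
import javax.servlet.DispatcherType;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlets.QoSFilter;

public class QosFilterSketch
{
  public static void limitTaskActionRequests(ServletContextHandler handler)
  {
    FilterHolder holder = new FilterHolder(QoSFilter.class);
    // At most 48 task-action requests are serviced concurrently; the rest are
    // suspended by the filter until a slot frees up, leaving threads for the UI.
    holder.setInitParameter("maxRequests", "48");
    handler.addFilter(holder, "/druid/indexer/v1/action", EnumSet.of(DispatcherType.REQUEST));
  }
}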

@QiuMM
Member

QiuMM commented Jan 2, 2019

Sounds good, I'll have a try.

@jon-wei jon-wei added this to the 0.14.0 milestone Feb 20, 2019
@jon-wei
Contributor

jon-wei commented Feb 23, 2019

@QiuMM were you able to find anything interesting? I'm wondering if we need to adjust this PR or make special callouts in the release notes.

@QiuMM
Member

QiuMM commented Feb 23, 2019

@jihoonson @jon-wei Oh, I almost forgot about this.

If this is the case, before this PR, I think such many tasks could cause frequent pending segment allocation failures due to the allowed race in allocatePendingSegments. Those failed actions will be retried, but make segment allocation slow. Were you able to see this kind of problem before upgrade?

Yeah, I found many retries. I also tried the QoS filter; it's not a good solution when there are thousands of SegmentAllocateActions, because it also causes many retries: requests exceeding maxRequests are rejected, and the tasks retry until they succeed. It's hard to choose a value for maxRequests, and things get worse when there are many retries.

Finally, I solved this problem by using a more granular lock, i.e. the doInSyncDataSourceAndInterval method, and deploying the overlord on a machine with enough resources (24 CPU cores, 48 GB of memory). Before that, I had deployed the overlord in a container with only 8 CPU cores and 16 GB of memory.
