
feat(router): avoid worker starvation during job pickup #2379

Merged (2 commits, Nov 1, 2022)

Conversation

@atzoum (Contributor) commented Aug 31, 2022

Description

Introducing the jobiterator package in router, responsible for performing additional queries against jobsDB in order to fetch more jobs in case some of the initially picked-up jobs get discarded (e.g. due to the job ordering barrier, throttling or backoff). The following (configurable) limitations apply; a sketch of the resulting pickup loop is shown below the list:

  1. The iterator stops querying after Router.jobIterator.maxQueries queries (default: 10). Setting this to 1 effectively disables the feature.
  2. The iterator stops querying if the percentage of discarded jobs for the running query is less than or equal to Router.jobIterator.discardedPercentageTolerance (default: 10%).

Note: an additional limitation applies when JobsDB.fairPickup is enabled: a maximum of Router.maxDSQuery datasets can be queried at any time (default: 10).
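
For illustration, here is a minimal Go sketch of the compensating pickup loop these two limits produce. All names (Iterator, HasNext, Next, Discard, the fetch callback) are simplified stand-ins for the sketch, not the actual jobiterator API:

package main

import "fmt"

// Job stands in for jobsdb.JobT (simplified for the sketch).
type Job struct{ ID int64 }

// Iterator re-queries jobsDB only to compensate for discarded jobs and
// stops when the query budget is spent or discards fall within tolerance.
type Iterator struct {
	maxQueries int                    // Router.jobIterator.maxQueries
	tolerance  int                    // Router.jobIterator.discardedPercentageTolerance (%)
	fetch      func(limit int) []*Job // queries jobsDB for up to limit jobs
	limit      int                    // size of the next query
	queries    int
	buf        []*Job
	pos        int
	fetched    int
	discarded  int
}

func (it *Iterator) HasNext() bool {
	if it.pos < len(it.buf) {
		return true // still draining the last batch
	}
	if it.queries >= it.maxQueries {
		return false // limitation 1: query budget exhausted
	}
	if it.queries > 0 {
		if it.fetched == 0 || 100*it.discarded/it.fetched <= it.tolerance {
			return false // limitation 2: discards within tolerance
		}
		it.limit = it.discarded // only compensate for what was discarded
	}
	it.buf = it.fetch(it.limit)
	it.fetched, it.pos, it.discarded = len(it.buf), 0, 0
	it.queries++
	return len(it.buf) > 0
}

func (it *Iterator) Next() *Job { j := it.buf[it.pos]; it.pos++; return j }

// Discard marks a job as not handed to a worker (ordering barrier,
// throttling, backoff), making it eligible for a compensating fetch.
func (it *Iterator) Discard(*Job) { it.discarded++ }

func main() {
	nextID := int64(1)
	it := &Iterator{
		maxQueries: 10, tolerance: 10, limit: 5,
		fetch: func(limit int) []*Job {
			batch := make([]*Job, 0, limit)
			for i := 0; i < limit && nextID <= 12; i++ {
				batch = append(batch, &Job{ID: nextID})
				nextID++
			}
			return batch
		},
	}
	for it.HasNext() {
		job := it.Next()
		if job.ID%3 == 0 { // pretend every third job hits the ordering barrier
			it.Discard(job)
			continue
		}
		fmt.Println("assigned job", job.ID)
	}
}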

Additionally, the following changes/improvements have been introduced:

  1. Include destinationID in the job order key along with userID.
  2. Remove latenciesUsed & timeGained from GetRouterPickupJobs.
  3. Remove JobsDB.useJoinForUnprocessed.
  4. Move server startup logic away from package main to the runner package, so that we can move integration test files into the packages they belong to.

Notion Ticket

Link

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

BEGIN_COMMIT_OVERRIDE
feat(router): avoid worker starvation during job pickup (#2379)
END_COMMIT_OVERRIDE

@github-actions github-actions bot added the Stale label Sep 21, 2022
@atzoum atzoum removed the Stale label Sep 21, 2022
@github-actions github-actions bot added the Stale label Oct 12, 2022
@atzoum atzoum removed the Stale label Oct 12, 2022
@atzoum atzoum force-pushed the feat.routerDeepPickup branch 2 times, most recently from 5bc7efa to 9a47c30 on October 17, 2022 09:18
@@ -704,7 +705,6 @@ func loadConfig() {
 	config.RegisterDurationConfigVariable(5, &refreshDSListLoopSleepDuration, true, time.Second, []string{"JobsDB.refreshDSListLoopSleepDuration", "JobsDB.refreshDSListLoopSleepDurationInS"}...)
 	config.RegisterDurationConfigVariable(5, &backupCheckSleepDuration, true, time.Second, []string{"JobsDB.backupCheckSleepDuration", "JobsDB.backupCheckSleepDurationIns"}...)
 	config.RegisterDurationConfigVariable(5, &cacheExpiration, true, time.Minute, []string{"JobsDB.cacheExpiration"}...)
-	useJoinForUnprocessed = config.GetBool("JobsDB.useJoinForUnprocessed", true)
@atzoum commented Oct 17, 2022:

Explain: removing this config option since we always use a join for unprocessed jobs

@@ -17,14 +17,15 @@ type Options struct {
 }

 // LoadOptions loads application's initialisation options based on command line flags and environment
-func LoadOptions() *Options {
+func LoadOptions(args []string) *Options {
@atzoum commented Oct 17, 2022:

Explain: adding support for calling Run multiple times within the same test
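
A hedged example of what this enables in a test (the test body and import path are hypothetical; only the new LoadOptions signature comes from the diff):

import (
	"testing"

	"github.com/rudderlabs/rudder-server/app" // assumed import path
)

// Passing an explicit argument slice instead of reading the process-global
// os.Args lets a single test process build options (and call Run) twice.
func TestRunTwice(t *testing.T) {
	first := app.LoadOptions([]string{"rudder-server"})
	second := app.LoadOptions([]string{"rudder-server"})
	if first == nil || second == nil {
		t.Fatal("expected options to load from explicit args")
	}
}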


type MultiTenantLegacy struct {
	*HandleT
}

func (mj *MultiTenantLegacy) GetAllJobs(ctx context.Context, workspaceCount map[string]int, params GetQueryParamsT, _ int) ([]*JobT, error) { // skipcq: CRT-P0003
type legacyMoreToken struct {
@atzoum commented Oct 17, 2022:

Explain: for the legacy query we need to keep track of each sub-query's latest job ID, thus we use a different type of MoreToken
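
To make this concrete, a plausible shape for such a token (sketch only; the field names are assumptions, not the actual struct):

// legacyMoreToken remembers the last job ID returned by each sub-query the
// legacy union performs, so a follow-up call can resume every sub-query
// with its own AfterJobID condition. Field names are illustrative.
type legacyMoreToken struct {
	retryAfterJobID       *int64
	unprocessedAfterJobID *int64
}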

type (
	MoreToken interface{}
	moreToken struct {
		afterJobIDs map[string]*int64
@atzoum commented Oct 17, 2022:

Explain: for the fair pickup algorithm we need to keep track of the latest job ID per workspace
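
Schematically, a follow-up query consumes the token like this (a sketch, not the actual code; only params.AfterJobID appears in the diff below):

// Resume each workspace's sub-query after the last job ID it returned.
if after, ok := token.afterJobIDs[workspaceID]; ok && after != nil {
	params.AfterJobID = after // fetch only jobs with job_id > *after
}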

jd.markClearEmptyResult(ds, allWorkspaces, stateFilters, customValFilters, parameterFilters, willTryToSet, nil)

var stateQuery, customValQuery, limitQuery, sourceQuery string
skipCacheResult := params.AfterJobID != nil
@atzoum commented Oct 17, 2022:

Explain: we don't update the cache if the query contains an AfterJobID parameter

@codecov codecov bot commented Oct 17, 2022

Codecov Report

Base: 43.15% // Head: 44.63% // Increases project coverage by +1.47% 🎉

Coverage data is based on head (f414ca7) compared to base (79e3e34).
Patch coverage: 89.91% of modified lines in pull request are covered.

❗ Current head f414ca7 differs from pull request most recent head 7af8db7. Consider uploading reports for the commit 7af8db7 to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2379      +/-   ##
==========================================
+ Coverage   43.15%   44.63%   +1.47%     
==========================================
  Files         186      188       +2     
  Lines       40020    39148     -872     
==========================================
+ Hits        17272    17475     +203     
+ Misses      21642    20581    -1061     
+ Partials     1106     1092      -14     
Impacted Files Coverage Δ
services/multitenant/legacy.go 0.00% <0.00%> (ø)
services/multitenant/noop.go 0.00% <0.00%> (ø)
jobsdb/unionQueryLegacy.go 62.29% <79.31%> (+5.15%) ⬆️
jobsdb/unionQuery.go 81.69% <84.72%> (-2.69%) ⬇️
router/router.go 74.83% <89.39%> (+7.10%) ⬆️
jobsdb/jobsdb.go 68.80% <91.13%> (+3.36%) ⬆️
router/internal/jobiterator/jobiterator.go 100.00% <100.00%> (ø)
services/multitenant/tenantstats.go 82.77% <100.00%> (-0.23%) ⬇️
services/streammanager/kafka/client/consumer.go 75.92% <0.00%> (-3.71%) ⬇️
... and 25 more


☔ View full report at Codecov.

@atzoum atzoum marked this pull request as ready for review October 17, 2022 15:33
@atzoum atzoum force-pushed the feat.routerDeepPickup branch 4 times, most recently from 1c77cb7 to edbabb0 on October 18, 2022 08:57
 func main() {
 	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
-	exitCode := Run(ctx)
+	r := runner.New(runner.ReleaseInfo{
@atzoum commented Oct 18, 2022:

Explain: moved the running logic away from main to the runner package, so that we can move integration test files into the packages they belong to.

@rudderlabs rudderlabs deleted a comment from github-actions bot Oct 18, 2022
@rudderlabs rudderlabs deleted a comment from github-actions bot Oct 18, 2022
@atzoum atzoum force-pushed the feat.routerDeepPickup branch 2 times, most recently from 3376cd2 to 2a744c6 on October 18, 2022 14:04
@atzoum atzoum marked this pull request as draft October 18, 2022 14:21
@atzoum atzoum changed the title [WIP] feat(router): avoid worker starvation during job pickup feat(router): avoid worker starvation during job pickup Oct 18, 2022
@atzoum atzoum force-pushed the feat.routerDeepPickup branch 2 times, most recently from 8fa22d9 to 6a4074e on October 19, 2022 11:25
@Sidddddarth (Member) commented:
LGTM.
The only reservation I have is resetting the pickupMap in iterator.HasNext() between successive iterations, because this way we'd only be picking up the discarded number of jobs in further iterations.
In case we don't find as many jobs in the first iteration and several more are stored afterwards (enough to fill the pickupMap counts), we'd end up not picking them up soon enough.

@atzoum commented Oct 28, 2022:

> Because this way we'd only be picking up the discarded number of jobs in further iterations.

This behaviour is intentional. The iterator only compensates for discarded jobs to avoid ending up with an unproductive pickup loop.

> In case we don't find as many jobs in the first iteration and several more are stored afterwards (enough to fill the pickupMap counts), we'd end up not picking them up soon enough.

After an iterator completes, the next pickup loop will create a fresh iterator with fresh limits. The time between two pickup loops should remain small enough and shouldn't cause any significant delays in picking up newly arrived jobs.
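
Schematically (names are illustrative, not the actual router code):

// Each pickup loop builds a brand-new iterator with fresh limits, so jobs
// stored after the previous loop are picked up by the next one; within a
// single loop the iterator only compensates for discarded jobs.
for {
	it := newIterator(freshPickupLimits(), fetchFromJobsDB)
	for it.HasNext() {
		assignToWorker(it.Next())
	}
	time.Sleep(loopSleep) // the time between two pickup loops stays small
}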

@Sidddddarth (Member) commented:
One other thought is that since we query by job_id as well now, we could use

type dataSetRangeT struct {
	minJobID  int64
	maxJobID  int64
	startTime int64
	endTime   int64
	ds        dataSetT
}

to check whether or not to query a DS (except the last DS).

@atzoum commented Oct 29, 2022:

> One other thought is that since we query by job_id as well now, we could use
>
> type dataSetRangeT struct {
> 	minJobID  int64
> 	maxJobID  int64
> 	startTime int64
> 	endTime   int64
> 	ds        dataSetT
> }
>
> to check whether or not to query a DS (except the last DS).

Yes, this would be an option; however, since job_id is an indexed column, the SQL query itself should be sufficiently quick and lightweight, and should handle the absence of results due to the job_id condition equally efficiently.

Thus I would argue that we can start without this special optimisation branch in our codebase.
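
For illustration, the kind of predicate the AfterJobID parameter adds (schematic, not the generated SQL or the actual Go code; ds.JobTable and the surrounding variables are assumed):

// job_id is an indexed column, so this predicate is resolved via the index
// and returns quickly even when no rows match.
query := fmt.Sprintf(
	`SELECT * FROM %q WHERE job_id > $1 ORDER BY job_id ASC LIMIT $2`,
	ds.JobTable)
rows, err := tx.QueryContext(ctx, query, *params.AfterJobID, limit)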

@Sidddddarth (Member) commented:
We'd also want to have a memory limit on how the jobIterator fetches jobs, right?
Every iteration it could fetch many jobs and discard them, and all these jobs would stay in memory until they get garbage collected.

@atzoum commented Oct 31, 2022:

> until they get garbage collected.

Since discarded jobs are eligible for garbage collection, we can rely on exactly that: if memory resources become scarce, they will get garbage collected.

@chandumlg (Member) commented:

Awesome work! 🎉
