
Task Manager v1.0.0 #23632

Closed
tsullivan opened this issue Sep 29, 2018 · 7 comments
Assignees
Labels
enhancement New value added to drive a business result Feature:Task Manager Team:Core Core services & architecture: plugins, logging, config, saved objects, http, ES client, i18n, etc

Comments

@tsullivan
Member

Kibana Task Manager

Overview

We need a generic system for running background tasks in the Kibana server. It should support:

  • Single-run and recurring tasks
  • Scheduling tasks to run after a specified datetime
  • Basic retry logic
  • Recovery of stalled tasks / timeouts
  • Tracking/accumulating task state across multiple runs
  • Configuring the run-parameters for specific tasks
  • Basic coordination to prevent the same task instance from running on more than one Kibana system at a time

Implementation details

At a high level, task manager will:

  • Allow registration of task type definitions, which include preliminary state, if desired, and a createTaskRunner method that returns an object with run and cancel (optional) methods.
  • Every {poll_interval} milliseconds, check the {index} for any task instances that need to be run:
    • runAt is past
    • attempts is less than the configured threshold
  • Attempt to claim the task by using optimistic concurrency (see the sketch after this list) to set:
    • status to running
    • runAt to now + the timeout specified by the task
  • Execute the run method given in the task type definition, if the claim attempt succeeded
  • If the run method from the task type definition fails, increment the attempts count and reschedule it for 5 minutes in the future
  • If the run method from the task type definition succeeds:
    • If it is recurring, the run method will return a new runAt value. We update the document of the task instance with the new runAt for the next iteration of the task.
    • If the run function doesn't return a new runAt value, remove the task instance document from {index} and the task will not recur.
    • Canceling a scheduled task is simple: if a task needs to end its schedule, the run function simply does not return a new runAt field.
    • This also allows every task instance document in a schedule to have the same _id
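
To make the claim step above concrete, here is a minimal sketch (hypothetical helper names; the exact Elasticsearch update call and conflict handling are assumptions, not the actual implementation):

// Hypothetical sketch: claim a task instance with optimistic concurrency.
// `callCluster` is assumed to be callWithInternalUser from the admin cluster.
async function attemptToClaim(callCluster, index, task, timeoutMs) {
  try {
    await callCluster('update', {
      index,
      type: '_doc',
      id: task.id,
      version: task.version, // rejected if another instance updated the document first
      body: {
        doc: {
          status: 'running',
          runAt: new Date(Date.now() + timeoutMs), // now + the timeout specified by the task
        },
      },
    });
    return true; // claim succeeded; safe to execute the task's run method
  } catch (err) {
    return false; // e.g. a version conflict: another Kibana instance claimed this task
  }
}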

Pooling

The task manager of a Kibana instance runs tasks in a pool which ensures that at most N tasks are run at a time, where N is configurable. This prevents the Kibana instance from running too many tasks at once in resource constrained environments.

In addition to this, task type definitions can configure tasks to run in smaller pools to limit how many tasks of a given type can be run at once.
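
A minimal sketch of the pooling idea (hypothetical names, not the actual implementation): a counter of occupied worker slots gates how many claimed tasks may run at once.

// Hypothetical sketch: only run a task if enough worker slots are free.
class TaskPool {
  constructor(maxWorkers) {
    this.maxWorkers = maxWorkers; // e.g. the max_workers config value
    this.occupiedWorkers = 0;
  }

  // numWorkers comes from the task type definition and defaults to 1
  async run(task, numWorkers = 1) {
    if (this.occupiedWorkers + numWorkers > this.maxWorkers) {
      return false; // pool is full; leave the task for a later poll
    }
    this.occupiedWorkers += numWorkers;
    try {
      await task.run();
    } finally {
      this.occupiedWorkers -= numWorkers;
    }
    return true;
  }
}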

Config options

The task_manager can be configured via task_manager config options (e.g. task_manager.max_attempts):

  • max_attempts - How many times a failing task instance will be retried before it is no longer run
  • poll_interval - How often the background worker should check the task_manager index for more work
  • index - The name of the index that the task_manager uses to store task instance documents
  • max_workers - The maximum number of tasks a Kibana instance will run concurrently (defaults to 10)
  • override_num_workers - Object to customize the number of workers occupied by specific tasks defined by the fields of the object (e.g. override_num_workers.reporting: 2)
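
For illustration only, a plugin could read these values through the legacy config service; the key names below assume they nest under the task_manager prefix as listed above:

// Hypothetical sketch of reading task_manager config values in the legacy server.
const config = server.config();
const maxWorkers = config.get('task_manager.max_workers');     // defaults to 10
const pollInterval = config.get('task_manager.poll_interval'); // how often to check for work
const index = config.get('task_manager.index');                // where task instances are stored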

Task definitions

Plugins define tasks by adding a task type definition with the task manager service.

{
  type: string; // A unique identifier for the type of task being defined.
  title: string; // A brief, human-friendly title for this task.
  description?: string; // An optional more detailed description of what this task does.
  timeOut?: string; // How long, in minutes, the system should wait for the task to complete before it is considered to be timed out. e.g. '5m'
  numWorkers?: number; //  The number of workers / slots a running instance of this task occupies. This defaults to 1.
  createTaskRunner: TaskRunCreatorFunction; // A function that returns an object that has a run function which performs the task's work, and an optional cancel function which can cancel the task.
}

When Kibana attempts to claim and run a task instance, it looks up its definition and executes its run method, passing it a run context which looks like this:

{
  kbnServer: object; // The Kibana server object. This gives tasks full access to the server object, including the various Elasticsearch client functions, e.g. `callWithInternalUser`
  taskInstance: object; // The document fetched from ES describing the task instance, its params, state, id, etc.
}
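
As an illustration of how a run function might use this context (hypothetical names; the exact path from kbnServer to the Elasticsearch cluster helpers is an assumption):

// Hypothetical run function built from the run context described above.
function getMyTaskRunFunction({ kbnServer, taskInstance }) {
  return async function run() {
    // Assumption: the cluster helpers are reachable from kbnServer like this.
    const { callWithInternalUser } = kbnServer.server.plugins.elasticsearch.getCluster('admin');

    // Do the task's work using the params stored on the task instance.
    const result = await callWithInternalUser('count', {
      index: taskInstance.params.targetIndex, // hypothetical param set when the task was scheduled
    });

    // Return the next runAt to keep the task recurring, plus state for the next run.
    return {
      runAt: new Date(Date.now() + 60 * 1000), // run again in one minute
      state: { lastCount: result.count },
    };
  };
}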

Task result

The task's run method is expected to return a promise that resolves to an object that conforms to the following interface:

{
  runAt?: Date; // Specifies the next run date / time for this task. If unspecified, this is treated as a single-run task, and will not be rescheduled after completion.
  error?: object; // Optional indication that the task failed to accomplish its work. This is logged out as a warning and stored in the task instance document in Elasticsearch for reference
  state?: object; // The state which will be passed to the next run of this task (if this is a recurring task).
}

State should be relatively small, as it is stored as a string of JSON. Large blobs in state will impact performance of migration and maintenance of the task manager index. If you need to store big values in state, you can instead write those values to your own index, and store the ids of the documents in state.
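
For example (hypothetical helper names), rather than putting a large generated payload in state, a run function could index it into a plugin-owned index and keep only the document id:

// Hypothetical sketch: keep task state small by storing big payloads elsewhere.
async function run() {
  const payload = await generateLargeReport(); // assumed task-specific work

  const { _id } = await callWithInternalUser('index', {
    index: '.my-plugin-results', // an index owned by the plugin, not the task manager index
    type: '_doc',
    body: payload,
  });

  return {
    runAt: nextRunAt(),           // assumed helper; omit runAt to make this the final run
    state: { lastResultId: _id }, // a small reference instead of the payload itself
  };
}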

Timeouts

If the promise returned by the run function has a cancel method, the cancel method will be called if Kibana determines that the task has timed out via referencing the timeOut field of the task definition. The cancel method itself can return a promise, and Kibana will wait for the cancellation before attempting a re-run. Tasks can perform cleanup work here, if needed.
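
A minimal sketch of one way a task author might attach a cancel method to the returned promise (the task manager does not prescribe how cancellation is implemented):

// Hypothetical sketch: the promise returned by run carries a cancel method.
function run() {
  let cancelled = false;

  const promise = (async () => {
    for (const chunk of workChunks()) { // assumed chunked unit of work
      if (cancelled) {
        return { error: { message: 'task cancelled after timeout' } };
      }
      await processChunk(chunk); // assumed helper
    }
    return { state: { done: true } };
  })();

  // Called by the task manager when the timeOut from the task definition elapses.
  promise.cancel = async () => {
    cancelled = true;
    await cleanUpPartialWork(); // assumed cleanup helper
  };

  return promise;
}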

Task instances

The task_manager module will store scheduled task instances in a configurable index. This allows recovery of failed tasks, coordination across Kibana clusters, etc.

The data for a task instance is stored and passed as context to the run functions in a way that looks something like this:

{
  id: string; // The id of the Elastic document that stores this instance's data. This can be passed by the caller when scheduling the task.
  version: number; // The version of the Elasticsearch document.
  attempts: number; // The number of unsuccessful attempts since the last successful run, for recurring tasks.
  status: TaskStatus; // Indicates whether or not the task is running or idle.
  runAt: Date; // The date and time that this task is scheduled to be run. It is not guaranteed to run at this time, but it is guaranteed not to run earlier than this.
  state: object; // The state passed into the task's run function, and returned by the previous run. Stored as a string of JSON, but parsed and passed as an object in the run context. 
}

Programmatic access

The task manager plugin exposes an object in its namespace on the server object, which plugins can use to manage scheduled tasks.

const manager = server.plugins.taskManager;

// Registers a definition of a task type that can be scheduled and run
manager.defineTaskType(taskTypeName, {
  type: taskTypeName,
  title,
  description,
  timeOut,
  numWorkers,
  createTaskRunner(runContext) {
    return {
      run: getMyTaskRunFunction(runContext),
      cancel: getMyTaskCancelFunction(runContext),
    }
  }
});

// Schedules a task. All properties are as documented in the previous
// storage section, except that here, params is an object, not a JSON
// string.
const task = manager.schedule({
  taskType: taskTypeName,
  runAt,
  params,
  state,
});

// Removes the specified task
manager.remove({ id: task.id });

// Fetches tasks, supports pagination, via the search-after API:
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html
const results = manager.fetch(query);

// results look something like this:
{
  searchAfter: ['233322'],
  // Tasks is an array of task instances
  tasks: [{
    id: '3242342',
    type: 'reporting',
    // etc
  }]
}
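
For example, paging through all tasks of a type might look roughly like this (the query shape and searchAfter handling are assumptions about the fetch API, not its final form):

// Hypothetical sketch of paging through task instances with search-after.
let searchAfter;
do {
  const page = await manager.fetch({
    query: { term: { taskType: 'reporting' } }, // hypothetical filter by task type
    searchAfter,                                // undefined on the first page
  });
  page.tasks.forEach(handleTask);               // assumed per-task handler
  searchAfter = page.tasks.length ? page.searchAfter : undefined;
} while (searchAfter);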

Middleware

Plugins will be able to augment task instances data with beforeSchedule hooks, and modify run context with beforeRun hooks.
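
The hook API is not pinned down in this issue; a middleware registration might look roughly like this (the addMiddleware method and hook signatures are hypothetical):

// Hypothetical sketch of task manager middleware.
manager.addMiddleware({
  // Runs before a task instance is written to the index; can augment its data.
  async beforeSchedule(taskInstance) {
    return { ...taskInstance, params: { ...taskInstance.params, scheduledBy: 'my_plugin' } };
  },
  // Runs before a claimed task executes; can modify the run context.
  async beforeRun(runContext) {
    return { ...runContext, myPluginService: getMyPluginService() }; // assumed helper
  },
});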

Limitations in v1.0.0

  • Since task execution happens in the server background, it's not possible to use callWithRequest in a run function as run is not in a request context. Using callWithInternalUser is possible. It will also be possible for a plugin to add username/password or other authorization config to kibana.yml and connect to Elasticsearch with those credentials, or use a token to talk to a 3rd party service. Ideally, there will be support in Elasticsearch for generating long-running authentication tokens on behalf of a requesting user.
  • There is no built-in performance monitoring or actual tracking of execution cost for tasks. Determining an appropriate numWorkers value for a task definition is a best-guess approach.
  • In v1, the system only understands 1 minute increments (e.g. '1m', '7m'). Tasks which need something more robust will need to specify their own "runAt" in their run method's return value.
  • There is only a rudimentary mechanism for coordinating tasks and handling expired tasks. Tasks are considered expired if their runAt has arrived, and their status is still 'running'. This is possible if a Kibana instance claims a task, and then promptly dies. Expired tasks found in the pool will have their cancel method called, so they can self-clean up. There won't be a "heartbeat" mechanism that updates active tasks periodically so they can be monitored as being actively handled.
  • There isn't a built-in way in Kibana to create cancellable async operations. Cancellable promises for now are an implementation detail of the task definitions themselves. They can use RxJS or implement their own library for it. We might want something exposed by the task manager service for this in the future.
  • The task manager's public API is create / delete / list. Updates are supported via scheduling a task using an existing task ID.
  • Initially, it will not be possible to disable polling to claim pending tasks in a Kibana instance. That will be possible in a separate change. It will allow an instance to act as an "observer": able to list active tasks and their status and state / param data, but not responsible for claiming and executing tasks.
  • There won't be a way to change priority of registered task type definitions after the definition has been registered.
  • The initial delivery of the task manager service won't include any peripheral plugin changes to turn them into consumers of the service. In follow-up changes, there are some good candidates of Elastic products in Kibana that can be refactored to use the task manager service.

@epixa epixa added Team:Core Core services & architecture: plugins, logging, config, saved objects, http, ES client, i18n, etc enhancement New value added to drive a business result and removed Feature:New Platform labels Sep 30, 2018
@bleskes
Contributor

bleskes commented Oct 1, 2018

I read this out of general interest and it looks great to me. I wrote down some things that I thought were worth sharing while reading.

Recovery of stalled tasks / timeouts

What does recovery mean exactly? If a Kibana instance runs a task and it just takes too long, I presume it will be killed and marked as problematic.

Basic coordination to prevent the same task instance from running on more than one Kibana system at a time

Heads up - you can reduce the chance of this happening a lot, but you can't prevent it completely. It's important in task design to be aware of it and try to design safe operations. For example - index using predictable ids (rather than an auto-gened one) to prevent duplicates if the index operation is run twice.

If the run method from the task type definition fails, increment the attempts count and reschedule it for 5 minutes in the future
If the run method from the task type definition succeeds:
...

This needs to use optimistic locking to make sure that another instance didn't "claim" the task and is running it after timeout.

Tasks are considered expired if their runAt has arrived, and their status is still 'running'. This is possible if a Kibana instance claims a task, and then promptly dies.

Do I understand it correctly that every Kibana instance will also poll for tasks that have been marked as running X time ago (or whose runAt is X old) and force claim them? I think that's good but I would recommend having X be 1.5 times the timeout setting so it will give a live Kibana instance some time to mark a task as timed out, when it truly does time out.

Last, I would also recommend that each Kibana instance will run its periodic check on a randomized offset. For example, if poll_interval is set to 5m, each instance will schedule its first run for something random between 0m and 5m and then continue at a 5m interval. This is to prevent unneeded competition when claiming a task.
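
For illustration, the randomized offset could be as simple as this sketch (hypothetical names, not part of the proposal):

// Hypothetical sketch: stagger the first poll by a random fraction of poll_interval.
const initialDelay = Math.floor(Math.random() * pollInterval);
setTimeout(() => {
  pollForWork(); // assumed claim-and-run pass
  setInterval(pollForWork, pollInterval);
}, initialDelay);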

@chrisdavies
Contributor

Recovery of stalled tasks / timeouts

Yeap. It's vague. Maybe we should just pull that line out of the description. Basically, I was just trying to say that the task manager has some strategy for handling tasks that don't complete in time. It's currently a naive strategy (retry a few times, then stop retrying if it fails too many times).

Basic coordination to prevent the same task instance from running on more than one Kibana system at a time

This is done using optimistic concurrency / locking, and no id generation is used, so I think it should be as safe as we can make it. To be precise, id generation may be used when the task is initially scheduled, but not on any of the reschedules.

1.5 times the timeout

I'm not sure. If a task author is conservative, and gives themselves a long timeout, I don't know that we want to multiply it. (e.g. 2 days becomes 3 days, which is an extreme example, but you get the drift). I think authors of tasks that have a risk of timing out should be intentional about the timeout value given.

I would also recommend that each Kibana instance will run its periodic check on a randomized offset.

That's an interesting idea. It's likely to already happen, as Kibana bootup times will not be identical, though they'll probably be fairly close. The polling will diverge and converge over time, so I'm not sure how much we'd gain, and we would make an individual instance a bit harder to test and reason about.

@bleskes
Contributor

bleskes commented Oct 2, 2018

This is done using optimistic concurrency / locking, and no id generation is used, so I think it should be as safe as we can make it. To be precise, id generation may be used when the task is initially scheduled, but not on any of the reschedules.

I think I wasn't specific enough. If a task has secondary output, like indexing a document with its result into ES, that document should have an id that is derived from the task inputs. For example, when ML analyzes the data of a certain time slot, the result is stored as a document with an id that is derived from the job id and the time slot. This means that if two instances of the same task run concurrently, only a single document will be created (and overwritten).
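
For example (hypothetical names), an indexing step inside a task could derive the document id from the task inputs so a duplicate run overwrites the document instead of duplicating it:

// Hypothetical sketch: a deterministic id makes a duplicate task run idempotent.
await callWithInternalUser('index', {
  index: 'my-results',
  type: '_doc',
  id: `${jobId}-${timeSlotStart}`, // derived from the task inputs, not auto-generated
  body: resultForTimeSlot,
});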

e.g. 2 days becomes 3 days, which is an extreme example, but you get the drift

You can go with a 50% increase with a maximum of 5 minutes. My concern here is that you need to give the Kibana instance that actually runs the task some time to process the timeout before another Kibana instance takes over and runs it again.

Kibana bootup times will not be predictive, but they'll probably be fairly close. The polling will diverge and converge over time, though, so I'm not sure how much we'll gain,

I'm not sure what you mean exactly with diverge and converge. The exact dynamics depend on your scheduling logic (for example, if you schedule based on rounded wall clock time, you'd have that problem). Just something to think about.

@tsullivan
Member Author

@stacey-gammon in order to not side track a separate issue, I'm going to reply to something you posted on an issue about saved object API and reporting:

The task manager is coming soon too, and so there was an open question of where these task results would be stored - in the task manager, or still in the separate .reporting index? We decided to still keep the reports in the .reporting index because of risk of bloating the task manager, but it was also an open question at the time.

I don't know if this question is tracked as open in some other issue. I'd say from the Task Manager side, it's not really a question of bloating the capabilities. We should keep storing the results of Reporting (generated payload, status, etc) in the .reporting index because Reporting will work better with its own index instead of going through an abstraction. Task Manager has its own index to represent scheduled tasks, and there is a string field for accumulated state and function parameters to the run function. Those are there to help Task Manager keep track of things on repeated runs of a recurring task. It would not be practical to store all the data needed for Reporting in that index.

@alexfrancoeur

@tsullivan @njd5475 @chrisdavies can we close this now that the beta version has been merged? For additional features and improvements we still have #25271 open

@tsullivan
Member Author

closed via #24356
