
SO 5.6 InDepth Agent Priorities

Yauheni Akhotnikau edited this page Aug 30, 2019 · 1 revision

Intro

Every agent has a priority. A priority is just an optional mark for dispatchers: some dispatchers use it for priority-specific event scheduling, and some ignore it. Several dispatchers in SObjectizer support agent priorities. They are described below.

Basics

An agent’s priority is represented by the enumeration so_5::priority_t. There are only 8 priorities: from the lowest (so_5::priority_t::p0) to the highest (so_5::priority_t::p7).

A priority for an agent can be specified during the agent’s construction.

// For ordinary agents:
class my_agent : public so_5::agent_t
{
public :
  my_agent( context_t ctx, ... /* some other args */ )
    : so_5::agent_t( ctx + so_5::priority_t::p3 )
    ...
};

Once an agent has been constructed, its priority cannot be changed.

There is a namespace so_5::prio with some useful helper constants and functions. For example, instead of writing so_5::priority_t::p3 it is possible to write so_5::prio::p3, or even just p3 after a using namespace so_5::prio directive.

Dispatchers With Support For Priorities

There are three dispatchers which support agent priorities: prio_one_thread::strictly_ordered, prio_one_thread::quoted_round_robin, and prio_dedicated_threads::one_per_prio. They do priority-specific handling of agents’ events.

Other standard dispatchers (one_thread, active_obj, active_group, thread_pool, adv_thread_pool) do not support priorities.

Common Threads Vs Dedicated Threads

There is a principal difference in how events are handled with respect to agent priorities: either all events of agents with different priorities are handled on the same worker thread, or events of each priority are handled on their own dedicated thread.

Handling events of different priorities on a common worker thread makes several important things possible. First, events can be strictly ordered with respect to priorities: an event with a higher priority blocks events with lower priorities. Second, data can easily be shared between agents with different priorities: because all agents work on the same thread, there is no need to synchronize access to the shared data.

Those things are impossible when events with different priorities are handled on different threads.

Because of that, dispatchers with priority support are divided into two groups: prio_one_thread (all events are processed on the same worker thread) and prio_dedicated_threads (events with different priorities are processed on different threads).

Dispatchers With Common Thread For All Priorities

Group prio_one_thread consists of two dispatchers: strictly_ordered and quoted_round_robin.

Dispatcher prio_one_thread::strictly_ordered

Working Principles

This dispatcher allows events of high-priority agents to block events of low-priority agents. It means that the event queue is always strictly ordered: events for agents with a higher priority are placed before events for agents with a lower priority.

Let's see that in an example. Assume that the dispatcher's queue is empty at the beginning:

{}

Then three events arrive: e1(a4), e2(a7), e3(a2). Here a7 means an agent with priority 7. The dispatcher's queue becomes:

{e2(a7), e1(a4), e3(a2)}

Then two new events arrive: e4(a4) and e5(a6). The dispatcher's queue becomes:

{e2(a7), e5(a6), e1(a4), e4(a4), e3(a2)}

Then three new events arrive: e6(a2), e7(a4) and e8(a7). The queue becomes:

{e2(a7), e8(a7), e5(a6), e1(a4), e4(a4), e7(a4), e3(a2), e6(a2)}

After e2(a7) and e8(a7) have been handled, the queue becomes:

{e5(a6), e1(a4), e4(a4), e7(a4), e3(a2), e6(a2)}

If two new events -- e9(a6) and e10(a7) -- arrive at this moment then the queue will be:

{e10(a7), e5(a6), e9(a6), e1(a4), e4(a4), e7(a4), e3(a2), e6(a2)}

It means that the chronological order of events will be preserved only for events of agents with the same priority.

It also means that events with the lowest priority can stay in the queue forever. This happens if new events with higher priorities arrive faster than the queue is processed.
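The ordering rule from the walk-through above can be modelled with a small standalone sketch. This is only an illustration of the queue discipline, not SObjectizer's implementation: the names event and strictly_ordered_queue are hypothetical, and priorities are plain integers from 0 to 7.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of an event: a name such as "e1" and the
// priority (0 = lowest, 7 = highest) of the agent it belongs to.
struct event {
    std::string name;
    int priority;
};

// Hypothetical model of the queue discipline, not SObjectizer code.
class strictly_ordered_queue {
public:
    // Insert the event behind every queued event whose priority is
    // greater or equal. Higher priorities therefore go first and
    // events of equal priority keep their arrival order.
    void push(event e) {
        assert(e.priority >= 0 && e.priority <= 7);
        auto pos = queue_.begin();
        while (pos != queue_.end() && pos->priority >= e.priority) ++pos;
        queue_.insert(pos, std::move(e));
    }

    // Remove and return the event at the head of the queue.
    event pop() {
        assert(!queue_.empty());
        event e = std::move(queue_.front());
        queue_.pop_front();
        return e;
    }

    // Names of the queued events in the order they would be handled.
    std::vector<std::string> names() const {
        std::vector<std::string> result;
        for (const auto& e : queue_) result.push_back(e.name);
        return result;
    }

private:
    std::deque<event> queue_;
};
```

Pushing e1 with priority 4, e2 with priority 7 and e3 with priority 2 into an empty strictly_ordered_queue leaves the names in the order e2, e1, e3, which matches the first step of the walk-through.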

This dispatcher can be useful when some messages must be handled before other messages. For example, there could be a stream of tasks represented by take_job messages. There could also be a special message for the task processor’s reconfiguration: the new_config message. It makes sense to handle new_config as soon as possible.

This can be done by two agents which are bound to a single prio_one_thread::strictly_ordered dispatcher. One agent will have priority p1 and will handle the new_config message. The second agent will have priority p0 and will handle take_job. Both agents will have common shared data (at least the configuration parameters, maybe something else). Dispatcher prio_one_thread::strictly_ordered guarantees that new_config will be handled as soon as the processing of the previous message has finished.

Synopsis

The dispatcher is defined in so_5::disp::prio_one_thread::strictly_ordered namespace. There are several traditional functions for dispatcher creation.

The make_dispatcher function creates a new instance of that dispatcher:

so_5::launch( []( so_5::environment_t & env ) {
    env.introduce_coop(
      so_5::disp::prio_one_thread::strictly_ordered::make_dispatcher(env).binder(),
      []( so_5::coop_t & coop ) { ... } );
    ...
  } );

Dispatcher prio_one_thread::quoted_round_robin

Working Principles

Dispatcher prio_one_thread::quoted_round_robin works on the round-robin principle. It allows specifying the maximum count of events to be processed consecutively for a given priority. After processing that count of events, the dispatcher switches to processing events of lower priority even if there are still more events of higher priority to be processed.

The dispatcher uses a very simple working scheme: it handles no more than Q7 events of priority p7, then no more than Q6 events of priority p6, ..., then no more than Q0 events of priority p0. If an event of higher priority arrives while a quote for a lower priority is being handled, no switching is performed. For example, if the dispatcher is handling events of priority p5 and an event of priority p7 arrives, the dispatcher will continue to handle events of priority p5, then events of priority p4 (if any), ..., then events of priority p0 (if any). Only then will the dispatcher start to process events of priority p7.

Events inside the same priority are handled in chronological order.

Let's see the scheme in an example. It's safe to assume that the quoted_round_robin dispatcher uses a separate queue for every priority. So we can have a picture like this:

p7: {e1, e2, e3, e4, e5, e6}
p6: {e7, e8}
p5: {e9, e10, e11}
p4: {e12, e13, e14, e15, e16, e17, e18}
p3: {e19}
p2: {e20, e21, e22, e23, e24}
p1: {}
p0: {e25, e26, e27, e28, e29, e30}

Suppose that the same quote is used for all priorities and that this quote is 4 (it means no more than 4 events from every priority per cycle).

On the first step, four events from the event queue for p7 will be handled. After that, we will have the following content for dispatcher queues:

p7: {e5, e6}
p6: {e7, e8}
p5: {e9, e10, e11}
p4: {e12, e13, e14, e15, e16, e17, e18}
p3: {e19}
p2: {e20, e21, e22, e23, e24}
p1: {}
p0: {e25, e26, e27, e28, e29, e30}

On the next step, two events for priority p6 will be handled. Only two because there are no more events for that priority. And we will have:

p7: {e5, e6}
p6: {}
p5: {e9, e10, e11}
p4: {e12, e13, e14, e15, e16, e17, e18}
p3: {e19}
p2: {e20, e21, e22, e23, e24}
p1: {}
p0: {e25, e26, e27, e28, e29, e30}

On the next step three events for p5 will be handled. And on the next step four events for p4 will be handled. So we will have:

p7: {e5, e6}
p6: {}
p5: {}
p4: {e16, e17, e18}
p3: {e19}
p2: {e20, e21, e22, e23, e24}
p1: {}
p0: {e25, e26, e27, e28, e29, e30}

Suppose that e31 and e32 for p7 arrive at this moment. They will go to the event queue for p7, but the next event to be extracted and handled will be e19 for p3. It is because the current priority to be processed is p3. So we will have:

p7: {e5, e6, e31, e32}
p6: {}
p5: {}
p4: {e16, e17, e18}
p3: {}
p2: {e20, e21, e22, e23, e24}
p1: {}
p0: {e25, e26, e27, e28, e29, e30}

On the next steps four events for p2 and then four events for p0 will be handled (the queue for p1 is empty, so it is skipped). After the full cycle we will have:

p7: {e5, e6, e31, e32}
p6: {}
p5: {}
p4: {e16, e17, e18}
p3: {}
p2: {e24}
p1: {}
p0: {e29, e30}

Then the next cycle starts and four events from p7 will be handled, then three events for p4, one event for p2 and two events for p0.
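The walk-through above can be reproduced with a small standalone model. It is a sketch under stated assumptions, not SObjectizer's implementation: the class quoted_round_robin_model and its methods are hypothetical names, events are reduced to their names, and an empty string is used to signal that nothing is left to handle.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical model of the scheme: one FIFO queue and one quote
// per priority, served from p7 down to p0 and then again from p7.
class quoted_round_robin_model {
public:
    static constexpr std::size_t priorities = 8;

    explicit quoted_round_robin_model(std::size_t default_quote) {
        assert(default_quote > 0);
        quotes_.fill(default_quote);
    }

    void set_quote(std::size_t priority, std::size_t quote) {
        assert(priority < priorities && quote > 0);
        quotes_[priority] = quote;
    }

    void push(std::size_t priority, std::string name) {
        assert(priority < priorities);
        queues_[priority].push_back(std::move(name));
    }

    // Handle one event and return its name.
    // Returns an empty string when all queues are empty.
    std::string handle_next() {
        // One attempt for the current priority plus one for each of
        // the priorities visited while going around the full circle.
        for (std::size_t attempt = 0; attempt <= priorities; ++attempt) {
            auto& queue = queues_[current_];
            if (!queue.empty() && used_ < quotes_[current_]) {
                std::string name = std::move(queue.front());
                queue.pop_front();
                ++used_;
                return name;
            }
            // Quote exhausted or queue empty: move one priority down,
            // wrapping from p0 back to p7, and start a fresh quote.
            current_ = (current_ == 0) ? priorities - 1 : current_ - 1;
            used_ = 0;
        }
        return {};
    }

private:
    std::array<std::size_t, priorities> quotes_{};
    std::array<std::deque<std::string>, priorities> queues_;
    std::size_t current_ = priorities - 1;  // priority being served
    std::size_t used_ = 0;                  // events handled in this visit
};
```

With a quote of 4 for every priority and the queues filled as in the first picture, repeated handle_next() calls return e1 to e4, then e7 and e8, then e9 to e11, and so on. Events pushed to p7 while p4 is being served are not returned until p3, p2 and p0 have had their turn.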

This working scheme means that agents’ priorities are treated as agents’ weights. A programmer can set bigger quotes for more prioritized (more heavyweight) agents, and these agents will receive more resources than less prioritized (less weighted) agents.

A dispatcher of that type can be useful, for example, if there are agents which handle clients of different types. Some clients are VIP clients and should receive first-class quality of service, while other clients have lower demands for service quality. It is possible to assign a high priority to the agents which handle VIP-client requests and specify a large quote for that priority. All other agents will have a lower priority and a smaller quote. As a result, more requests from VIP clients will be handled, but requests from other clients will be processed too.
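To get a feeling for how quotes act as weights, the following sketch computes the share of handled events that one priority receives. It assumes the simplest possible situation: every priority always has pending events, so each cycle uses every quote in full. The function name saturated_share and the quote values in the note below are hypothetical and are not part of SObjectizer.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <numeric>

// Share of handled events that goes to `priority`, assuming that all
// eight queues are never empty (so every quote is used up in each cycle).
double saturated_share(const std::array<std::size_t, 8>& quotes,
                       std::size_t priority) {
    assert(priority < quotes.size());
    const std::size_t total =
        std::accumulate(quotes.begin(), quotes.end(), std::size_t{0});
    assert(total > 0);
    return static_cast<double>(quotes[priority]) / static_cast<double>(total);
}
```

For example, with a quote of 50 for p7 and 10 for each of the other seven priorities, a full cycle handles 120 events, so p7 receives 50/120 (about 42%) of them and every other priority receives 10/120 (about 8%). When some queues are empty, their unused quotes are simply skipped and the real shares differ.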

Synopsis

The dispatcher is defined in so_5::disp::prio_one_thread::quoted_round_robin namespace. There are several traditional functions for dispatcher creation.

Quotes for priorities must be defined before the creation of a dispatcher. This is done by constructing and tuning an object of type so_5::disp::prio_one_thread::quoted_round_robin::quotes_t. For example:

// Default quote for all priorities must be specified in constructor.
so_5::disp::prio_one_thread::quoted_round_robin::quotes_t quotes{ 10 };
// After that a quote for a particular priority can be changed.
quotes.set( so_5::prio::p7, 50 );
quotes.set( so_5::prio::p5, 40 );
...

A quotes_t object can also be constructed right in the call to the dispatcher creation function:

so_5::disp::prio_one_thread::quoted_round_robin::make_dispatcher( env,
  so_5::disp::prio_one_thread::quoted_round_robin::quotes_t{ 10 }
    .set( so_5::prio::p7, 50 ).set( so_5::prio::p5, 40 ) );

The make_dispatcher function creates a new instance of that dispatcher:

so_5::launch( []( so_5::environment_t & env ) {
    env.introduce_coop(
      so_5::disp::prio_one_thread::quoted_round_robin::make_dispatcher(
        env,
        so_5::disp::prio_one_thread::quoted_round_robin::quotes_t{ 20 }
          .set( so_5::prio::p7, 45 ).set( so_5::prio::p6, 35 ).set( so_5::prio::p5, 25 )
        ).binder(),
      []( so_5::coop_t & coop ) { ... } );
    ...
  } );

Dispatcher With Dedicated Thread For Every Priority

Dispatcher prio_dedicated_threads::one_per_prio

Working Principles

This dispatcher creates a single dedicated thread for every priority. It means that events for agents with priority p7 will be handled on a different thread than events for agents with, for example, priority p6.

Events inside the same priority are handled in chronological order.

Priority is assigned to an agent, not to particular events. It means that all events of one agent have the same priority. Because of that, all events of that agent will be handled on the same thread. In other words, an agent is bound to its working thread once and won't be moved from one thread to another.

This working scheme means that the agent’s priority is treated as a binding to a particular working thread inside a dispatcher instance. It allows binding agents with different requirements to different working threads.
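The idea of one thread per priority can be sketched with the standard library alone. The class below is a hypothetical model, not SObjectizer's implementation: one_per_prio_model, its push method and the use of std::function as an event are assumptions made for the illustration.

```cpp
#include <array>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical model: eight workers, each with its own FIFO queue
// and its own thread. The priority selects the worker.
class one_per_prio_model {
public:
    static constexpr std::size_t priorities = 8;

    one_per_prio_model() {
        for (auto& w : workers_)
            w.thread = std::thread([&w] { w.run(); });
    }

    // Lets every worker finish its queued events, then joins the threads.
    ~one_per_prio_model() {
        for (auto& w : workers_) {
            {
                std::lock_guard<std::mutex> lock(w.mutex);
                w.stopping = true;
            }
            w.wakeup.notify_one();
        }
        for (auto& w : workers_) w.thread.join();
    }

    // Queue an event handler for the worker of the given priority.
    void push(std::size_t priority, std::function<void()> handler) {
        assert(priority < priorities);
        auto& w = workers_[priority];
        {
            std::lock_guard<std::mutex> lock(w.mutex);
            w.queue.push_back(std::move(handler));
        }
        w.wakeup.notify_one();
    }

private:
    struct worker {
        std::mutex mutex;
        std::condition_variable wakeup;
        std::deque<std::function<void()>> queue;
        bool stopping = false;
        std::thread thread;

        // Handle events in arrival order until asked to stop
        // and the queue has been drained.
        void run() {
            for (;;) {
                std::function<void()> handler;
                {
                    std::unique_lock<std::mutex> lock(mutex);
                    wakeup.wait(lock, [this] { return stopping || !queue.empty(); });
                    if (queue.empty()) return;
                    handler = std::move(queue.front());
                    queue.pop_front();
                }
                handler();  // runs outside the lock
            }
        }
    };

    std::array<worker, priorities> workers_;
};
```

In this model every handler pushed with the same priority runs on the same thread and in the order it was pushed, while handlers pushed with different priorities run on different threads and may run concurrently. Data shared between priorities therefore needs synchronization, unlike in the prio_one_thread dispatchers.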

Synopsis

The dispatcher is defined in so_5::disp::prio_dedicated_threads::one_per_prio namespace.

The make_dispatcher function creates a new instance of that dispatcher:

so_5::launch( []( so_5::environment_t & env ) {
    env.introduce_coop(
      so_5::disp::prio_dedicated_threads::one_per_prio::make_dispatcher(env).binder(),
      []( so_5::coop_t & coop ) { ... } );
    ...
  } );