Talkative mothership interface #253

Merged
merged 27 commits into from
Jun 23, 2021
Commits
386772c
WIP: More logserver/microlog output from deployment staging.
mvousden Jun 18, 2021
69e5436
Compilable!
mvousden Jun 18, 2021
f32ff59
More talkative Mothership, outside debug mode.
mvousden Jun 18, 2021
2099341
Fix whitespace and comment inconsistency.
mvousden Jun 18, 2021
ac17435
Make Root more chatty when it receives acknowledgements from a Mother…
mvousden Jun 18, 2021
4a3a708
More todos.
mvousden Jun 18, 2021
7e1a5d1
Add new message keyset so processes can communicate about broken appl…
mvousden Jun 18, 2021
22c70ee
Change helper structures to use strings instead of graph instance poi…
mvousden Jun 18, 2021
95428d9
Mothership sends a cool new message to Root when an app is broken.
mvousden Jun 18, 2021
4dff1ea
Make root register when an app is broken.
mvousden Jun 18, 2021
d344b42
Update todo list
mvousden Jun 18, 2021
0535db1
Add CMND,BRKN message, and use it to allow Motherships (and Root) to …
mvousden Jun 18, 2021
1139699
Another todo
mvousden Jun 18, 2021
c1d61e3
Allow recall messages to recall applications that are marked as broken.
mvousden Jun 21, 2021
a9bf655
Clear deployment state (including ERROR) in Root when all Motherships…
mvousden Jun 21, 2021
9f959a4
Executive decision.
mvousden Jun 21, 2021
42d575b
Nope, not doing that either.
mvousden Jun 21, 2021
5b8b406
Remove an unused deployment state.
mvousden Jun 22, 2021
d543f14
Better UX, by feel.
mvousden Jun 22, 2021
1902a6f
Better microlog text.
mvousden Jun 22, 2021
d3d6a26
More posting for various Mothership-related commands.
mvousden Jun 22, 2021
aceec56
Merge branch 'development' into FEATURE-0245-talkative-mothership-int…
mvousden Jun 22, 2021
139e67a
Better communication, more consistent map keys.
mvousden Jun 22, 2021
76858b1
Only print Mothership closing message in debug mode.
mvousden Jun 22, 2021
335b1de
Update the todo list.
mvousden Jun 22, 2021
62414c1
Remove todo list.
mvousden Jun 22, 2021
198d1b1
Send stop acknowledgements (oops).
mvousden Jun 23, 2021
22 changes: 17 additions & 5 deletions Config/OrchestratorMessages.ocfg
@@ -105,12 +105,19 @@
175(W) : "Application '%s' has no graph instance named '%s'. Not operating on any applications."
176(W) : "Encountered unusual application parameter '%s'. Not operating on any applications."
177(E) : "System command '%s' ran unsuccessfully. Aborting deployment."
- 178(E) : "Not enough mothership processes exist to deploy task %s. Is a mothership running?"
+ 178(E) : "Not enough mothership processes exist to deploy application %s. Is a mothership running?"
179(E) : "System command '%s' ran unsuccessfully with message '%s'. Aborting deployment."
180(W) : "Ignoring clauses in 'dump /plac' command - dumping all files to '%s'."
181(W) : "No application graph instances have been placed. Not dumping anything."
182(I) : "Placement information dumped to '%s'."
183(E) : "Received MPI message with unrecognised key '%s'. Source rank: %s. Destination rank: %s.
+ 184(I) : "Deployment of graph instance '%s' staged. Waiting for Mothership(s) to acknowledge receipt in the background."
+ 185(W) : "Deployment of graph instance '%s' failed. See microlog for more information."
+ 186(I) : "Application '%s' %s on all Motherships it is mapped to."
+ 187(I) : "Initialisation of graph instance '%s' staged. Waiting for Mothership(s) to acknowledge receipt in the background."
+ 188(I) : "Run of graph instance '%s' staged. Waiting for Mothership(s) to acknowledge receipt in the background."
+ 189(I) : "Stop of graph instance '%s' staged. Waiting for Mothership(s) to acknowledge receipt in the background."
+ 190(I) : "Recall of graph instance '%s' staged. Waiting for Mothership(s) to acknowledge receipt in the background."

196(E) : "%s structural faults found in application %s (file %s) load"

@@ -215,6 +222,14 @@
526(I) : "Mothership: Supervisor for application '%s' has requested it to be stopped."
527(I) : "Mothership: Message from supervisor for application '%s': %s
528(E) : "Error creating directory '%s' for application '%s': %s"
+ 529(I) : "Mothership (rank %s): Deployment of application '%s' (to this Mothership) complete."
+ 530(I) : "Mothership (rank %s): Initialising fully-defined application '%s'."
+ 531(I) : "Mothership (rank %s): Initialisation of application '%s' (to this Mothership) complete."
+ 532(I) : "Mothership (rank %s): Starting (running) fully-initialised application '%s'."
+ 533(I) : "Mothership (rank %s): Stopping application '%s' (which has been started)."
+ 534(I) : "Mothership (rank %s): All devices on this Mothership for application '%s' have stopped."
+ 535(I) : "Mothership (rank %s): Recalling application '%s'."
+ 536(I) : "Mothership (rank %s): Application '%s' recalled. This Mothership has forgotten everything about this application."

580(E) : "Mothership: Received a log packet with an invalid device index 0x%s."
581(I) : "Mothership: Consuming log packet from device address 0x%s with name %s."
@@ -229,10 +244,7 @@
590(I) : "Mothership: On draining the Backend Input Queue of packets, the Backend Input Broker Thread has sent queued a Q::BEND,Q:CNC message containing %s packets.
591(I) : "Mothership: Pushing packet with hardware address 0x%s into the compute fabric."
592(I) : "Mothership: The %s queue is empty, so the %s Thread is spinning slowly."
- 593(I) : "Mothership: Recalling application '%s'."
- 594(I) : "Mothership: Stopping application '%s' (which has been started)."
- 595(I) : "Mothership: Starting (running) fully-initialised application '%s'."
- 596(I) : "Mothership: Initialising fully-defined application '%s'."
+
597(I) : "Mothership: Processing '%s' (0x%s) message. Fields: %s"
598(I) : "Mothership: Received '%s' message. %s"
599(I) : "Mothership: Received '%s' message. Pushing it to the '%s' queue."
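Each catalogue entry above is a template whose `%s` placeholders appear to be filled, in order, by the arguments passed to `Post` (for example, `Post(530, int2str(Urank), app->name.c_str())` later in this diff). The sketch below illustrates that kind of positional substitution. It is not the Orchestrator's `Post` implementation, which this diff does not show: `fill_template` is a hypothetical name, and leaving surplus placeholders untouched is an assumption.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not part of the Orchestrator): replaces each "%s" in
// `tmpl` with the next entry of `args`, left to right. Placeholders with no
// matching argument are copied through unchanged (an assumption).
std::string fill_template(const std::string& tmpl,
                          const std::vector<std::string>& args)
{
    std::string out;
    std::size_t argIndex = 0;
    std::size_t pos = 0;
    while (pos < tmpl.size())
    {
        if (tmpl.compare(pos, 2, "%s") == 0 && argIndex < args.size())
        {
            out += args[argIndex++];  // substitute the next argument
            pos += 2;                 // skip over the placeholder
        }
        else
        {
            out += tmpl[pos++];       // ordinary character, copy as-is
        }
    }
    return out;
}
```

With the template for message 535 and the arguments `"1"` and `"ring"`, this yields `Mothership (rank 1): Recalling application 'ring'.`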
1 change: 1 addition & 0 deletions Source/Common/Pglobals.cpp
@@ -52,6 +52,7 @@ const byte Q::REQ = 0x80;
const byte Q::ACK = 0x81;
const byte Q::FWD = 0x82;
const byte Q::DEFD = 0x83;
+ const byte Q::BRKN = 0x84;
// Level 3 subkeys

const byte Q::NAV = 0xff; // Not a value
2 changes: 2 additions & 0 deletions Source/Common/Pglobals.h
@@ -59,6 +59,7 @@ MSHP |ACK |LOAD |- | (0:string)Application name
MSHP |ACK |RUN |- | (0:string)Application name
MSHP |ACK |STOP |- | (0:string)Application name
MSHP |REQ |STOP |- | (0:string)Application name
+ MSHP |REQ |BRKN |- | (0:string)Application name

LogServer
---------
@@ -161,6 +162,7 @@ static const byte REQ;
static const byte ACK;
static const byte FWD;
static const byte DEFD;
+ static const byte BRKN;
// Level 3 subkeys

static const byte NAV; // Not a value
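The key table in this header lists each message as up to four one-byte subkeys (for example `MSHP |REQ |BRKN |-`), and the handlers compare against `PMsg_p::KEY(...)`. How `PMsg_p` combines the subkeys is not shown in this diff. As a rough sketch, assuming the subkeys are packed big-end first into a 32-bit word with unused slots set to `NAV`, it could look like this. `pack_key` and `MSHP_PLACEHOLDER` are hypothetical; the other byte values are taken from Pglobals.cpp above.

```cpp
#include <stdint.h>

// Subkey values as they appear in Pglobals.cpp in this diff.
const uint8_t REQ  = 0x80;
const uint8_t ACK  = 0x81;
const uint8_t DEFD = 0x83;
const uint8_t BRKN = 0x84;
const uint8_t NAV  = 0xff;  // "Not a value"

// Hypothetical stand-in: the real value of Q::MSHP is not shown in the diff.
const uint8_t MSHP_PLACEHOLDER = 0x10;

// Hypothetical packing (an assumption, not PMsg_p::KEY itself): four subkeys
// in one 32-bit word, first subkey in the most significant byte, unused
// subkeys defaulting to NAV.
uint32_t pack_key(uint8_t l0, uint8_t l1 = NAV, uint8_t l2 = NAV,
                  uint8_t l3 = NAV)
{
    return (uint32_t(l0) << 24) | (uint32_t(l1) << 16) |
           (uint32_t(l2) << 8) | uint32_t(l3);
}
```

Under these assumptions the new `MSHP |REQ |BRKN` key and the existing `MSHP |ACK |DEFD` key map to different words, so a single integer comparison is enough to tell them apart.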
3 changes: 2 additions & 1 deletion Source/Mothership/AppInfo.cpp
@@ -96,7 +96,8 @@ bool AppInfo::should_we_recall()
if (state == DEFINED or
state == READY or
state == RUNNING or
- state == STOPPED)
+ state == STOPPED or
+ state == BROKEN)
{
return is_recl_staged();
}
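The change above adds `BROKEN` to the states from which a staged recall may proceed. A minimal, self-contained sketch of that predicate follows. The state names are those used in this diff, but `AppSketch`, the enum ordering, and the `false` result for the remaining states are assumptions, since the rest of `AppInfo::should_we_recall` is not shown.

```cpp
// State names as used in this diff; the ordering is an assumption.
enum AppState
{
    UNDERDEFINED, DEFINED, LOADING, READY, RUNNING, STOPPING, STOPPED, BROKEN
};

// Hypothetical stand-in for AppInfo, holding only what the predicate needs.
struct AppSketch
{
    AppState state;
    bool reclStaged;  // stands in for is_recl_staged()

    // A recall goes ahead only from a settled state, and only if one has
    // been staged. Transitional states are assumed to return false.
    bool should_we_recall() const
    {
        switch (state)
        {
            case DEFINED:
            case READY:
            case RUNNING:
            case STOPPED:
            case BROKEN:  // newly allowed by this change
                return reclStaged;
            default:
                return false;
        }
    }
};
```

The practical effect is that an application which failed to deploy or stop can still be cleared from a Mothership with a recall, rather than being stuck in `BROKEN`.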
46 changes: 35 additions & 11 deletions Source/Mothership/AppTransitions.cpp
@@ -35,7 +35,7 @@ void Mothership::initialise_application(AppInfo* app)
/* Modes to reduce code repetition between steps 2 and 3. */
bool mode;

- debug_post(596, 1, app->name.c_str());
+ Post(530, int2str(Urank), app->name.c_str());

app->state = LOADING; /* 0: Set the application state to LOADING, duh. */

@@ -112,7 +112,8 @@ void Mothership::initialise_application(AppInfo* app)
* softswitch, commands all of the executors under its command to start. */
void Mothership::run_application(AppInfo* app)
{
- debug_post(595, 1, app->name.c_str());
+ std::string appName = app->name;
+ Post(532, int2str(Urank), appName);

app->state = RUNNING;
send_cnc_packet_to_all(app, P_CNC_BARRIER);
@@ -121,7 +122,7 @@ void Mothership::run_application(AppInfo* app)
PMsg_p acknowledgement;
acknowledgement.Src(Urank);
acknowledgement.Key(Q::MSHP, Q::ACK, Q::RUN);
- acknowledgement.Put<std::string>(0, &(app->name));
+ acknowledgement.Put(0, &appName);
acknowledgement.Tgt(pPmap->U.Root);
queue_mpi_message(&acknowledgement);
}
@@ -142,22 +143,36 @@ void Mothership::stop_application(AppInfo* app)
std::string appName = app->name;
std::string errorMessage;

- debug_post(594, 1, appName.c_str());
+ Post(533, int2str(Urank), appName.c_str());
app->state = STOPPING;
send_cnc_packet_to_all(app, P_CNC_STOP);
superdb.exit_supervisor(appName);
- if(!superdb.reload_supervisor(appName, &errorMessage))
+ if (!superdb.reload_supervisor(appName, &errorMessage))
{
Post(503, appName, errorMessage);
+ tell_root_app_is_broken(appName);
app->state = BROKEN;
}

/* On (re)loading the supervisor, provision its API. */
- if(!provision_supervisor_api(appName))
+ if (!provision_supervisor_api(appName))
{
Post(525, appName);
+ tell_root_app_is_broken(appName);
app->state = BROKEN;
}
+
+ /* Send "acknowledgement" message to root, if the application is not
+ * broken. */
+ if (app->state != BROKEN)
+ {
+ PMsg_p acknowledgement;
+ acknowledgement.Src(Urank);
+ acknowledgement.Key(Q::MSHP, Q::ACK, Q::STOP);
+ acknowledgement.Put(0, &appName);
+ acknowledgement.Tgt(pPmap->U.Root);
+ queue_mpi_message(&acknowledgement);
+ }
}

/* Sends a CNC packet with a given opcode to each thread in an application. */
@@ -205,17 +220,26 @@ void Mothership::send_cnc_packet_to_all(AppInfo* app, uint8_t opcode)
P_Addr_Pkt_t pkt;
pkt.hwAddr = *threadAddressIt;
pkt.packet = packet;
-
allPackets.push_back(pkt);
}
threading.push_backend_out_queue(&allPackets);
}

/* Purges all mention of an application in Mothership datastructures, as well
- * as cores and threads associated with it. */
+ * as cores and threads associated with it. Let Root know when done. */
void Mothership::recall_application(AppInfo* app)
{
- debug_post(593, 1, app->name.c_str());
- superdb.unload_supervisor(app->name);
- appdb.recall_app(app);
+ std::string appName = app->name;
+ Post(535, int2str(Urank), appName);
+ superdb.unload_supervisor(appName);
+ appdb.recall_app(app); /* Clears the name as well! */
+ Post(536, int2str(Urank), appName);
+
+ /* Send "acknowledgement" message to root. */
+ PMsg_p acknowledgement;
+ acknowledgement.Src(Urank);
+ acknowledgement.Key(Q::MSHP, Q::ACK, Q::RECL);
+ acknowledgement.Put(0, &appName);
+ acknowledgement.Tgt(pPmap->U.Root);
+ queue_mpi_message(&acknowledgement);
}
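The run, stop and recall transitions in this file each build their acknowledgement with the same source, key, payload, target, queue sequence, and `recall_application` copies the name first because `recall_app` clears it. The sketch below shows one way that repeated sequence could be expressed as a single helper. It is only an illustration: `AckSketch`, `OutboxSketch` and `send_ack` are hypothetical stand-ins, not `PMsg_p` or anything in the Mothership.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for PMsg_p, holding only what this sketch needs.
struct AckSketch
{
    int src;              // rank of the sending Mothership
    int tgt;              // rank of Root
    std::string action;   // e.g. "RUN", "STOP", "RECL", "DEFD"
    std::string appName;  // field 0 of the real messages
};

// Hypothetical stand-in for a Mothership and its outgoing MPI queue.
struct OutboxSketch
{
    int rank;
    int rootRank;
    std::vector<AckSketch> queued;

    // Builds and queues one acknowledgement for Root. The application name
    // is stored by value, so the message stays valid even if the caller's
    // application record is cleared afterwards.
    void send_ack(const std::string& action, const std::string& appName)
    {
        AckSketch ack;
        ack.src = rank;
        ack.tgt = rootRank;
        ack.action = action;
        ack.appName = appName;
        queued.push_back(ack);
    }
};
```

A call such as `box.send_ack("RECL", appName)` would then replace the five-line block at each acknowledgement site, at the cost of one more member function to maintain.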
84 changes: 72 additions & 12 deletions Source/Mothership/MPIHandlers.cpp
@@ -62,6 +62,8 @@ unsigned Mothership::handle_msg_cnc(PMsg_p* message)
key = "Q::APP,Q::DIST";
else if (message->Key() == PMsg_p::KEY(Q::APP,Q::SUPD))
key = "Q::APP,Q::SUPD";
+ else if (message->Key() == PMsg_p::KEY(Q::CMND,Q::BRKN))
+ key = "Q::CMND,Q::BRKN";
else if (message->Key() == PMsg_p::KEY(Q::CMND,Q::RECL))
key = "Q::CMND,Q::RECL";
else if (message->Key() == PMsg_p::KEY(Q::CMND,Q::INIT))
@@ -116,8 +118,17 @@ unsigned Mothership::handle_msg_app_spec(PMsg_p* message)
appInfo->distCountExpected = distCount;

/* Check for being fully defined (transition from UNDERDEFINED to
- * DEFINED). */
- appInfo->check_update_defined_state();
+ * DEFINED). If it is, report back to Root and Post. */
+ if (appInfo->check_update_defined_state())
+ {
+ Post(529, int2str(Urank), appName);
+ PMsg_p acknowledgement;
+ acknowledgement.Src(Urank);
+ acknowledgement.Put(0, &appName);
+ acknowledgement.Tgt(pPmap->U.Root);
+ acknowledgement.Key(Q::MSHP, Q::ACK, Q::DEFD);
+ queue_mpi_message(&acknowledgement);
+ }

/* Check for further state transitions. */
if (appInfo->should_we_recall()) recall_application(appInfo);
@@ -180,6 +191,7 @@ unsigned Mothership::handle_msg_app_dist(PMsg_p* message)
if (!appInfo->increment_dist_count_current())
{
Post(524, appName, uint2str(appInfo->distCountExpected));
+ tell_root_app_is_broken(appName);
appInfo->state = BROKEN;
return 0;
}
@@ -199,12 +211,13 @@ unsigned Mothership::handle_msg_app_dist(PMsg_p* message)
}

/* Check for being fully defined (transition from UNDERDEFINED to
- * DEFINED). If it is, report back to Root. */
+ * DEFINED). If it is, report back to Root and Post. */
if (appInfo->check_update_defined_state())
{
+ Post(529, int2str(Urank), appName);
PMsg_p acknowledgement;
acknowledgement.Src(Urank);
- acknowledgement.Put<std::string>(0, &(appInfo->name));
+ acknowledgement.Put(0, &appName);
acknowledgement.Tgt(pPmap->U.Root);
acknowledgement.Key(Q::MSHP, Q::ACK, Q::DEFD);
queue_mpi_message(&acknowledgement);
@@ -244,6 +257,7 @@ unsigned Mothership::handle_msg_app_supd(PMsg_p* message)
if(!superdb.load_supervisor(appName, soPath, &errorMessage))
{
Post(503, appName, errorMessage);
+ tell_root_app_is_broken(appName);
appInfo->state = BROKEN;
return 0;
}
@@ -252,6 +266,7 @@ unsigned Mothership::handle_msg_app_supd(PMsg_p* message)
if(!provision_supervisor_api(appName))
{
Post(525, appName);
+ tell_root_app_is_broken(appName);
appInfo->state = BROKEN;
return 0;
}
@@ -261,13 +276,23 @@ unsigned Mothership::handle_msg_app_supd(PMsg_p* message)
if (!appInfo->increment_dist_count_current())
{
Post(524, appName, uint2str(appInfo->distCountExpected));
+ tell_root_app_is_broken(appName);
appInfo->state = BROKEN;
return 0;
}

/* Check for being fully defined (transition from UNDERDEFINED to
- * DEFINED). */
- appInfo->check_update_defined_state();
+ * DEFINED). If it is, report back to Root and Post. */
+ if (appInfo->check_update_defined_state())
+ {
+ Post(529, int2str(Urank), appName);
+ PMsg_p acknowledgement;
+ acknowledgement.Src(Urank);
+ acknowledgement.Put(0, &appName);
+ acknowledgement.Tgt(pPmap->U.Root);
+ acknowledgement.Key(Q::MSHP, Q::ACK, Q::DEFD);
+ queue_mpi_message(&acknowledgement);
+ }

/* Check for further state transitions. */
if (appInfo->should_we_recall()) recall_application(appInfo);
@@ -300,6 +325,42 @@ unsigned Mothership::handle_msg_cmnd_recl(PMsg_p* message)
return 0;
}

+ unsigned Mothership::handle_msg_cmnd_brkn(PMsg_p* message)
+ {
+ AppInfo* appInfo;
+
+ /* Pull message contents. */
+ std::string appName;
+ if (!decode_string_message(message, &appName))
+ {
+ debug_post(597, 3, "Q::CMND,Q::BRKN", hex2str(message->Key()).c_str(),
+ "Failed to decode.");
+ return 0;
+ }
+
+ debug_post(597, 3, "Q::CMND,Q::BRKN", hex2str(message->Key()).c_str(),
+ dformat("appName=%s", appName.c_str()).c_str());
+
+ /* Get the application */
+ appInfo = appdb.check_create_app(appName);
+
+ /* If the app is running, stop it. */
+ if (appInfo->state == RUNNING)
+ {
+ appInfo->stage_stop();
+ stop_application(appInfo);
+ }
+
+ /* Mark it as broken (even if it is already marked as such). Note that,
+ * while the app is stopping, it will have state "STOPPING". After all of
+ * the devices have reported back, it will have state "BROKEN" again.
+ *
+ * Breaking in this way does not inform Root, because otherwise we would be
+ * bouncing messages forever. */
+ appInfo->state = BROKEN;
+ return 0;
+ }
+
unsigned Mothership::handle_msg_cmnd_init(PMsg_p* message)
{
AppInfo* appInfo;
@@ -438,26 +499,25 @@ unsigned Mothership::handle_msg_bend_supr(PMsg_p* message)
return 0;
}

- /* Set up a vector of packets for the supervisor entry point to modify.
- * If the vector comes back with entries, then we have packets to send.
+ /* Set up a vector of packets for the supervisor entry point to modify. If
+ * the vector comes back with entries, then we have packets to send.
*/
std::vector<P_Pkt_t> inputPackets;
std::vector<P_Addr_Pkt_t> outputPackets;
- // Get the input packets.

+ /* Get the input packets. */
decode_packets_message(message, &inputPackets, 1);

/* Invoke the supervisor, send the message if instructed to do so, and
* propagate errors. */
rc = superdb.call_supervisor(appName, inputPackets, outputPackets);
- if (outputPackets.size() > 0)
+ if (outputPackets.size() > 0)
{
PMsg_p outputMessage;
outputMessage.Tgt(Urank);
outputMessage.Src(Urank);
outputMessage.Key(Q::PKTS);
outputMessage.Put<P_Addr_Pkt_t> (0, &(outputPackets));
-
queue_mpi_message(&outputMessage);
}
if (rc < 0) Post(515, appName);
11 changes: 11 additions & 0 deletions Source/Mothership/Mothership.cpp
@@ -150,6 +150,17 @@ bool Mothership::debug_post(int code, unsigned numArgs, ...)
#endif
}

+ /* Sends a message to Root explaining that an app is broken. */
+ void Mothership::tell_root_app_is_broken(std::string appName)
+ {
+ PMsg_p sadTidings;
[Review comment from a contributor on the line above: 👍]
+ sadTidings.Src(Urank);
+ sadTidings.Put(0, &appName);
+ sadTidings.Tgt(pPmap->U.Root);
+ sadTidings.Key(Q::MSHP, Q::REQ, Q::BRKN);
+ queue_mpi_message(&sadTidings);
+ }
+
/* Defines OnIdle behaviour for the Mothership (ala CommonBase) - this
* currently just calls the idle handler for one supervisor, skipping
* supervisors that are not already being called, and skipping supervisors for
4 changes: 4 additions & 0 deletions Source/Mothership/Mothership.h
@@ -54,6 +54,7 @@ class Mothership: public CommonBase
unsigned handle_msg_app_spec(PMsg_p* message);
unsigned handle_msg_app_dist(PMsg_p* message);
unsigned handle_msg_app_supd(PMsg_p* message);
unsigned handle_msg_cmnd_brkn(PMsg_p* message);
unsigned handle_msg_cmnd_recl(PMsg_p* message);
unsigned handle_msg_cmnd_init(PMsg_p* message);
unsigned handle_msg_cmnd_run(PMsg_p* message);
@@ -121,6 +122,9 @@ class Mothership: public CommonBase
bool provision_supervisor_api(std::string appName);
void supervisor_api_stop_application(std::string appName);

+ /* An app is broken! */
+ void tell_root_app_is_broken(std::string appName);
+
/* Supervisor spinning (virtual from CommonBase). */
void OnIdle();
};
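Taken together, `tell_root_app_is_broken` and `handle_msg_cmnd_brkn` form a one-way path: a Mothership that detects a failure reports it to Root, while a Mothership that is told an application is broken marks it so without reporting back. The small simulation below illustrates why that asymmetry avoids the message bouncing mentioned in the handler's comment. It assumes that Root relays the report to the other Motherships as `CMND,BRKN`, which the comment implies but this diff does not show. `MothershipSim` and its members are hypothetical.

```cpp
#include <map>
#include <string>

// Reduced state set for the simulation. The first enumerator is the value a
// newly created map entry takes, standing in for a freshly created app.
enum SimState { SIM_UNDERDEFINED, SIM_RUNNING, SIM_BROKEN };

// Hypothetical model of one Mothership, counting the messages it would send.
struct MothershipSim
{
    std::map<std::string, SimState> apps;
    unsigned reportsToRoot;  // MSHP,REQ,BRKN messages sent to Root
    unsigned stopsIssued;    // applications stopped because of CMND,BRKN

    MothershipSim() : reportsToRoot(0), stopsIssued(0) {}

    // Local failure (e.g. a supervisor failed to load): report to Root,
    // then mark the application as broken.
    void on_local_failure(const std::string& app)
    {
        ++reportsToRoot;
        apps[app] = SIM_BROKEN;
    }

    // CMND,BRKN received: stop the application if it is running and mark it
    // as broken, but do not report to Root, so the exchange terminates.
    void on_cmnd_brkn(const std::string& app)
    {
        if (apps[app] == SIM_RUNNING) ++stopsIssued;
        apps[app] = SIM_BROKEN;
    }
};
```

If one Mothership fails locally and a second receives the relayed command, both end up with the application marked broken and exactly one report reaches Root.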