diff --git a/codes/configuration.h b/codes/configuration.h index 13a7c03e..8cb769b8 100644 --- a/codes/configuration.h +++ b/codes/configuration.h @@ -225,6 +225,26 @@ int configuration_get_value_double (ConfigHandle *handle, const char * annotation, double *value); +/* + * Get's the values for a give section/key pair which has multiple values + * and converts them to an int. + * Assumes the key name is a MULTIKEY configuration type. + * + * handle - configuration handle + * section_name - name of the section the key is in + * key_name - name of the key + * annotation - optional annotation to look for (NULL -> no annotation) + * values - array of points to values (must be freed by caller) + * length - number of value items + */ +int configuration_get_multivalue_int(ConfigHandle *handle, + const char * section_name, + const char * key_name, + const char * annotation, + int **values, + size_t *length); + + /* * Get the LPGROUPS configuration from the config file which is stored * in the associated data structures. diff --git a/codes/model-net-sched.h b/codes/model-net-sched.h index 9f685b85..ff0297e3 100644 --- a/codes/model-net-sched.h +++ b/codes/model-net-sched.h @@ -30,6 +30,7 @@ typedef struct mn_sched_params_s mn_sched_params; X(MN_SCHED_FCFS_FULL, "fcfs-full", &fcfs_tab) \ X(MN_SCHED_RR, "round-robin", &rr_tab) \ X(MN_SCHED_PRIO, "priority", &prio_tab) \ + X(MN_SCHED_QOS, "qos", &qos_tab) \ X(MAX_SCHEDS, NULL, NULL) #define X(a,b,c) a, @@ -46,18 +47,29 @@ typedef struct model_net_sched_rc_s model_net_sched_rc; // priority scheduler configurtion parameters typedef struct mn_prio_params_s { int num_prios; // number of priorities - // sub-scheduler to use. can be any but prio + // sub-scheduler to use. can be any but prio or qos enum sched_type sub_stype; } mn_prio_params; +typedef struct mn_qos_params_s { + int *qos_table; + size_t numSLs; +} mn_qos_params; + // TODO: other scheduler config params // initialization parameter set +//typedef struct model_net_sched_cfg_params_s { +// enum sched_type type; +// union { +// mn_prio_params prio; +// } u; +//} model_net_sched_cfg_params; + typedef struct model_net_sched_cfg_params_s { - enum sched_type type; - union { - mn_prio_params prio; - } u; + enum sched_type type; + mn_prio_params prio; + mn_qos_params qos; } model_net_sched_cfg_params; typedef struct mn_sched_cfg_params { @@ -72,7 +84,7 @@ enum sched_msg_param_type { // scheduler-specific parameter definitions must go here struct mn_sched_params_s { - int prio; // MN_SCHED_PARAM_PRIO (currently the only one) + int prio; // used by prio and qos } ; /// interface to be implemented by schedulers @@ -141,7 +153,11 @@ struct model_net_sched_rc_s { model_net_request req; // request gets deleted... mn_sched_params sched_params; // along with msg params int rtn; // return code from a sched_next - int prio; // prio when doing priority queue events + int prio; // prio when doing priority queue or qos events + + // For QoS + size_t qos_table_index; + size_t qos_table_counter; }; // initialize the scheduler diff --git a/codes/net/fattree.h b/codes/net/fattree.h index fe483fd7..02d8fecc 100644 --- a/codes/net/fattree.h +++ b/codes/net/fattree.h @@ -81,6 +81,12 @@ struct fattree_message int remote_event_size_bytes; int local_event_size_bytes; + size_t service_level; + + // QoS params needed to revert operations + size_t qos_table_index; + size_t qos_table_counter; + }; #endif /* end of include guard: FATTREE_H */ diff --git a/configure.ac b/configure.ac index c74056a5..8ff352da 100755 --- a/configure.ac +++ b/configure.ac @@ -27,6 +27,7 @@ AM_PROG_CC_C_O AC_PROG_CXX AC_PROG_CXXCPP AC_PROG_RANLIB +AC_PROG_LIBTOOL PKG_PROG_PKG_CONFIG diff --git a/src/Makefile.subdir b/src/Makefile.subdir index 7a7d1a6d..a44c17ba 100644 --- a/src/Makefile.subdir +++ b/src/Makefile.subdir @@ -105,7 +105,7 @@ nobase_include_HEADERS = \ codes/net/simplep2p.h \ codes/net/express-mesh.h \ codes/net/torus.h \ - codes/codes-mpi-replay.h \ + codes/codes-mpi-replay.h \ codes/configfile.h @@ -174,7 +174,10 @@ src_libcodes_la_SOURCES = \ src/networks/model-net/simplep2p.c \ src/networks/model-net/core/model-net-lp.c \ src/networks/model-net/core/model-net-sched.c \ - src/networks/model-net/core/model-net-sched-impl.c + src/networks/model-net/core/model-net-sched-impl.c \ + src/networks/model-net/qos.h \ + src/networks/model-net/qos.c + src_libcodes_mpi_replay_la_SOURCES = \ src/network-workloads/model-net-mpi-replay.c diff --git a/src/modelconfig/configuration.c b/src/modelconfig/configuration.c index 831493fc..9abac925 100644 --- a/src/modelconfig/configuration.c +++ b/src/modelconfig/configuration.c @@ -279,6 +279,42 @@ int configuration_get_value_double (ConfigHandle *handle, return rc; } + +int configuration_get_multivalue_int(ConfigHandle *handle, + const char *section_name, + const char *key_name, + const char *annotation, + int **values, + size_t *len) { + char **valuestr; + int rc = 1; + int r; + + r = configuration_get_multivalue(handle, + section_name, + key_name, + annotation, + &valuestr, + len); + if (r > 0) + { + int *vals = malloc(sizeof(int) * (*len)); + //printf("Vals: %d\n", vals); + //printf("Len: %d\n", *len); + for (size_t i = 0; i < *len; ++i) + { + //printf("%s\n", valuestr[i]); + vals[i] = atoi(valuestr[i]); + //printf("%d\n", vals[i]); + } + *values = vals; + rc = 0; + } + + return rc; +} + + static void check_add_anno( int anno_offset, config_anno_map_t *map){ diff --git a/src/networks/model-net/core/model-net-lp.c b/src/networks/model-net/core/model-net-lp.c index 6b22bdd5..aec6119a 100644 --- a/src/networks/model-net/core/model-net-lp.c +++ b/src/networks/model-net/core/model-net-lp.c @@ -261,7 +261,7 @@ static void base_read_config(const char * anno, model_net_base_params *p){ char sched[MAX_NAME_LENGTH]; long int packet_size_l = 0; uint64_t packet_size; - int ret; + int ret, is_prio; ret = configuration_get_value(&config, "PARAMS", "modelnet_scheduler", anno, sched, MAX_NAME_LENGTH); @@ -311,11 +311,28 @@ static void base_read_config(const char * anno, model_net_base_params *p){ "setting to %d\n", p->node_copy_queues); } + is_prio = p->sched_params.type == MN_SCHED_PRIO || p->sched_params.type == MN_SCHED_QOS; + // get scheduler-specific parameters - if (p->sched_params.type == MN_SCHED_PRIO){ - // prio scheduler uses default parameters - int * num_prios = &p->sched_params.u.prio.num_prios; - enum sched_type * sub_stype = &p->sched_params.u.prio.sub_stype; + if (p->sched_params.type == MN_SCHED_QOS) { + mn_qos_params *qos = &p->sched_params.qos; + ret = configuration_get_multivalue_int(&config, "PARAMS", "qos_table", + anno, &qos->qos_table, &qos->numSLs); + if (ret != 0) { + qos->numSLs = 1; + qos->qos_table = malloc(sizeof(int)); + qos->qos_table[0] = 100; + } + + // QoS uses the prio scheduler, so set those params as well + p->sched_params.prio.num_prios = qos->numSLs; + p->sched_params.prio.sub_stype = MN_SCHED_RR; + } + + if (p->sched_params.type == MN_SCHED_PRIO) { + int * num_prios = &p->sched_params.prio.num_prios; + enum sched_type * sub_stype = &p->sched_params.prio.sub_stype; + // number of priorities to allocate ret = configuration_get_value_int(&config, "PARAMS", "prio-sched-num-prios", anno, num_prios); @@ -338,7 +355,7 @@ static void base_read_config(const char * anno, model_net_base_params *p){ tw_error(TW_LOC, "Unknown value for " "PARAMS:prio-sched-sub-sched %s", sched); } - else if (i == MN_SCHED_PRIO){ + else if (i == MN_SCHED_PRIO || i == MN_SCHED_QOS){ tw_error(TW_LOC, "priority scheduler cannot be used as a " "priority scheduler's sub sched " "(PARAMS:prio-sched-sub-sched)"); @@ -347,16 +364,14 @@ static void base_read_config(const char * anno, model_net_base_params *p){ } if (p->sched_params.type == MN_SCHED_FCFS_FULL || - (p->sched_params.type == MN_SCHED_PRIO && - p->sched_params.u.prio.sub_stype == MN_SCHED_FCFS_FULL)){ + (is_prio && p->sched_params.prio.sub_stype == MN_SCHED_FCFS_FULL)){ // override packet size to something huge (leave a bit in the unlikely // case that an op using packet size causes overflow) packet_size = 1ull << 62; } else if (!packet_size && (p->sched_params.type != MN_SCHED_FCFS_FULL || - (p->sched_params.type == MN_SCHED_PRIO && - p->sched_params.u.prio.sub_stype != MN_SCHED_FCFS_FULL))){ + (is_prio && p->sched_params.prio.sub_stype != MN_SCHED_FCFS_FULL))){ packet_size = 512; fprintf(stderr, "WARNING, no packet size specified, setting packet " "size to %llu\n", LLU(packet_size)); diff --git a/src/networks/model-net/core/model-net-sched-impl.c b/src/networks/model-net/core/model-net-sched-impl.c index 7bcf4d16..b8c14799 100644 --- a/src/networks/model-net/core/model-net-sched-impl.c +++ b/src/networks/model-net/core/model-net-sched-impl.c @@ -8,6 +8,7 @@ #include #include +#include "../qos.h" #include #include #include @@ -51,6 +52,15 @@ typedef struct mn_sched_prio { mn_sched_queue ** sub_scheds; // one for each params.num_prios } mn_sched_prio; +// QoS scheduler builds on the prio scheduler +typedef struct mn_sched_qos { + mn_sched_prio *prio; + size_t numSLs; + int * qos_table; + size_t qos_table_index; + size_t qos_table_counter; +} mn_sched_qos; + /// scheduler-specific function decls and tables /// FCFS @@ -141,6 +151,34 @@ static void prio_next_rc ( const void * rc_event_save, const model_net_sched_rc * rc, tw_lp * lp); +static void qos_init ( + const struct model_net_method * method, + const model_net_sched_cfg_params * params, + int is_recv_queue, + void ** sched); +static void qos_destroy (void *sched); +static void qos_add ( + const model_net_request * req, + const mn_sched_params * sched_params, + int remote_event_size, + void * remote_event, + int local_event_size, + void * local_event, + void * sched, + model_net_sched_rc * rc, + tw_lp * lp); +static void qos_add_rc(void *sched, const model_net_sched_rc *rc, tw_lp *lp); +static int qos_next( + tw_stime * poffset, + void * sched, + void * rc_event_save, + model_net_sched_rc * rc, + tw_lp * lp); +static void qos_next_rc ( + void * sched, + const void * rc_event_save, + const model_net_sched_rc * rc, + tw_lp * lp); /// function tables (names defined by X macro in model-net-sched.h) static const model_net_sched_interface fcfs_tab = @@ -149,6 +187,8 @@ static const model_net_sched_interface rr_tab = { &rr_init, &rr_destroy, &rr_add, &rr_add_rc, &rr_next, &rr_next_rc}; static const model_net_sched_interface prio_tab = { &prio_init, &prio_destroy, &prio_add, &prio_add_rc, &prio_next, &prio_next_rc}; +static const model_net_sched_interface qos_tab = +{ &qos_init, &qos_destroy, &qos_add, &qos_add_rc, &qos_next, &qos_next_rc}; #define X(a,b,c) c, const model_net_sched_interface * sched_interfaces[] = { @@ -427,13 +467,13 @@ void rr_next_rc ( } void prio_init ( - const struct model_net_method * method, + const struct model_net_method * method, const model_net_sched_cfg_params * params, int is_recv_queue, void ** sched){ *sched = malloc(sizeof(mn_sched_prio)); mn_sched_prio *ss = *sched; - ss->params = params->u.prio; + ss->params = params->prio; ss->sub_scheds = malloc(ss->params.num_prios*sizeof(mn_sched_queue*)); ss->sub_sched_iface = sched_interfaces[ss->params.sub_stype]; for (int i = 0; i < ss->params.num_prios; i++){ @@ -521,6 +561,89 @@ void prio_next_rc ( // else, no-op } +void qos_init ( + const struct model_net_method * method, + const model_net_sched_cfg_params * params, + int is_recv_queue, + void ** sched){ + *sched = malloc(sizeof(mn_sched_qos)); + mn_sched_qos *ss = *sched; + ss->numSLs = params->qos.numSLs; + ss->qos_table = params->qos.qos_table; + ss->qos_table_index = 0; + ss->qos_table_counter = 0; + prio_init(method, params, is_recv_queue, (void **)&ss->prio); +} + +void qos_destroy (void *sched){ + mn_sched_qos *ss = sched; + prio_destroy((void *)ss->prio); + free(ss->qos_table); + free(ss); +} + +void qos_add ( + const model_net_request * req, + const mn_sched_params * sched_params, + int remote_event_size, + void * remote_event, + int local_event_size, + void * local_event, + void * sched, + model_net_sched_rc * rc, + tw_lp * lp){ + mn_sched_qos *ss = sched; + prio_add(req, sched_params, remote_event_size, remote_event, local_event_size, local_event, (void *)ss->prio, rc, lp); +} + +void qos_add_rc(void * sched, const model_net_sched_rc *rc, tw_lp *lp){ + mn_sched_qos *ss = sched; + prio_add_rc((void *)ss->prio, rc, lp); +} + +static int qos_has_packets(void * sched, size_t sl) { + mn_sched_qos *ss = sched; + return !qlist_empty(&ss->prio->sub_scheds[sl]->reqs); +} + +int qos_next( + tw_stime * poffset, + void * sched, + void * rc_event_save, + model_net_sched_rc * rc, + tw_lp * lp){ + mn_sched_qos *ss = sched; + + // Save parameters we need to reverse the operation + rc->qos_table_index = ss->qos_table_index; + rc->qos_table_counter = ss->qos_table_counter; + + rc->prio = get_next_sl(ss->numSLs, ss->qos_table, &ss->qos_table_index, &ss->qos_table_counter, qos_has_packets, ss); + if (rc->prio != NO_PACKETS_TO_SEND) { + return ss->prio->sub_sched_iface->next( + poffset, ss->prio->sub_scheds[rc->prio], rc_event_save, rc, lp); + } + else { + // No packets were available to send + rc->prio = -1; + return -1; + } +} + +void qos_next_rc ( + void * sched, + const void * rc_event_save, + const model_net_sched_rc * rc, + tw_lp * lp){ + mn_sched_qos *ss = sched; + prio_next_rc((void *)ss->prio, rc_event_save, rc, lp); + if (rc->prio > -1) { + ss->qos_table_index = rc->qos_table_index; + ss->qos_table_counter = rc->qos_table_counter; + } +} + + /* * Local variables: * c-indent-level: 4 diff --git a/src/networks/model-net/fattree.c b/src/networks/model-net/fattree.c index eb1c49b5..fca0514f 100644 --- a/src/networks/model-net/fattree.c +++ b/src/networks/model-net/fattree.c @@ -10,6 +10,7 @@ #include "sys/file.h" #include "codes/quickhash.h" #include "codes/rc-stack.h" +#include "qos.h" #include #include @@ -161,6 +162,12 @@ struct fattree_param int rail_size_limit; int num_rails; int ports_per_nic; + + // QoS + int use_qos; + size_t num_vcs; /* number of virtual channels */ + int * qos_table; + }; struct ftree_hash_key @@ -275,13 +282,13 @@ struct switch_state tw_stime* busy_time; tw_stime* busy_time_sample; - fattree_message_list **pending_msgs; - fattree_message_list **pending_msgs_tail; - fattree_message_list **queued_msgs; - fattree_message_list **queued_msgs_tail; + fattree_message_list ***pending_msgs; + fattree_message_list ***pending_msgs_tail; + fattree_message_list ***queued_msgs; + fattree_message_list ***queued_msgs_tail; int *queued_length; int *in_send_loop; - int* vc_occupancy; + int** vc_occupancy; int64_t* link_traffic; tw_lpid *port_connections; @@ -294,6 +301,11 @@ struct switch_state fattree_param *params; /* array to store linear forwaring tables in case we use static routing */ int *lft; + + // QoS + size_t *qos_table_index; + size_t *qos_table_counter; + }; /* ROSS Instrumentation Support */ @@ -348,6 +360,17 @@ static long long total_msg_sz = 0; static long long N_finished_msgs = 0; static long long N_finished_chunks = 0; +static int switch_queue_has_packets(void *, size_t); +static int switch_queue_has_packets_all_sls(fattree_message_list **, size_t); + +#ifdef PACKET_PRINT +static int getRank() { + int rank; + MPI_Comm_rank(MPI_COMM_CODES, &rank); + return rank; +} +#endif + static int fattree_rank_hash_compare( void *key, struct qhash_head *link) { @@ -757,6 +780,25 @@ static void fattree_read_config(const char * anno, fattree_param *p){ p->num_switches = (int *) malloc (p->num_levels * sizeof(int)); p->switch_radix = (int*) malloc (p->num_levels * sizeof(int)); + // Load QoS paramters. We share some of these parameters with model-net + char sched[MAX_NAME_LENGTH]; + int ret = configuration_get_value(&config, "PARAMS", "modelnet_scheduler", + anno, sched, MAX_NAME_LENGTH); + p->use_qos = ret > 0 && strcmp(sched, "qos") == 0; + if (p->use_qos) { + ret = configuration_get_multivalue_int(&config, "PARAMS", "ft_qos_table", + anno, &p->qos_table, &p->num_vcs); + if (ret != 0) { + p->use_qos = 0; + } + } + if (!p->use_qos) { + // Put in default values + p->num_vcs = 1; + p->qos_table = malloc(sizeof(int)); + p->qos_table[0] = 100; + } + char switch_counts_str[MAX_NAME_LENGTH]; int rc = configuration_get_value(&config, "PARAMS", "switch_count", anno, switch_counts_str, MAX_NAME_LENGTH); @@ -851,7 +893,7 @@ static void fattree_read_config(const char * anno, fattree_param *p){ } } - for(int jj=0;jj<3 && !g_tw_mynode;jj++) + for(int jj=0;jjnum_levels && !g_tw_mynode;jj++) { printf("num_switches[%d]=%d\n",jj,p->num_switches[jj]); } @@ -1154,18 +1196,20 @@ void switch_init(switch_state * r, tw_lp * lp) r->next_output_available_time = (tw_stime*) malloc (r->radix * sizeof(tw_stime)); - r->vc_occupancy = (int*) malloc (r->radix * sizeof(int)); + r->vc_occupancy = (int**) malloc (r->radix * sizeof(int*)); r->in_send_loop = (int*) malloc (r->radix * sizeof(int)); r->link_traffic = (int64_t*) malloc (r->radix * sizeof(int64_t)); r->port_connections = (tw_lpid*) malloc (r->radix * sizeof(tw_lpid)); + r->pending_msgs = - (fattree_message_list**)malloc(r->radix * sizeof(fattree_message_list*)); + (fattree_message_list***)malloc(r->radix * sizeof(fattree_message_list**)); r->pending_msgs_tail = - (fattree_message_list**)malloc(r->radix * sizeof(fattree_message_list*)); + (fattree_message_list***)malloc(r->radix * sizeof(fattree_message_list**)); r->queued_msgs = - (fattree_message_list**)malloc(r->radix * sizeof(fattree_message_list*)); + (fattree_message_list***)malloc(r->radix * sizeof(fattree_message_list**)); r->queued_msgs_tail = - (fattree_message_list**)malloc(r->radix * sizeof(fattree_message_list*)); + (fattree_message_list***)malloc(r->radix * sizeof(fattree_message_list**)); + r->queued_length = (int*)malloc(r->radix * sizeof(int)); r->lft = NULL; @@ -1173,6 +1217,9 @@ void switch_init(switch_state * r, tw_lp * lp) r->busy_time = (tw_stime*)malloc(r->radix * sizeof(tw_stime)); r->busy_time_sample = (tw_stime*)malloc(r->radix * sizeof(tw_stime)); + r->qos_table_index = malloc(r->radix * sizeof(size_t)); + r->qos_table_counter = malloc(r->radix * sizeof(size_t)); + // ROSS Instrumentation if (g_st_use_analysis_lps && g_st_model_stats) lp->model_types->sample_struct_sz = sizeof(struct fattree_switch_sample) + sizeof(int) * r->radix; @@ -1186,14 +1233,23 @@ void switch_init(switch_state * r, tw_lp * lp) r->busy_time[i] = 0.0; r->busy_time_sample[i] = 0.0; r->next_output_available_time[i] = 0; - r->vc_occupancy[i] = 0; + r->vc_occupancy[i] = malloc(p->num_vcs * sizeof(int)); r->in_send_loop[i] = 0; r->link_traffic[i] = 0; - r->pending_msgs[i] = NULL; - r->pending_msgs_tail[i] = NULL; - r->queued_msgs[i] = NULL; - r->queued_msgs_tail[i] = NULL; + r->pending_msgs[i] = malloc(p->num_vcs * sizeof(fattree_message_list*)); + r->pending_msgs_tail[i] = malloc(p->num_vcs * sizeof(fattree_message_list*)); + r->queued_msgs[i] = malloc(p->num_vcs * sizeof(fattree_message_list*)); + r->queued_msgs_tail[i] = malloc(p->num_vcs * sizeof(fattree_message_list*)); + for (size_t vc = 0; vc < p->num_vcs; vc++) { + r->vc_occupancy[i][vc] = 0; + r->pending_msgs[i][vc] = NULL; + r->pending_msgs_tail[i][vc] = NULL; + r->queued_msgs[i][vc] = NULL; + r->queued_msgs_tail[i][vc] = NULL; + } r->queued_length[i] = 0; + r->qos_table_index[i] = 0; + r->qos_table_counter[i] = 0; } /* dump partial topology info into DOT format (switch radix, guid, ...) */ @@ -1560,6 +1616,7 @@ static tw_stime fattree_packet_event( msg->pull_size = req->pull_size; msg->magic = fattree_terminal_magic_num; msg->msg_start_time = req->msg_start_time; + msg->service_level = sched_params->prio; /* Its the last packet so pass in remote and local event information*/ if(is_last_pckt) @@ -1652,6 +1709,11 @@ void ft_packet_generate(ft_terminal_state * s, tw_bf * bf, fattree_message * msg nic_ts = g_tw_lookahead + (msg->packet_size * s->params->cn_delay) + tw_rand_unif(lp->rng); + // Make sure the service level is valid + if (!p->use_qos || msg->service_level >= p->num_vcs) { + msg->service_level = 0; + } + msg->my_N_hop = 0; msg->packet_ID = lp->gid + g_tw_nlp * s->packet_counter; @@ -1695,6 +1757,7 @@ void ft_packet_generate(ft_terminal_state * s, tw_bf * bf, fattree_message * msg cur_chunk->msg.chunk_id = i; cur_chunk->msg.origin_switch_id = s->switch_id; + cur_chunk->msg.service_level = msg->service_level; append_to_fattree_message_list(s->terminal_msgs, s->terminal_msgs_tail, target_queue, cur_chunk); s->terminal_length[target_queue] += s->params->chunk_size; @@ -1718,6 +1781,7 @@ void ft_packet_generate(ft_terminal_state * s, tw_bf * bf, fattree_message * msg m->type = T_SEND; m->vc_index = target_queue; m->magic = fattree_terminal_magic_num; + m->service_level = msg->service_level; s->in_send_loop[target_queue] = 1; tw_event_send(e); } @@ -1755,6 +1819,10 @@ void ft_packet_send_rc(ft_terminal_state * s, tw_bf *bf, fattree_message * msg, s->vc_occupancy[msg->vc_index] -= s->params->chunk_size; fattree_message_list* cur_entry = rc_stack_pop(s->st); +#ifdef PACKET_PRINT + printf("%d: %f terminal send reverse packet %llu node %d SL %d src %d dest %d\n", getRank(), tw_now(lp), cur_entry->msg.packet_ID, s->terminal_id, cur_entry->msg.service_level, codes_mapping_get_lp_relative_id(cur_entry->msg.src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(cur_entry->msg.dest_terminal_id, 0, 0)); + fflush(stdout); +#endif prepend_to_fattree_message_list(s->terminal_msgs, s->terminal_msgs_tail, msg->vc_index, cur_entry); @@ -1855,6 +1923,11 @@ void ft_packet_send(ft_terminal_state * s, tw_bf * bf, fattree_message * msg, tw_event_send(e_new); } +#ifdef PACKET_PRINT + printf("%d: %f terminal send packet %llu node %d SL %d src %d dest %d\n", getRank(), tw_now(lp), cur_entry->msg.packet_ID, s->terminal_id, cur_entry->msg.service_level, codes_mapping_get_lp_relative_id(cur_entry->msg.src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(cur_entry->msg.dest_terminal_id, 0, 0)); + fflush(stdout); +#endif + s->packet_counter++; s->vc_occupancy[msg->vc_index] += s->params->chunk_size; cur_entry = return_head(s->terminal_msgs, s->terminal_msgs_tail, msg->vc_index); @@ -1866,21 +1939,15 @@ void ft_packet_send(ft_terminal_state * s, tw_bf * bf, fattree_message * msg, cur_entry = s->terminal_msgs[msg->vc_index]; - if(cur_entry != NULL && - s->vc_occupancy[msg->vc_index] + s->params->chunk_size <= s->params->cn_vc_size) { - bf->c3 = 1; - fattree_message *m_new; - ts = ts + g_tw_lookahead * tw_rand_unif(lp->rng); - e = model_net_method_event_new(lp->gid, ts, lp, FATTREE, - (void**)&m_new, NULL); - m_new->type = T_SEND; - m_new->vc_index = msg->vc_index; - m_new->magic = fattree_terminal_magic_num; - tw_event_send(e); - } else { - bf->c4 = 1; - s->in_send_loop[msg->vc_index] = 0; - } + bf->c3 = 1; + fattree_message *m_new; + ts = ts + g_tw_lookahead * tw_rand_unif(lp->rng); + e = model_net_method_event_new(lp->gid, ts, lp, FATTREE, + (void**)&m_new, NULL); + m_new->type = T_SEND; + m_new->vc_index = msg->vc_index; + m_new->magic = fattree_terminal_magic_num; + tw_event_send(e); if(s->issueIdle[msg->vc_index]) { bf->c5 = 1; @@ -1900,15 +1967,22 @@ void switch_packet_receive_rc(switch_state * s, s_arrive_r++; #endif int output_port = msg->saved_vc; + int use_vc = msg->service_level; +#ifdef PACKET_PRINT + printf("%d: %f switch recv reverse packet %llu switch %d port %d SL %d src %d dest %d\n", getRank(), tw_now(lp), msg->packet_ID, s->switch_id, output_port, use_vc, codes_mapping_get_lp_relative_id(msg->src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0)); + fflush(stdout); +#endif if(s->params->routing != STATIC) { tw_rand_reverse_unif(lp->rng); } if(bf->c1) { tw_rand_reverse_unif(lp->rng); - delete_fattree_message_list(return_tail(s->pending_msgs, - s->pending_msgs_tail, output_port)); - s->vc_occupancy[output_port] -= s->params->chunk_size; + + delete_fattree_message_list(return_tail(s->pending_msgs[output_port], + s->pending_msgs_tail[output_port], use_vc)); + s->vc_occupancy[output_port][use_vc] -= s->params->chunk_size; + if(bf->c2) { codes_local_latency_reverse(lp); @@ -1917,8 +1991,8 @@ void switch_packet_receive_rc(switch_state * s, } if(bf->c3) { - delete_fattree_message_list(return_tail(s->queued_msgs, - s->queued_msgs_tail, output_port)); + delete_fattree_message_list(return_tail(s->queued_msgs[output_port], + s->queued_msgs_tail[output_port], use_vc)); s->queued_length[output_port] -= s->params->chunk_size; s->last_buf_full[output_port] = msg->saved_busy_time; } @@ -1965,13 +2039,19 @@ void switch_packet_receive( switch_state * s, tw_bf * bf, cur_chunk->msg.vc_index = output_port; cur_chunk->msg.vc_off = out_off; cur_chunk->msg.my_N_hop++; + cur_chunk->msg.service_level = msg->service_level; + + int use_vc = msg->service_level; + + if(s->vc_occupancy[output_port][use_vc] + s->params->chunk_size <= max_vc_size) { - if(s->vc_occupancy[output_port] + s->params->chunk_size <= max_vc_size) { bf->c1 = 1; switch_credit_send(s, bf, msg, lp, -1); - append_to_fattree_message_list( s->pending_msgs, s->pending_msgs_tail, - output_port, cur_chunk); - s->vc_occupancy[output_port] += s->params->chunk_size; + + append_to_fattree_message_list( s->pending_msgs[output_port], s->pending_msgs_tail[output_port], + use_vc, cur_chunk); + s->vc_occupancy[output_port][use_vc] += s->params->chunk_size; + if(s->in_send_loop[output_port] == 0) { bf->c2 = 1; fattree_message *m; @@ -1981,6 +2061,7 @@ void switch_packet_receive( switch_state * s, tw_bf * bf, m->type = S_SEND; m->magic = switch_magic_num; m->vc_index = output_port; + m->service_level = msg->service_level; //printf("[%d] pack recv Send to %d\n", lp->gid, lp->gid); tw_event_send(e); s->in_send_loop[output_port] = 1; @@ -1989,8 +2070,8 @@ void switch_packet_receive( switch_state * s, tw_bf * bf, bf->c3 = 1; cur_chunk->msg.saved_vc = msg->vc_index; cur_chunk->msg.saved_off = msg->vc_off; - append_to_fattree_message_list( s->queued_msgs, s->queued_msgs_tail, - output_port, cur_chunk); + append_to_fattree_message_list( s->queued_msgs[output_port], s->queued_msgs_tail[output_port], + use_vc, cur_chunk); s->queued_length[output_port] += s->params->chunk_size; msg->saved_busy_time = s->last_buf_full[output_port]; s->last_buf_full[output_port] = tw_now(lp); @@ -1998,6 +2079,11 @@ void switch_packet_receive( switch_state * s, tw_bf * bf, //for reverse msg->saved_vc = output_port; +#ifdef PACKET_PRINT + printf("%d: %f switch recv packet %llu switch %d port %d SL %d src %d dest %d\n", getRank(), tw_now(lp), msg->packet_ID, s->switch_id, output_port, use_vc, codes_mapping_get_lp_relative_id(msg->src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0)); + fflush(stdout); +#endif + return; } @@ -2009,6 +2095,7 @@ void switch_packet_send_rc(switch_state * s, s_send_r++; #endif int output_port = msg->saved_vc; + int use_vc = msg->service_level; if(bf->c1) { s->in_send_loop[output_port] = 1; @@ -2020,6 +2107,11 @@ void switch_packet_send_rc(switch_state * s, fattree_message_list * cur_entry = rc_stack_pop(s->st); assert(cur_entry); +#ifdef PACKET_PRINT + printf("%d: %f switch send reverse packet %llu switch %d port %d SL %d src %d dest %d\n", getRank(), tw_now(lp), cur_entry->msg.packet_ID, s->switch_id, output_port, use_vc, codes_mapping_get_lp_relative_id(cur_entry->msg.src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(cur_entry->msg.dest_terminal_id, 0, 0)); + fflush(stdout); +#endif + if(bf->c11) { s->link_traffic[output_port] -= (cur_entry->msg.packet_size % s->params->chunk_size); @@ -2029,8 +2121,8 @@ void switch_packet_send_rc(switch_state * s, s->link_traffic[output_port] -= s->params->chunk_size; } - prepend_to_fattree_message_list(s->pending_msgs, - s->pending_msgs_tail, output_port, cur_entry); + prepend_to_fattree_message_list(s->pending_msgs[output_port], + s->pending_msgs_tail[output_port], use_vc, cur_entry); if(bf->c3) { @@ -2040,7 +2132,30 @@ void switch_packet_send_rc(switch_state * s, { s->in_send_loop[output_port] = 1; } + + if (s->params->use_qos) { + s->qos_table_index[output_port] = msg->qos_table_index; + s->qos_table_counter[output_port] = msg->qos_table_counter; + } + } + +static int switch_queue_has_packets_all_sls(fattree_message_list ** pending_msgs, size_t num_sls) { + int i; + int more_msgs = 0; + for (i = 0; i < num_sls; ++i) { + if(pending_msgs[i] != NULL) { + return 1; + } + } + return 0; +} + +static int switch_queue_has_packets(void * data, size_t sl) { + fattree_message_list ** pending_msgs = data; + return pending_msgs[sl] != NULL; +} + /* routes the current packet to the next stop */ void switch_packet_send( switch_state * s, tw_bf * bf, fattree_message * msg, tw_lp * lp) { @@ -2054,8 +2169,19 @@ void switch_packet_send( switch_state * s, tw_bf * bf, fattree_message * msg, fattree_message *m; int output_port = msg->vc_index; - fattree_message_list *cur_entry = s->pending_msgs[output_port]; + int use_vc = 0; + if (s->params->use_qos) { + // Save QoS parameters needed to revert the operation + msg->qos_table_index = s->qos_table_index[output_port]; + msg->qos_table_counter = s->qos_table_counter[output_port]; + + use_vc = get_next_sl(s->params->num_vcs, s->params->qos_table, &s->qos_table_index[output_port], &s->qos_table_counter[output_port], switch_queue_has_packets, s->pending_msgs[output_port]); + if (use_vc == NO_PACKETS_TO_SEND) use_vc = 0; + } + + fattree_message_list *cur_entry = s->pending_msgs[output_port][use_vc]; msg->saved_vc = output_port; + msg->service_level = use_vc; if(cur_entry == NULL) { bf->c1 = 1; @@ -2134,29 +2260,30 @@ void switch_packet_send( switch_state * s, tw_bf * bf, fattree_message * msg, } tw_event_send(e); - cur_entry = return_head(s->pending_msgs, s->pending_msgs_tail, - output_port); +#ifdef PACKET_PRINT + printf("%d: %f switch send packet %llu switch %d port %d SL %d src %d dest %d\n", getRank(), tw_now(lp), cur_entry->msg.packet_ID, s->switch_id, output_port, use_vc, codes_mapping_get_lp_relative_id(cur_entry->msg.src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(cur_entry->msg.dest_terminal_id, 0, 0)); + fflush(stdout); +#endif + + cur_entry = return_head(s->pending_msgs[output_port], s->pending_msgs_tail[output_port], + use_vc); + rc_stack_push(lp, cur_entry, (void*)delete_fattree_message_list, s->st); s->next_output_available_time[output_port] -= s->params->router_delay; ts -= s->params->router_delay; - cur_entry = s->pending_msgs[output_port]; - if(cur_entry != NULL) { - bf->c3 = 1; - fattree_message *m_new; - ts = ts + g_tw_lookahead * tw_rand_unif(lp->rng); - e = tw_event_new(lp->gid, ts, lp); - m_new = tw_event_data(e); - m_new->type = S_SEND; - m_new->magic = switch_magic_num; - m_new->vc_index = output_port; - //printf("[%d] switch send loop Send to %d\n", lp->gid, lp->gid); - tw_event_send(e); - } else { - bf->c4 = 1; - s->in_send_loop[output_port] = 0; - } + bf->c3 = 1; + fattree_message *m_new; + ts = ts + g_tw_lookahead * tw_rand_unif(lp->rng); + e = tw_event_new(lp->gid, ts, lp); + m_new = tw_event_data(e); + m_new->type = S_SEND; + m_new->magic = switch_magic_num; + m_new->vc_index = output_port; + //printf("[%d] switch send loop Send to %d\n", lp->gid, lp->gid); + tw_event_send(e); + return; } @@ -2196,10 +2323,12 @@ void switch_credit_send(switch_state * s, tw_bf * bf, fattree_message * msg, if (is_terminal) { buf_e = model_net_method_event_new(dest, ts, lp, FATTREE, (void**)&buf_msg, NULL); - buf_msg->magic = fattree_terminal_magic_num; + memcpy(buf_msg, msg, sizeof(fattree_message)); + buf_msg->magic = fattree_terminal_magic_num; } else { buf_e = tw_event_new(dest, ts , lp); buf_msg = tw_event_data(buf_e); + memcpy(buf_msg, msg, sizeof(fattree_message)); buf_msg->magic = switch_magic_num; } @@ -2211,6 +2340,14 @@ void switch_credit_send(switch_state * s, tw_bf * bf, fattree_message * msg, buf_msg->vc_index = msg->vc_index; //the port src used to send me this data } + buf_msg->service_level = msg->service_level; + buf_msg->packet_ID = msg->packet_ID; + +#ifdef PACKET_PRINT + printf("%d: %f send credit packet %llu switch %d SL %d src %d dest %d\n", getRank(), tw_now(lp), msg->packet_ID, s->switch_id, buf_msg->service_level, codes_mapping_get_lp_relative_id(msg->src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0)); + fflush(stdout); +#endif + //printf("[%d] credit send Send to %d\n", lp->gid, dest); tw_event_send(buf_e); return; @@ -2223,6 +2360,10 @@ void ft_terminal_buf_update_rc(ft_terminal_state * s, tw_bf * bf, fattree_messag #endif s->vc_occupancy[msg->vc_index] += s->params->chunk_size; codes_local_latency_reverse(lp); +#ifdef PACKET_PRINT + printf("%d: %f terminal receive credit reverse packet %llu node %d SL %d src %d dest %d\n", getRank(), tw_now(lp), msg->packet_ID, s->terminal_id, msg->service_level, codes_mapping_get_lp_relative_id(msg->src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0)); + fflush(stdout); +#endif if(bf->c3) { s->busy_time[msg->vc_index] = msg->saved_total_time; @@ -2267,6 +2408,12 @@ void ft_terminal_buf_update(ft_terminal_state * s, tw_bf * bf, //printf("[%d] term buf Send to %d\n", lp->gid, lp->gid); tw_event_send(e); } + +#ifdef PACKET_PRINT + printf("%d: %f terminal receive credit packet %llu node %d SL %d src %d dest %d\n", getRank(), tw_now(lp), msg->packet_ID, s->terminal_id, msg->service_level, codes_mapping_get_lp_relative_id(msg->src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0)); + fflush(stdout); +#endif + return; } @@ -2279,7 +2426,13 @@ void switch_buf_update_rc(switch_state * s, s_buffer_r++; #endif int indx = msg->vc_index; - s->vc_occupancy[indx] += s->params->chunk_size; + int use_vc = msg->service_level; + s->vc_occupancy[indx][use_vc] += s->params->chunk_size; + +#ifdef PACKET_PRINT + printf("%d: %f switch receive credit reverse packet %llu switch %d port %d SL %d src %d dest %d\n", getRank(), tw_now(lp), msg->packet_ID, s->switch_id, indx, use_vc, codes_mapping_get_lp_relative_id(msg->src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0)); + fflush(stdout); +#endif if(bf->c3) { @@ -2289,12 +2442,12 @@ void switch_buf_update_rc(switch_state * s, } if(bf->c1) { - fattree_message_list* head = return_tail(s->pending_msgs, - s->pending_msgs_tail, indx); + fattree_message_list* head = return_tail(s->pending_msgs[indx], + s->pending_msgs_tail[indx], use_vc); tw_rand_reverse_unif(lp->rng); - prepend_to_fattree_message_list(s->queued_msgs, - s->queued_msgs_tail, indx, head); - s->vc_occupancy[indx] -= s->params->chunk_size; + prepend_to_fattree_message_list(s->queued_msgs[indx], + s->queued_msgs_tail[indx], use_vc, head); + s->vc_occupancy[indx][use_vc] -= s->params->chunk_size; s->queued_length[indx] += s->params->chunk_size; } if(bf->c2) @@ -2312,7 +2465,8 @@ void switch_buf_update(switch_state * s, tw_bf * bf, fattree_message * msg, bf->c3 = 0; int indx = msg->vc_index; - s->vc_occupancy[indx] -= s->params->chunk_size; + int use_vc = msg->service_level; + s->vc_occupancy[indx][use_vc] -= s->params->chunk_size; if(s->last_buf_full[indx]) { @@ -2325,18 +2479,23 @@ void switch_buf_update(switch_state * s, tw_bf * bf, fattree_message * msg, s->last_buf_full[indx] = 0.0; } - if(s->queued_msgs[indx] != NULL) { + if(s->queued_msgs[indx][use_vc]!= NULL) { bf->c1 = 1; - fattree_message_list *head = return_head( s->queued_msgs, - s->queued_msgs_tail, indx); + fattree_message_list *head = return_head( s->queued_msgs[indx], + s->queued_msgs_tail[indx], use_vc); s->queued_length[indx] -= s->params->chunk_size; switch_credit_send( s, bf, &head->msg, lp, 1); - append_to_fattree_message_list( s->pending_msgs, s->pending_msgs_tail, - indx, head); - s->vc_occupancy[indx] += s->params->chunk_size; + append_to_fattree_message_list( s->pending_msgs[indx], s->pending_msgs_tail[indx], + use_vc, head); + s->vc_occupancy[indx][use_vc] += s->params->chunk_size; } - if(s->in_send_loop[indx] == 0 && s->pending_msgs[indx] != NULL) { +#ifdef PACKET_PRINT + printf("%d: %f switch receive credit packet %llu switch %d port %d SL %d src %d dest %d\n", getRank(), tw_now(lp), msg->packet_ID, s->switch_id, indx, use_vc, codes_mapping_get_lp_relative_id(msg->src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0)); + fflush(stdout); +#endif + + if(s->in_send_loop[indx] == 0 && switch_queue_has_packets_all_sls(s->pending_msgs[indx], s->params->num_vcs)) { bf->c2 = 1; fattree_message *m; tw_stime ts = codes_local_latency(lp); @@ -2423,6 +2582,11 @@ void ft_packet_arrive_rc(ft_terminal_state * s, tw_bf * bf, fattree_message * ms stat = model_net_find_stats(msg->category, s->fattree_stats_array); stat->recv_time = msg->saved_rcv_time; +#ifdef PACKET_PRINT + printf("%d: %f terminal recv reverse packet %llu switch %d SL %d src %d dest %d\n", getRank(), tw_now(lp), msg->packet_ID, s->switch_id, msg->service_level, codes_mapping_get_lp_relative_id(msg->src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0)); + fflush(stdout); +#endif + if(bf->c1) { N_finished_packets--; @@ -2497,14 +2661,21 @@ void ft_packet_arrive(ft_terminal_state * s, tw_bf * bf, fattree_message * msg, fattree_message * buf_msg; tw_stime ts; +#ifdef PACKET_PRINT + printf("%d: %f terminal recv packet %llu switch %d SL %d src %d dest %d\n", getRank(), tw_now(lp), msg->packet_ID, s->switch_id, msg->service_level, codes_mapping_get_lp_relative_id(msg->src_terminal_id, 0, 0), codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0)); + fflush(stdout); +#endif + ts = g_tw_lookahead + s->params->credit_delay + g_tw_lookahead * tw_rand_unif(lp->rng); // no method_event here - message going to switch buf_e = tw_event_new(s->switch_lp[msg->rail_id], ts, lp); buf_msg = tw_event_data(buf_e); + memcpy(buf_msg, msg, sizeof(fattree_message)); buf_msg->magic = switch_magic_num; buf_msg->vc_index = msg->vc_index; buf_msg->type = S_BUFFER; + buf_msg->service_level = msg->service_level; //printf("[%d] pack arrive credit Send to %d\n", lp->gid, s->switch_lp); tw_event_send(buf_e); @@ -2671,6 +2842,7 @@ int ft_get_output_port( switch_state * s, tw_bf * bf, fattree_message * msg, (void)lp; int outport = -1; int start_port, end_port; + int use_vc = 0; fattree_param *p = s->params; int dest_term_local_id = codes_mapping_get_lp_relative_id(msg->dest_terminal_id, 0, 0); @@ -2721,12 +2893,12 @@ int ft_get_output_port( switch_state * s, tw_bf * bf, fattree_message * msg, //outport = start_port; // when occupancy is same, just choose random port outport = tw_rand_integer(lp->rng, start_port, end_port-1); - int load = s->vc_occupancy[outport] + s->queued_length[outport]; + int load = s->vc_occupancy[outport][use_vc] + s->queued_length[outport]; if(load != 0) { //for(int port = start_port + 1; port < end_port; port++) { for(int port = start_port; port < end_port; port++) { - if(s->vc_occupancy[port] + s->queued_length[port] < load) { - load = s->vc_occupancy[port] + s->queued_length[port]; + if(s->vc_occupancy[port][use_vc] + s->queued_length[port] < load) { + load = s->vc_occupancy[port][use_vc] + s->queued_length[port]; outport = port; if(load <= 0) break; } @@ -2861,13 +3033,16 @@ void fattree_switch_final(switch_state * s, tw_lp * lp) if(dump_topo) return; (void)lp; - int i; + int i; + size_t vc; for(i = 0; i < s->radix; i++) { - if(s->queued_msgs[i] != NULL) { - printf("[%llu] leftover queued messages %d %d\n", LLU(lp->gid), i,s->vc_occupancy[i]); - } - if(s->pending_msgs[i] != NULL) { - printf("[%llu] lefover pending messages %d\n", LLU(lp->gid), i); + for (vc = 0; vc < s->params->num_vcs; vc++) { + if(s->queued_msgs[i][vc] != NULL) { + printf("[%llu] leftover queued messages %d %d %d\n", LLU(lp->gid), i,s->vc_occupancy[i][vc], vc); + } + if(s->pending_msgs[i][vc] != NULL) { + printf("[%llu] lefover pending messages %d %d\n", LLU(lp->gid), i, vc); + } } } diff --git a/src/networks/model-net/qos.c b/src/networks/model-net/qos.c new file mode 100644 index 00000000..a0af72e9 --- /dev/null +++ b/src/networks/model-net/qos.c @@ -0,0 +1,31 @@ +#include "qos.h" + +static void to_next_sl(int numSLs, size_t * qos_table_index, size_t * qos_table_counter) { + ++(*qos_table_index); + if (*qos_table_index >= numSLs) { + *qos_table_index = 0; + } + *qos_table_counter = 0; +} + +size_t get_next_sl(size_t numSLs, int * qos_table, size_t * qos_table_index, size_t * qos_table_counter, int (*has_packets)(void*,size_t), void * packet_data) { + for (size_t i = 0; i < numSLs; ++i) { + size_t sl = *qos_table_index; + if (has_packets(packet_data, sl)) { + // Advance the QoS table index and counter + ++(*qos_table_counter); + + if (*qos_table_counter >= qos_table[sl]) { + to_next_sl(numSLs, qos_table_index, qos_table_counter); + } + + // Perform the send + return sl; + } + else { + // The queue is empty, so advance to the next queue + to_next_sl(numSLs, qos_table_index, qos_table_counter); + } + } + return NO_PACKETS_TO_SEND; +} diff --git a/src/networks/model-net/qos.h b/src/networks/model-net/qos.h new file mode 100644 index 00000000..e865c84f --- /dev/null +++ b/src/networks/model-net/qos.h @@ -0,0 +1,10 @@ +#ifndef MODELNET_QOS_H +#define MODELNET_QOS_H + +#include + +#define NO_PACKETS_TO_SEND -1 + +size_t get_next_sl(size_t numSLs, int * qos_table, size_t * qos_table_index, size_t * qos_table_counter, int (*has_packets)(void*,size_t), void * packet_data); + +#endif