Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Update network-mpi.c #133

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 164 additions & 27 deletions core/network-mpi.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,68 @@ struct act_q
MPI_Request *req_list;
int *idx_list;
MPI_Status *status_list;
int *free_idx_list;//add, que of free indices


#if ROSS_MEMORY
char **buffers;
#endif

unsigned int cur;
unsigned int cur;
int front;//add, front of queue
int coda;//add, back of queue but back is already a variable somewhere
int sizeOfQ;//add, size of queue array
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent, we use snake_case, not camelCase.

int numInQ;//add, number of elements in queue
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe change this to something like num_in_free_q or something? I originally thought it was supposed to be the number of active operations you have in the queue based on the name, but eventually I realized it's supposed to be the number of free elements in the queue. Being a little clearer in the name will probably help in the future if someone else needs to make changes here.


// Deal with filling queue, then plateauing

};

int deal_with_cur(struct act_q *q)// try this
{
if(q->cur < (q->sizeOfQ-1))
{
q->cur++;
return 1;
}
else
{
return 1;
}
}


int fr_q_chq(struct act_q *q, int *frontOrCoda) //free index queue; check for modulating the front or back index of que
{
if(*frontOrCoda != q->sizeOfQ)//don't mess with queue
{
return 0;// return probably not necessary
}
else//mess with queue
{
*frontOrCoda = 0;
return 0;
}
}

void fr_q_aq(struct act_q *q, int ele) // free index queue; add to queue
{
q->free_idx_list[q->coda] = ele;
q->coda++;
q->numInQ++;
fr_q_chq(q,&q->coda);//wraps the queue array around

}

int fr_q_dq(struct act_q *q) // free index queue; dequeue
{
int rv =q->free_idx_list[q->front];
q->front++;
q->numInQ--;
fr_q_chq(q,&q->front);// wraps the queue array around

return rv;
}
#define EVENT_TAG 1

#if ROSS_MEMORY
Expand Down Expand Up @@ -101,7 +156,19 @@ init_q(struct act_q *q, const char *name)
q->event_list = (tw_event **) tw_calloc(TW_LOC, name, sizeof(*q->event_list), n);
q->req_list = (MPI_Request *) tw_calloc(TW_LOC, name, sizeof(*q->req_list), n);
q->idx_list = (int *) tw_calloc(TW_LOC, name, sizeof(*q->idx_list), n);
q->status_list = (MPI_Status *) tw_calloc(TW_LOC, name, sizeof(*q->status_list), n);
q->free_idx_list = (int *) tw_calloc(TW_LOC, name, sizeof(*q->idx_list), n);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you already caught that you have an issue with reading/writing out of bounds of this list.

q->status_list = (MPI_Status *) tw_calloc(TW_LOC, name, sizeof(*q->status_list), n+1);// queue, n+1 is meant to prevent a full queue
q->front = 0;// front of queue
q->coda = 0;// end of queue
q->sizeOfQ=n+1;// for wraparound
q->numInQ= 0;// number of elements in queue

int i = 0;
while(i<n) // initializes the queue
{
fr_q_aq( q , i) ;
i++;
}

#if ROSS_MEMORY
q->buffers = tw_calloc(TW_LOC, name, sizeof(*q->buffers), n);
Expand Down Expand Up @@ -207,7 +274,7 @@ tw_net_minimum(tw_pe *me)
e = e->next;
}

for (i = 0; i < posted_sends.cur; i++) {
for (i = 0; i < posted_sends.cur; i++) { //fix this line (?)
e = posted_sends.event_list[i];
if (m > e->recv_ts)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're also going to need to add a check to see if e is NULL.

m = e->recv_ts;
Expand All @@ -228,7 +295,10 @@ test_q(
char *tmp;
#endif

if (!q->cur)
// if ( !q->cur || q->numInQ == ((q->sizeOfQ)-1) ) //fixed this line (?) if queue is full, no elements are being processed
// return 0;

if( q->numInQ == ((q->sizeOfQ)-1) )
return 0;

if (MPI_Testsome(
Expand All @@ -254,6 +324,7 @@ test_q(
n = q->idx_list[i];
e = q->event_list[n];
q->event_list[n] = NULL;
fr_q_aq(q,n);//add n onto queue

#if ROSS_MEMORY
finish(me, e, q->buffers[n]);
Expand All @@ -263,7 +334,8 @@ test_q(
}

/* Collapse the lists to remove any holes we left. */
for (i = 0, n = 0; i < q->cur; i++)
/*
for (i = 0, n = 0; i < q->cur; i++)//fix these lines
{
if (q->event_list[i])
{
Expand All @@ -288,8 +360,8 @@ test_q(
n++;
} // endif (q->event_list[i])
}
q->cur -= ready;

q->cur -= ready;//fix this line
*/
return 1;
}

Expand All @@ -303,14 +375,15 @@ recv_begin(tw_pe *me)
int flag = 0;
int changed = 0;

while (posted_recvs.cur < read_buffer)
while (0 < posted_recvs.numInQ)//fix these lines
{
unsigned id = posted_recvs.cur;

int id = fr_q_dq(&posted_recvs);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be moved until after trying to pull the event using tw_event_grab. As it is currently, if there is no event to grab, it will mess up your buffer's free list.


if(!(e = tw_event_grab(me)))
{
if(tw_gvt_inprogress(me))
tw_error(TW_LOC, "Out of events in GVT! Consider increasing --extramem");
tw_error(TW_LOC, "out of events in GVT!");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here as with the comment on tw_net_statistics below. This was a recent change, so you'll have to update your branch so that way we don't eventually undo recent changes to ROSS master.

return changed;
}

Expand All @@ -337,7 +410,8 @@ recv_begin(tw_pe *me)
}

posted_recvs.event_list[id] = e;
posted_recvs.cur++;
deal_with_cur(&posted_recvs);
// fixed in fr_q_dq //posted_recvs.cur++; //fix this line
changed = 1;
}

Expand All @@ -348,7 +422,6 @@ static void
recv_finish(tw_pe *me, tw_event *e, char * buffer)
{
tw_pe *dest_pe;
tw_clock start;

#if ROSS_MEMORY
tw_memory *memory;
Expand Down Expand Up @@ -460,9 +533,7 @@ recv_finish(tw_pe *me, tw_event *e, char * buffer)
/* Fast case, we are sending to our own PE and
* there is no rollback caused by this send.
*/
start = tw_clock_read();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line and lines 351 and 465 needs to be added back in as well (same reason as tw_net_statistics).

tw_pq_enqueue(dest_pe->pq, e);
dest_pe->stats.s_pq += tw_clock_read() - start;
return;
}

Expand Down Expand Up @@ -493,12 +564,13 @@ send_begin(tw_pe *me)
{
int changed = 0;

while (posted_sends.cur < send_buffer)
while (0 < posted_sends.numInQ)//fixed these line (hopefully)
{
tw_event *e = tw_eventq_peek(&outq);
tw_event *e = tw_eventq_peek(&outq);//next event?
tw_node *dest_node = NULL;

unsigned id = posted_sends.cur;
int id = fr_q_dq(&posted_sends);// fixed, grabs from front of queue, moves front up one element
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will need to be moved to just after if (!e) break; because if the event is null, it's going to mess up your free element list for posted_sends.

// posted_sends.cur; //fix this line

#if ROSS_MEMORY
tw_event *tmp_prev = NULL;
Expand Down Expand Up @@ -609,7 +681,9 @@ send_begin(tw_pe *me)
: TW_net_asend;

posted_sends.event_list[id] = e;
posted_sends.cur++;
deal_with_cur(&posted_sends);

// fixed in fr_q_dq //posted_sends.cur++;//fix this line
me->s_nwhite_sent++;

changed = 1;
Expand Down Expand Up @@ -786,13 +860,31 @@ tw_net_statistics(tw_pe * me, tw_statistics * s)

if(MPI_Reduce(&(s->s_net_events),
&me->stats.s_net_events,
17,
16,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to your current changes, but at some point, you'll need to update your branch to pull back in the original code. I think you were working from an old version of this file, and tw_net_statistics has recently changed, so you'll need to make sure that it's up to date with ROSS master. I don't think you'll need to change this at all for what you're working on, so it should just look exactly like what ROSS master currently looks like.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be clear, this comment means the full tw_net_statistics function needs to be reverted back, not just this one line.

MPI_UNSIGNED_LONG_LONG,
MPI_SUM,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if(MPI_Reduce(&s->s_total,
&me->stats.s_total,
8,
MPI_UNSIGNED_LONG_LONG,
MPI_MAX,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if(MPI_Reduce(&s->s_pe_event_ties,
&me->stats.s_pe_event_ties,
1,
MPI_UNSIGNED_LONG_LONG,
MPI_SUM,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if(MPI_Reduce(&s->s_min_detected_offset,
&me->stats.s_min_detected_offset,
1,
Expand All @@ -802,24 +894,69 @@ tw_net_statistics(tw_pe * me, tw_statistics * s)
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if(MPI_Reduce(&(s->s_total),
&me->stats.s_total,
16,
MPI_UNSIGNED_LONG_LONG,
MPI_MAX,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
if(MPI_Reduce(&s->s_avl,
&me->stats.s_avl,
1,
MPI_UNSIGNED_LONG_LONG,
MPI_MAX,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if (MPI_Reduce(&s->s_buddy,
&me->stats.s_buddy,
1,
MPI_UNSIGNED_LONG_LONG,
MPI_MAX,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if (MPI_Reduce(&s->s_lz4,
&me->stats.s_lz4,
1,
MPI_UNSIGNED_LONG_LONG,
MPI_MAX,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if (MPI_Reduce(&s->s_events_past_end,
&me->stats.s_events_past_end,
3,
1,
MPI_UNSIGNED_LONG_LONG,
MPI_SUM,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if (MPI_Reduce(&g_st_stat_comp_ctr,
&me->stats.s_stat_comp,
1,
MPI_UNSIGNED_LONG_LONG,
MPI_MAX,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if (MPI_Reduce(&g_st_stat_write_ctr,
&me->stats.s_stat_write,
1,
MPI_UNSIGNED_LONG_LONG,
MPI_MAX,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

if(MPI_Reduce(&(s->s_alp_nevent_processed),
&me->stats.s_alp_nevent_processed,
2,
MPI_UNSIGNED_LONG_LONG,
MPI_SUM,
(int)g_tw_masternode,
MPI_COMM_ROSS) != MPI_SUCCESS)
tw_error(TW_LOC, "Unable to reduce statistics!");

#ifdef USE_RIO
if (MPI_Reduce(&s->s_rio_load,
&me->stats.s_rio_load,
Expand Down