Skip to content

Commit

Permalink
Emulator actual embodiment: SAGDP process incoming packets in wait-lo…
Browse files Browse the repository at this point in the history
…cal state; fixes for testing environment
  • Loading branch information
divanchykhin committed Apr 7, 2015
1 parent 23ff152 commit c81b19d
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 36 deletions.
45 changes: 31 additions & 14 deletions tests/emulator/sa_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,21 @@ int main_loop()
for (;;)
{
getmsg:
if ( wait_to_continue_processing && getTime() >= wake_time_continue_processing )
{
printf( "Processing continued...\n" );
INCREMENT_COUNTER( 98, "MAIN LOOP, continuing processing" );
wait_to_continue_processing = 0;
#ifdef USED_AS_MASTER
ret_code = master_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#else
ret_code = slave_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#endif
zepto_response_to_request( MEMORY_HANDLE_MAIN_LOOP );
goto entry;
break;
}

// 1. Get message from comm peer
printf("Waiting for a packet from server...\n");
// ret_code = getMessage( sizeInOut, rwBuff, BUF_SIZE);
Expand Down Expand Up @@ -131,6 +146,20 @@ int main_loop()
while ( ret_code == COMMLAYER_RET_PENDING )
{
waitForTimeQuantum();
if ( wait_to_continue_processing && getTime() >= wake_time_continue_processing )
{
printf( "Processing continued...\n" );
INCREMENT_COUNTER( 98, "MAIN LOOP, continuing processing" );
wait_to_continue_processing = 0;
#ifdef USED_AS_MASTER
ret_code = master_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#else
ret_code = slave_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#endif
zepto_response_to_request( MEMORY_HANDLE_MAIN_LOOP );
goto entry;
break;
}
if ( timer_val != 0 && getTime() >= wake_time )
{
printf( "no reply received; the last message (if any) will be resent by timer\n" );
Expand Down Expand Up @@ -177,19 +206,6 @@ int main_loop()
goto entry;
break;
}
else if ( wait_to_continue_processing && getTime() >= wake_time_continue_processing )
{
INCREMENT_COUNTER( 98, "MAIN LOOP, continuing processing" );
wait_to_continue_processing = 0;
#ifdef USED_AS_MASTER
ret_code = master_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#else
ret_code = slave_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#endif
zepto_response_to_request( MEMORY_HANDLE_MAIN_LOOP );
goto entry;
break;
}
ret_code = tryGetMessage( MEMORY_HANDLE_MAIN_LOOP );
zepto_response_to_request( MEMORY_HANDLE_MAIN_LOOP );
INCREMENT_COUNTER_IF( 97, "MAIN LOOP, packet received [2]", ret_code == COMMLAYER_RET_OK );
Expand Down Expand Up @@ -360,7 +376,7 @@ int main_loop()
entry:
wait_for_incoming_chain_with_timer = false;
if ( ret_code == YOCTOVM_WAIT_TO_CONTINUE )
printf( "YOCTO: ret: %d; rq_size: %d, rsp_size: %dwaiting to continue ...\n" );
printf( "YOCTO: ret: %d; waiting to continue ...\n", ret_code );
else
printf( "YOCTO: ret: %d; rq_size: %d, rsp_size: %d\n", ret_code, ugly_hook_get_request_size( MEMORY_HANDLE_MAIN_LOOP ), ugly_hook_get_response_size( MEMORY_HANDLE_MAIN_LOOP ) );

Expand Down Expand Up @@ -421,6 +437,7 @@ int main_loop()
{
if ( wait_to_continue_processing == 0 ) wait_to_continue_processing = 1;
wake_time_continue_processing = getTime() + wait_to_continue_processing;
printf( "Processing in progress... (period = %d, time = %d)\n", wait_to_continue_processing, wake_time_continue_processing );
goto getmsg;
break;
}
Expand Down
45 changes: 31 additions & 14 deletions tests/emulator/sa_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,42 @@ int main_loop()
for (;;)
{
getmsg:
if ( wait_to_continue_processing && getTime() >= wake_time_continue_processing )
{
printf( "Processing continued...\n" );
INCREMENT_COUNTER( 98, "MAIN LOOP, continuing processing" );
wait_to_continue_processing = 0;
#ifdef USED_AS_MASTER
ret_code = master_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#else
ret_code = slave_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#endif
zepto_response_to_request( MEMORY_HANDLE_MAIN_LOOP );
goto entry;
break;
}

// 1. Get message from comm peer
ret_code = tryGetMessage( MEMORY_HANDLE_MAIN_LOOP );
zepto_response_to_request( MEMORY_HANDLE_MAIN_LOOP );
INCREMENT_COUNTER_IF( 91, "MAIN LOOP, packet received [1]", ret_code == COMMLAYER_RET_OK );
while ( ret_code == COMMLAYER_RET_PENDING )
{
waitForTimeQuantum();
if ( wait_to_continue_processing && getTime() >= wake_time_continue_processing )
{
printf( "Processing continued...\n" );
INCREMENT_COUNTER( 98, "MAIN LOOP, continuing processing" );
wait_to_continue_processing = 0;
#ifdef USED_AS_MASTER
ret_code = master_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#else
ret_code = slave_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#endif
zepto_response_to_request( MEMORY_HANDLE_MAIN_LOOP );
goto entry;
break;
}
if ( timer_val && getTime() >= wake_time )
{
printf( "no reply received; the last message (if any) will be resent by timer\n" );
Expand Down Expand Up @@ -123,19 +152,6 @@ int main_loop()
goto alt_entry;
break;
}
else if ( wait_to_continue_processing && getTime() >= wake_time_continue_processing )
{
INCREMENT_COUNTER( 98, "MAIN LOOP, continuing processing" );
wait_to_continue_processing = 0;
#ifdef USED_AS_MASTER
ret_code = master_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#else
ret_code = slave_process_continue( MEMORY_HANDLE_MAIN_LOOP );
#endif
zepto_response_to_request( MEMORY_HANDLE_MAIN_LOOP );
goto entry;
break;
}
ret_code = tryGetMessage( MEMORY_HANDLE_MAIN_LOOP );
zepto_response_to_request( MEMORY_HANDLE_MAIN_LOOP );
INCREMENT_COUNTER_IF( 92, "MAIN LOOP, packet received [2]", ret_code == COMMLAYER_RET_OK );
Expand Down Expand Up @@ -290,7 +306,7 @@ int main_loop()
entry:
wait_for_incoming_chain_with_timer = false;
if ( ret_code == YOCTOVM_WAIT_TO_CONTINUE )
printf( "YOCTO: ret: %d; rq_size: %d, rsp_size: %dwaiting to continue ...\n" );
printf( "YOCTO: ret: %d; waiting to continue ...\n", ret_code );
else
printf( "YOCTO: ret: %d; rq_size: %d, rsp_size: %d\n", ret_code, ugly_hook_get_request_size( MEMORY_HANDLE_MAIN_LOOP ), ugly_hook_get_response_size( MEMORY_HANDLE_MAIN_LOOP ) );

Expand Down Expand Up @@ -353,6 +369,7 @@ int main_loop()
{
if ( wait_to_continue_processing == 0 ) wait_to_continue_processing = 1;
wake_time_continue_processing = getTime() + wait_to_continue_processing;
printf( "Processing in progress... (period = %d, time = %d)\n", wait_to_continue_processing, wake_time_continue_processing );
goto getmsg;
break;
}
Expand Down
202 changes: 202 additions & 0 deletions tests/emulator/sagdp_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ uint8_t handlerSAGDP_receiveUP( uint8_t* timeout, uint8_t* nonce, uint8_t* pid,
*( data + DATA_SAGDP_STATE_OFFSET ) = SAGDP_STATE_IDLE;
return SAGDP_RET_TO_HIGHER;
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_IS_ACK )
{
assert(0);
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) != SAGDP_P_STATUS_FIRST )
{
// uint8_t* enc_reply_to = stack;
Expand Down Expand Up @@ -230,6 +234,10 @@ uint8_t handlerSAGDP_receiveUP( uint8_t* timeout, uint8_t* nonce, uint8_t* pid,
PRINTF( "SAGDP: CORRRUPTED: state = %d, packet_status = %d\n", state, packet_status );
return SAGDP_RET_SYS_CORRUPTED;
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_IS_ACK )
{
assert(0);
}
if ( ( packet_status & SAGDP_P_STATUS_MASK ) != SAGDP_P_STATUS_FIRST )
{
// *( data + DATA_SAGDP_STATE_OFFSET ) = SAGDP_STATE_NOT_INITIALIZED;
Expand Down Expand Up @@ -406,6 +414,10 @@ uint8_t handlerSAGDP_receiveUP( uint8_t* timeout, uint8_t* nonce, uint8_t* pid,
*( data + DATA_SAGDP_STATE_OFFSET ) = SAGDP_STATE_IDLE;
return SAGDP_RET_TO_HIGHER;
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_IS_ACK )
{
assert(0);
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_FIRST )
{
INCREMENT_COUNTER( 41, "handlerSAGDP_receiveUP(), wait-remote, first" );
Expand Down Expand Up @@ -553,6 +565,10 @@ uint8_t handlerSAGDP_receiveUP( uint8_t* timeout, uint8_t* nonce, uint8_t* pid,
PRINTF( "SAGDP: CORRRUPTED: state = %d, packet_status = %d\n", state, packet_status );
return SAGDP_RET_SYS_CORRUPTED;
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_IS_ACK )
{
assert(0);
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_FIRST )
{
INCREMENT_COUNTER( 41, "handlerSAGDP_receiveUP(), wait-remote, first" );
Expand Down Expand Up @@ -688,6 +704,192 @@ uint8_t handlerSAGDP_receiveUP( uint8_t* timeout, uint8_t* nonce, uint8_t* pid,
#endif
}

else if ( state == SAGDP_STATE_WAIT_LOCAL )
{
#ifdef USED_AS_MASTER
if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_ERROR_MSG )
{
INCREMENT_COUNTER( 40, "handlerSAGDP_receiveUP(), wait-remote, error" );
cancelLTO( data + DATA_SAGDP_LTO_OFFSET );
*timeout = *(data + DATA_SAGDP_LTO_OFFSET);
if ( zepto_parse_skip_block( &po, SAGDP_LRECEIVED_PID_SIZE ) )
{
parser_obj po1;
zepto_parser_init( &po1, &po );
uint16_t body_size = zepto_parsing_remaining_bytes( &po );
zepto_parse_skip_block( &po1, body_size );
zepto_convert_part_of_request_to_response( mem_h, &po, &po1 );
zepto_write_prepend_byte( mem_h, packet_status & SAGDP_P_STATUS_MASK );
}
else
{
zepto_write_uint8( mem_h, packet_status & SAGDP_P_STATUS_MASK );
}

*( data + DATA_SAGDP_STATE_OFFSET ) = SAGDP_STATE_IDLE;
return SAGDP_RET_TO_HIGHER;
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_IS_ACK )
{
assert(0);
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_FIRST ) // TODO: same plus request ACK
{
INCREMENT_COUNTER( 41, "handlerSAGDP_receiveUP(), wait-remote, first" );
// note: this "first" packet can be start of a new chain, or a re-sent of the beginning of the previous chain (if that previous chain had a length of 2)
// zepto_parse_read_block( &po, stack, SAGDP_LSENT_PID_SIZE );
uint8_t* this_chain_id = stack;
zepto_parser_decode_uint( &po, this_chain_id, SAGDP_LSENT_PID_SIZE );
const uint8_t* prev_chain_id = data + DATA_SAGDP_LRECEIVED_CHAIN_ID_OFFSET;
bool is_resent = pid_compare( this_chain_id, prev_chain_id ) == 0;
if ( is_resent )
{
bool ack_rq = false; // TODO: implement this branch
if ( ack_rq )
{
// if ACK is requested, send it
}
else
{
// already in work; ignore
return SAGDP_RET_OK;
}
}
else
{
INCREMENT_COUNTER( 43, "handlerSAGDP_receiveUP(), wait-remote, first, new (ignored)" );
PRINTF( "SAGDP: CORRRUPTED: state = %d, packet_status = %d\n", state, packet_status );
return SAGDP_RET_OK; // just ignore
// TODO: form a packet
// return SAGDP_RET_TO_HIGHER_ERROR;
}
}
else // TODO: make sure no important option is left
{
assert( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_INTERMEDIATE || ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_TERMINATING );
uint8_t* pidlsent_first = data + DATA_SAGDP_FIRST_LSENT_PID_OFFSET;
uint8_t* pidlsent_last = data + DATA_SAGDP_NEXT_LSENT_PID_OFFSET;
// zepto_parse_read_block( &po, stack, SAGDP_LSENT_PID_SIZE );
// uint8_t* enc_reply_to = stack;
uint8_t* enc_reply_to = stack;
zepto_parser_decode_uint( &po, enc_reply_to, SAGDP_LSENT_PID_SIZE );
PRINTF( "handlerSAGDP_receiveNewUP(): PID last sent first : %x%x%x%x%x%x\n", pidlsent_first[0], pidlsent_first[1], pidlsent_first[2], pidlsent_first[3], pidlsent_first[4], pidlsent_first[5] );
PRINTF( "handlerSAGDP_receiveNewUP(): PID reply-to in packet: %x%x%x%x%x%x\n", enc_reply_to[0], enc_reply_to[1], enc_reply_to[2], enc_reply_to[3], enc_reply_to[4], enc_reply_to[5] );
PRINTF( "handlerSAGDP_receiveNewUP(): PID last sent last : %x%x%x%x%x%x\n", pidlsent_last[0], pidlsent_last[1], pidlsent_last[2], pidlsent_last[3], pidlsent_last[4], pidlsent_last[5] );
// bool isreply = memcmp( enc_reply_to + 1, data + DATA_SAGDP_LSENT_PID_OFFSET, SAGDP_LSENT_PID_SIZE ) == 0;
bool isrepeated = is_pid_in_range( enc_reply_to, data + DATA_SAGDP_FIRST_LSENT_PID_OFFSET, data + DATA_SAGDP_NEXT_LSENT_PID_OFFSET );
if ( isrepeated ) // so we've got what we are currently processing
{
bool ack_rq = false; // TODO: implement this branch
if ( ack_rq )
{
// if ACK is requested, send it
}
else
{
// already in work; ignore
return SAGDP_RET_OK;
}
}
else // ignore
{
INCREMENT_COUNTER( 46, "handlerSAGDP_receiveUP(), wait-remote, !is-reply, ignored" );
PRINTF( "SAGDP OK: CORRRUPTED: state = %d, packet_status = %d, !isreply\n", state, packet_status );
return SAGDP_RET_OK; // silently ignore
// TODO: form a packet
// return SAGDP_RET_TO_HIGHER_ERROR;
}
}
#else // USED_AS_MASTER not defined
if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_ERROR_MSG )
{
// send an error message to a communication partner and reinitialize
if ( nonce == NULL )
return SAGDP_RET_NEED_NONCE;
INCREMENT_COUNTER( 40, "handlerSAGDP_receiveUP(), wait-remote, error" );
zepto_write_uint8( mem_h, SAGDP_P_STATUS_ERROR_MSG );
// TODO: add other relevant data, if any, and update sizeInOut
*( data + DATA_SAGDP_STATE_OFFSET ) = SAGDP_STATE_NOT_INITIALIZED;
PRINTF( "SAGDP: CORRRUPTED: state = %d, packet_status = %d\n", state, packet_status );
return SAGDP_RET_SYS_CORRUPTED;
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_IS_ACK )
{
assert(0);
}
else if ( ( packet_status & SAGDP_P_STATUS_MASK ) == SAGDP_P_STATUS_FIRST ) // TODO: same plus request ACK
{
INCREMENT_COUNTER( 41, "handlerSAGDP_receiveUP(), wait-remote, first" );
// main question: is it a re-sent or a start of an actually new chain
uint8_t* enc_reply_to = stack;
zepto_parser_decode_uint( &po, enc_reply_to, SAGDP_LSENT_PID_SIZE );
bool current = pid_compare( enc_reply_to, data + DATA_SAGDP_LRECEIVED_CHAIN_ID_OFFSET ) == 0;
if ( current )
{
bool ack_rq = false; // TODO: implement this branch
if ( ack_rq )
{
// if ACK is requested, send it
}
else
{
// already in work; ignore
return SAGDP_RET_OK;
}
}
else
{
INCREMENT_COUNTER( 43, "handlerSAGDP_receiveUP(), wait-remote, first, new (applied)" );
*( data + DATA_SAGDP_STATE_OFFSET ) = SAGDP_STATE_NOT_INITIALIZED;
PRINTF( "SAGDP: CORRRUPTED: state = %d, packet_status = %d\n", state, packet_status );
return SAGDP_RET_START_OVER_FIRST_RECEIVED;
}
}
else // intermediate or terminating
{
assert( ( packet_status & SAGDP_P_STATUS_MASK ) != SAGDP_P_STATUS_FIRST );

// zepto_parse_read_block( &po, stack, SAGDP_LSENT_PID_SIZE );
// uint8_t* enc_reply_to = stack;
uint8_t* enc_reply_to = stack;
zepto_parser_decode_uint( &po, enc_reply_to, SAGDP_LSENT_PID_SIZE );
uint8_t* pidprevlsent_first = data + DATA_SAGDP_PREV_FIRST_LSENT_PID_OFFSET;
uint8_t* pidlsent_first = data + DATA_SAGDP_FIRST_LSENT_PID_OFFSET;
uint8_t* pidlsent_last = data + DATA_SAGDP_NEXT_LSENT_PID_OFFSET;
PRINTF( "handlerSAGDP_receiveNewUP(): PID last sent first : %x%x%x%x%x%x\n", pidprevlsent_first[0], pidprevlsent_first[1], pidprevlsent_first[2], pidprevlsent_first[3], pidprevlsent_first[4], pidprevlsent_first[5] );
PRINTF( "handlerSAGDP_receiveNewUP(): PID last sent first : %x%x%x%x%x%x\n", pidlsent_first[0], pidlsent_first[1], pidlsent_first[2], pidlsent_first[3], pidlsent_first[4], pidlsent_first[5] );
PRINTF( "handlerSAGDP_receiveNewUP(): PID reply-to in packet: %x%x%x%x%x%x\n", enc_reply_to[0], enc_reply_to[1], enc_reply_to[2], enc_reply_to[3], enc_reply_to[4], enc_reply_to[5] );
PRINTF( "handlerSAGDP_receiveNewUP(): PID last sent last : %x%x%x%x%x%x\n", pidlsent_last[0], pidlsent_last[1], pidlsent_last[2], pidlsent_last[3], pidlsent_last[4], pidlsent_last[5] );
bool isrepeated = is_pid_in_range( enc_reply_to, data + DATA_SAGDP_FIRST_LSENT_PID_OFFSET, data + DATA_SAGDP_NEXT_LSENT_PID_OFFSET );
if ( isrepeated ) // so we've got what we are currently processing
{
bool ack_rq = false; // TODO: implement this branch
if ( ack_rq )
{
// if ACK is requested, send it
}
else
{
// already in work; ignore
return SAGDP_RET_OK;
}
}
else // if not old, then unexpected
{
// send an error message to a communication partner and reinitialize
if ( nonce == NULL )
return SAGDP_RET_NEED_NONCE;
INCREMENT_COUNTER( 40, "handlerSAGDP_receiveUP(), wait-remote, error" );
zepto_write_uint8( mem_h, SAGDP_P_STATUS_ERROR_MSG );
// TODO: add other relevant data, if any, and update sizeInOut
*( data + DATA_SAGDP_STATE_OFFSET ) = SAGDP_STATE_NOT_INITIALIZED;
PRINTF( "SAGDP: CORRRUPTED: state = %d, packet_status = %d\n", state, packet_status );
return SAGDP_RET_SYS_CORRUPTED;
}
}
#endif
}

else // invalid states
{
INCREMENT_COUNTER( 50, "handlerSAGDP_receiveUP(), invalid state" );
Expand Down
Loading

0 comments on commit c81b19d

Please sign in to comment.