Skip to content

Commit

Permalink
Do not go through API if already on LwIP thread
Browse files Browse the repository at this point in the history
  • Loading branch information
me-no-dev committed Oct 13, 2017
1 parent 9d6150e commit a1a184a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 11 deletions.
70 changes: 59 additions & 11 deletions src/AsyncTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ AsyncClient::AsyncClient(tcp_pcb* pcb)
, _connect_port(0)
, prev(NULL)
, next(NULL)
, _in_lwip_thread(false)
{
_pcb = pcb;
if(_pcb){
Expand Down Expand Up @@ -403,7 +404,11 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){

tcp_arg(pcb, this);
tcp_err(pcb, &_tcp_error);
_tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected);
if(_in_lwip_thread){
tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected);
} else {
_tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected);
}
return true;
}

Expand Down Expand Up @@ -432,8 +437,10 @@ int8_t AsyncClient::_connected(void* pcb, int8_t err){
tcp_sent(_pcb, &_tcp_sent);
tcp_poll(_pcb, &_tcp_poll, 1);
}
_in_lwip_thread = true;
if(_connect_cb)
_connect_cb(_connect_cb_arg, this);
_in_lwip_thread = false;
return ERR_OK;
}

Expand All @@ -446,7 +453,11 @@ int8_t AsyncClient::_close(){
tcp_recv(_pcb, NULL);
tcp_err(_pcb, NULL);
tcp_poll(_pcb, NULL, 0);
err = _tcp_close(_pcb);
if(_in_lwip_thread){
err = tcp_close(_pcb);
} else {
err = _tcp_close(_pcb);
}
if(err != ERR_OK) {
err = abort();
}
Expand Down Expand Up @@ -536,14 +547,17 @@ int8_t AsyncClient::_poll(tcp_pcb* pcb){
}

void AsyncClient::_dns_found(ip_addr_t *ipaddr){
_in_lwip_thread = true;
if(ipaddr){
connect(IPAddress(ipaddr->u_addr.ip4.addr), _connect_port);
} else {
log_e("dns fail");
if(_error_cb)
_error_cb(_error_cb_arg, this, -55);
if(_discard_cb)
_discard_cb(_discard_cb_arg, this);
}
_in_lwip_thread = false;
}

bool AsyncClient::operator==(const AsyncClient &other) {
Expand All @@ -566,14 +580,22 @@ bool AsyncClient::connect(const char* host, uint16_t port){
int8_t AsyncClient::abort(){
if(_pcb) {
log_w("state %d", _pcb->state);
_tcp_abort(_pcb);
if(_in_lwip_thread){
tcp_abort(_pcb);
} else {
_tcp_abort(_pcb);
}
_pcb = NULL;
}
return ERR_ABRT;
}

void AsyncClient::close(bool now){
_tcp_recved(_pcb, _rx_ack_len);
if(_in_lwip_thread){
tcp_recved(_pcb, _rx_ack_len);
} else {
_tcp_recved(_pcb, _rx_ack_len);
}
if(now)
_close();
else
Expand Down Expand Up @@ -620,14 +642,25 @@ size_t AsyncClient::add(const char* data, size_t size, uint8_t apiflags) {
if(!room)
return 0;
size_t will_send = (room < size) ? room : size;
int8_t err = _tcp_write(_pcb, data, will_send, apiflags);
int8_t err = ERR_OK;
if(_in_lwip_thread){
err = tcp_write(_pcb, data, will_send, apiflags);
} else {
err = _tcp_write(_pcb, data, will_send, apiflags);
}
if(err != ERR_OK)
return 0;
return will_send;
}

bool AsyncClient::send(){
if(_tcp_output(_pcb) == ERR_OK){
int8_t err = ERR_OK;
if(_in_lwip_thread){
err = tcp_output(_pcb);
} else {
err = _tcp_output(_pcb);
}
if(err == ERR_OK){
_pcb_busy = true;
_pcb_sent_at = millis();
return true;
Expand All @@ -638,8 +671,13 @@ bool AsyncClient::send(){
size_t AsyncClient::ack(size_t len){
if(len > _rx_ack_len)
len = _rx_ack_len;
if(len)
_tcp_recved(_pcb, len);
if(len){
if(_in_lwip_thread){
tcp_recved(_pcb, len);
} else {
_tcp_recved(_pcb, len);
}
}
_rx_ack_len -= len;
return len;
}
Expand Down Expand Up @@ -896,6 +934,7 @@ AsyncServer::AsyncServer(IPAddress addr, uint16_t port)
: _port(port)
, _addr(addr)
, _noDelay(false)
, _in_lwip_thread(false)
, _pcb(0)
, _connect_cb(0)
, _connect_cb_arg(0)
Expand All @@ -905,6 +944,7 @@ AsyncServer::AsyncServer(uint16_t port)
: _port(port)
, _addr((uint32_t) IPADDR_ANY)
, _noDelay(false)
, _in_lwip_thread(false)
, _pcb(0)
, _connect_cb(0)
, _connect_cb_arg(0)
Expand Down Expand Up @@ -935,12 +975,16 @@ int8_t AsyncServer::_accept(tcp_pcb* pcb, int8_t err){

AsyncClient *c = new AsyncClient(pcb);
if(c){
_in_lwip_thread = true;
c->_in_lwip_thread = true;
_connect_cb(_connect_cb_arg, c);
c->_in_lwip_thread = false;
_in_lwip_thread = false;
return ERR_OK;
}
}
if(_tcp_close(pcb) != ERR_OK){
_tcp_abort(pcb);
if(tcp_close(pcb) != ERR_OK){
tcp_abort(pcb);
}
log_e("FAIL");
return ERR_OK;
Expand Down Expand Up @@ -985,9 +1029,13 @@ void AsyncServer::begin(){

void AsyncServer::end(){
if(_pcb){
_tcp_abort(_pcb);
tcp_arg(_pcb, NULL);
tcp_accept(_pcb, NULL);
if(_in_lwip_thread){
tcp_abort(_pcb);
} else {
_tcp_abort(_pcb);
}
_pcb = NULL;
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/AsyncTCP.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,16 @@ class AsyncClient {
static int8_t _s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len);
static int8_t _s_connected(void* arg, void* tpcb, int8_t err);
static void _s_dns_found(const char *name, struct _ip_addr *ipaddr, void *arg);

bool _in_lwip_thread;
};

class AsyncServer {
protected:
uint16_t _port;
IPAddress _addr;
bool _noDelay;
bool _in_lwip_thread;
tcp_pcb* _pcb;
AcConnectHandler _connect_cb;
void* _connect_cb_arg;
Expand Down

0 comments on commit a1a184a

Please sign in to comment.