diff --git a/libraries/ESP8266WebServer/src/ESP8266WebServer.cpp b/libraries/ESP8266WebServer/src/ESP8266WebServer.cpp index 353381bef2..ed06b73476 100644 --- a/libraries/ESP8266WebServer/src/ESP8266WebServer.cpp +++ b/libraries/ESP8266WebServer/src/ESP8266WebServer.cpp @@ -193,7 +193,7 @@ void ESP8266WebServer::handleClient() { _currentStatus = HC_NONE; return; } - + _currentClient.setTimeout(HTTP_MAX_SEND_WAIT); _contentLength = CONTENT_LENGTH_NOT_SET; _handleRequest(); @@ -241,6 +241,9 @@ void ESP8266WebServer::sendHeader(const String& name, const String& value, bool } } +void ESP8266WebServer::setContentLength(size_t contentLength) { + _contentLength = contentLength; +} void ESP8266WebServer::_prepareHeader(String& response, int code, const char* content_type, size_t contentLength) { response = "HTTP/1.1 "; @@ -270,7 +273,6 @@ void ESP8266WebServer::send(int code, const char* content_type, const String& co String header; _prepareHeader(header, code, content_type, content.length()); sendContent(header); - sendContent(content); } @@ -307,67 +309,15 @@ void ESP8266WebServer::send(int code, const String& content_type, const String& } void ESP8266WebServer::sendContent(const String& content) { - const size_t unit_size = HTTP_DOWNLOAD_UNIT_SIZE; - size_t size_to_send = content.length(); - const char* send_start = content.c_str(); - - while (size_to_send) { - size_t will_send = (size_to_send < unit_size) ? size_to_send : unit_size; - size_t sent = _currentClient.write(send_start, will_send); - if (sent == 0) { - break; - } - size_to_send -= sent; - send_start += sent; - } + _currentClient.write(content.c_str(), content.length()); } void ESP8266WebServer::sendContent_P(PGM_P content) { - char contentUnit[HTTP_DOWNLOAD_UNIT_SIZE + 1]; - - contentUnit[HTTP_DOWNLOAD_UNIT_SIZE] = '\0'; - - while (content != NULL) { - size_t contentUnitLen; - PGM_P contentNext; - - // due to the memccpy signature, lots of casts are needed - contentNext = (PGM_P)memccpy_P((void*)contentUnit, (PGM_VOID_P)content, 0, HTTP_DOWNLOAD_UNIT_SIZE); - - if (contentNext == NULL) { - // no terminator, more data available - content += HTTP_DOWNLOAD_UNIT_SIZE; - contentUnitLen = HTTP_DOWNLOAD_UNIT_SIZE; - } - else { - // reached terminator. Do not send the terminator - contentUnitLen = contentNext - contentUnit - 1; - content = NULL; - } - - // write is so overloaded, had to use the cast to get it pick the right one - _currentClient.write((const char*)contentUnit, contentUnitLen); - } + _currentClient.write_P(content, strlen_P(content)); } void ESP8266WebServer::sendContent_P(PGM_P content, size_t size) { - char contentUnit[HTTP_DOWNLOAD_UNIT_SIZE + 1]; - contentUnit[HTTP_DOWNLOAD_UNIT_SIZE] = '\0'; - size_t remaining_size = size; - - while (content != NULL && remaining_size > 0) { - size_t contentUnitLen = HTTP_DOWNLOAD_UNIT_SIZE; - - if (remaining_size < HTTP_DOWNLOAD_UNIT_SIZE) contentUnitLen = remaining_size; - // due to the memcpy signature, lots of casts are needed - memcpy_P((void*)contentUnit, (PGM_VOID_P)content, contentUnitLen); - - content += contentUnitLen; - remaining_size -= contentUnitLen; - - // write is so overloaded, had to use the cast to get it pick the right one - _currentClient.write((const char*)contentUnit, contentUnitLen); - } + _currentClient.write_P(content, size); } diff --git a/libraries/ESP8266WebServer/src/ESP8266WebServer.h b/libraries/ESP8266WebServer/src/ESP8266WebServer.h index 6fc8a6dbb7..e1db841cd2 100644 --- a/libraries/ESP8266WebServer/src/ESP8266WebServer.h +++ b/libraries/ESP8266WebServer/src/ESP8266WebServer.h @@ -36,6 +36,7 @@ enum HTTPClientStatus { HC_NONE, HC_WAIT_READ, HC_WAIT_CLOSE }; #define HTTP_UPLOAD_BUFLEN 2048 #define HTTP_MAX_DATA_WAIT 1000 //ms to wait for the client to send the request #define HTTP_MAX_POST_WAIT 1000 //ms to wait for POST data to arrive +#define HTTP_MAX_SEND_WAIT 5000 //ms to wait for data chunk to be ACKed #define HTTP_MAX_CLOSE_WAIT 2000 //ms to wait for the client to close the connection #define CONTENT_LENGTH_UNKNOWN ((size_t) -1) @@ -113,7 +114,7 @@ class ESP8266WebServer void send_P(int code, PGM_P content_type, PGM_P content); void send_P(int code, PGM_P content_type, PGM_P content, size_t contentLength); - void setContentLength(size_t contentLength) { _contentLength = contentLength; } + void setContentLength(size_t contentLength); void sendHeader(const String& name, const String& value, bool first = false); void sendContent(const String& content); void sendContent_P(PGM_P content); @@ -129,7 +130,7 @@ template<typename T> size_t streamFile(T &file, const String& contentType){ sendHeader("Content-Encoding", "gzip"); } send(200, contentType, ""); - return _currentClient.write(file, HTTP_DOWNLOAD_UNIT_SIZE); + return _currentClient.write(file); } protected: diff --git a/libraries/ESP8266WiFi/src/WiFiClient.cpp b/libraries/ESP8266WiFi/src/WiFiClient.cpp index 87e803a218..75a550b6ed 100644 --- a/libraries/ESP8266WiFi/src/WiFiClient.cpp +++ b/libraries/ESP8266WiFi/src/WiFiClient.cpp @@ -172,35 +172,30 @@ size_t WiFiClient::write(const uint8_t *buf, size_t size) { return 0; } + return _client->write(buf, size); +} - return _client->write(reinterpret_cast<const char*>(buf), size); +size_t WiFiClient::write(Stream& stream, size_t unused) +{ + return WiFiClient::write(stream); } -size_t WiFiClient::write_P(PGM_P buf, size_t size) +size_t WiFiClient::write(Stream& stream) { - if (!_client || !size) + if (!_client || !stream.available()) { return 0; } + return _client->write(stream); +} - char chunkUnit[WIFICLIENT_MAX_PACKET_SIZE + 1]; - chunkUnit[WIFICLIENT_MAX_PACKET_SIZE] = '\0'; - size_t remaining_size = size; - - while (buf != NULL && remaining_size > 0) { - size_t chunkUnitLen = WIFICLIENT_MAX_PACKET_SIZE; - - if (remaining_size < WIFICLIENT_MAX_PACKET_SIZE) chunkUnitLen = remaining_size; - // due to the memcpy signature, lots of casts are needed - memcpy_P((void*)chunkUnit, (PGM_VOID_P)buf, chunkUnitLen); - - buf += chunkUnitLen; - remaining_size -= chunkUnitLen; - - // write is so overloaded, had to use the cast to get it pick the right one - _client->write((const char*)chunkUnit, chunkUnitLen); +size_t WiFiClient::write_P(PGM_P buf, size_t size) +{ + if (!_client || !size) + { + return 0; } - return size; + return _client->write_P(buf, size); } int WiFiClient::available() diff --git a/libraries/ESP8266WiFi/src/WiFiClient.h b/libraries/ESP8266WiFi/src/WiFiClient.h index 340cd62b9f..dabd65f26e 100644 --- a/libraries/ESP8266WiFi/src/WiFiClient.h +++ b/libraries/ESP8266WiFi/src/WiFiClient.h @@ -49,8 +49,10 @@ class WiFiClient : public Client, public SList<WiFiClient> { virtual size_t write(uint8_t); virtual size_t write(const uint8_t *buf, size_t size); size_t write_P(PGM_P buf, size_t size); - template <typename T> - size_t write(T& source, size_t unitSize); + size_t write(Stream& stream); + + // This one is deprecated, use write(Stream& instead) + size_t write(Stream& stream, size_t unitSize) __attribute__ ((deprecated)); virtual int available(); virtual int read(); @@ -73,28 +75,6 @@ class WiFiClient : public Client, public SList<WiFiClient> { void setNoDelay(bool nodelay); static void setLocalPortStart(uint16_t port) { _localPort = port; } - template<typename T> size_t write(T &src){ - uint8_t obuf[WIFICLIENT_MAX_PACKET_SIZE]; - size_t doneLen = 0; - size_t sentLen; - int i; - - while (src.available() > WIFICLIENT_MAX_PACKET_SIZE){ - src.read(obuf, WIFICLIENT_MAX_PACKET_SIZE); - sentLen = write(obuf, WIFICLIENT_MAX_PACKET_SIZE); - doneLen = doneLen + sentLen; - if(sentLen != WIFICLIENT_MAX_PACKET_SIZE){ - return doneLen; - } - } - - uint16_t leftLen = src.available(); - src.read(obuf, leftLen); - sentLen = write(obuf, leftLen); - doneLen = doneLen + sentLen; - return doneLen; - } - friend class WiFiServer; using Print::write; @@ -114,24 +94,4 @@ class WiFiClient : public Client, public SList<WiFiClient> { static uint16_t _localPort; }; - -template <typename T> -inline size_t WiFiClient::write(T& source, size_t unitSize) { - std::unique_ptr<uint8_t[]> buffer(new uint8_t[unitSize]); - size_t size_sent = 0; - while(true) { - size_t left = source.available(); - if (!left) - break; - size_t will_send = (left < unitSize) ? left : unitSize; - source.read(buffer.get(), will_send); - size_t cb = write(buffer.get(), will_send); - size_sent += cb; - if (cb != will_send) { - break; - } - } - return size_sent; -} - #endif diff --git a/libraries/ESP8266WiFi/src/WiFiClientSecure.cpp b/libraries/ESP8266WiFi/src/WiFiClientSecure.cpp index 30eab400a3..555ee18c37 100644 --- a/libraries/ESP8266WiFi/src/WiFiClientSecure.cpp +++ b/libraries/ESP8266WiFi/src/WiFiClientSecure.cpp @@ -557,7 +557,8 @@ extern "C" int ax_port_write(int fd, uint8_t* buffer, size_t count) { errno = EIO; return -1; } - size_t cb = _client->write((const char*) buffer, count); + + size_t cb = _client->write(buffer, count); if (cb != count) { errno = EAGAIN; } diff --git a/libraries/ESP8266WiFi/src/include/ClientContext.h b/libraries/ESP8266WiFi/src/include/ClientContext.h index 7a46b58be0..d2ee0903b3 100644 --- a/libraries/ESP8266WiFi/src/include/ClientContext.h +++ b/libraries/ESP8266WiFi/src/include/ClientContext.h @@ -35,317 +35,447 @@ typedef err_t recv_ret_t; typedef int32_t recv_ret_t; #endif -class ClientContext { - public: - ClientContext(tcp_pcb* pcb, discard_cb_t discard_cb, void* discard_cb_arg) : - _pcb(pcb), _rx_buf(0), _rx_buf_offset(0), _discard_cb(discard_cb), _discard_cb_arg(discard_cb_arg), _refcnt(0), _next(0), _send_waiting(false) { - tcp_setprio(pcb, TCP_PRIO_MIN); - tcp_arg(pcb, this); - tcp_recv(pcb, (tcp_recv_fn) &_s_recv); - tcp_sent(pcb, &_s_sent); - tcp_err(pcb, &_s_error); - } - - err_t abort(){ - if(_pcb) { - DEBUGV(":abort\r\n"); - tcp_arg(_pcb, NULL); - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - tcp_abort(_pcb); - _pcb = 0; - } - return ERR_ABRT; - } - - err_t close(){ - err_t err = ERR_OK; - if(_pcb) { - DEBUGV(":close\r\n"); - tcp_arg(_pcb, NULL); - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - err = tcp_close(_pcb); - if(err != ERR_OK) { - DEBUGV(":tc err %d\r\n", err); - tcp_abort(_pcb); - err = ERR_ABRT; - } - _pcb = 0; - } - return err; - } - - ~ClientContext() { - } - - ClientContext* next() const { - return _next; - } - - ClientContext* next(ClientContext* new_next) { - _next = new_next; - return _next; - } - - void ref() { - ++_refcnt; - DEBUGV(":ref %d\r\n", _refcnt); - } - - void unref() { - if(this != 0) { - DEBUGV(":ur %d\r\n", _refcnt); - if(--_refcnt == 0) { - flush(); - close(); - if(_discard_cb) - _discard_cb(_discard_cb_arg, this); - DEBUGV(":del\r\n"); - delete this; +#include "DataSource.h" + +class ClientContext +{ +public: + ClientContext(tcp_pcb* pcb, discard_cb_t discard_cb, void* discard_cb_arg) : + _pcb(pcb), _rx_buf(0), _rx_buf_offset(0), _discard_cb(discard_cb), _discard_cb_arg(discard_cb_arg), _refcnt(0), _next(0) + { + tcp_setprio(pcb, TCP_PRIO_MIN); + tcp_arg(pcb, this); + tcp_recv(pcb, (tcp_recv_fn) &_s_recv); + tcp_sent(pcb, &_s_sent); + tcp_err(pcb, &_s_error); + tcp_poll(pcb, &_s_poll, 1); + } + + err_t abort() + { + if(_pcb) { + DEBUGV(":abort\r\n"); + tcp_arg(_pcb, NULL); + tcp_sent(_pcb, NULL); + tcp_recv(_pcb, NULL); + tcp_err(_pcb, NULL); + tcp_poll(_pcb, NULL, 0); + tcp_abort(_pcb); + _pcb = 0; + } + return ERR_ABRT; + } + + err_t close() + { + err_t err = ERR_OK; + if(_pcb) { + DEBUGV(":close\r\n"); + tcp_arg(_pcb, NULL); + tcp_sent(_pcb, NULL); + tcp_recv(_pcb, NULL); + tcp_err(_pcb, NULL); + tcp_poll(_pcb, NULL, 0); + err = tcp_close(_pcb); + if(err != ERR_OK) { + DEBUGV(":tc err %d\r\n", err); + tcp_abort(_pcb); + err = ERR_ABRT; + } + _pcb = 0; + } + return err; + } + + ~ClientContext() + { + } + + ClientContext* next() const + { + return _next; + } + + ClientContext* next(ClientContext* new_next) + { + _next = new_next; + return _next; + } + + void ref() + { + ++_refcnt; + DEBUGV(":ref %d\r\n", _refcnt); + } + + void unref() + { + if(this != 0) { + DEBUGV(":ur %d\r\n", _refcnt); + if(--_refcnt == 0) { + flush(); + close(); + if(_discard_cb) { + _discard_cb(_discard_cb_arg, this); } + DEBUGV(":del\r\n"); + delete this; } } + } - void setNoDelay(bool nodelay){ - if(!_pcb) return; - if(nodelay) tcp_nagle_disable(_pcb); - else tcp_nagle_enable(_pcb); + void setNoDelay(bool nodelay) + { + if(!_pcb) { + return; } - - bool getNoDelay(){ - if(!_pcb) return false; - return tcp_nagle_disabled(_pcb); + if(nodelay) { + tcp_nagle_disable(_pcb); + } else { + tcp_nagle_enable(_pcb); } + } - uint32_t getRemoteAddress() { - if(!_pcb) return 0; - - return _pcb->remote_ip.addr; + bool getNoDelay() + { + if(!_pcb) { + return false; } - - uint16_t getRemotePort() { - if(!_pcb) return 0; - - return _pcb->remote_port; + return tcp_nagle_disabled(_pcb); + } + + void setNonBlocking(bool nonblocking) + { + _noblock = nonblocking; + } + + bool getNonBlocking() + { + return _noblock; + } + + uint32_t getRemoteAddress() + { + if(!_pcb) { + return 0; } - uint32_t getLocalAddress() { - if(!_pcb) return 0; + return _pcb->remote_ip.addr; + } - return _pcb->local_ip.addr; + uint16_t getRemotePort() + { + if(!_pcb) { + return 0; } - uint16_t getLocalPort() { - if(!_pcb) return 0; + return _pcb->remote_port; + } - return _pcb->local_port; + uint32_t getLocalAddress() + { + if(!_pcb) { + return 0; } - size_t getSize() const { - if(!_rx_buf) return 0; + return _pcb->local_ip.addr; + } - return _rx_buf->tot_len - _rx_buf_offset; + uint16_t getLocalPort() + { + if(!_pcb) { + return 0; } - char read() { - if(!_rx_buf) return 0; + return _pcb->local_port; + } - char c = reinterpret_cast<char*>(_rx_buf->payload)[_rx_buf_offset]; - _consume(1); - return c; + size_t getSize() const + { + if(!_rx_buf) { + return 0; } - size_t read(char* dst, size_t size) { - if(!_rx_buf) return 0; + return _rx_buf->tot_len - _rx_buf_offset; + } - size_t max_size = _rx_buf->tot_len - _rx_buf_offset; - size = (size < max_size) ? size : max_size; - - DEBUGV(":rd %d, %d, %d\r\n", size, _rx_buf->tot_len, _rx_buf_offset); - size_t size_read = 0; - while(size) { - size_t buf_size = _rx_buf->len - _rx_buf_offset; - size_t copy_size = (size < buf_size) ? size : buf_size; - DEBUGV(":rdi %d, %d\r\n", buf_size, copy_size); - os_memcpy(dst, reinterpret_cast<char*>(_rx_buf->payload) + _rx_buf_offset, copy_size); - dst += copy_size; - _consume(copy_size); - size -= copy_size; - size_read += copy_size; - } - return size_read; + char read() + { + if(!_rx_buf) { + return 0; } - char peek() { - if(!_rx_buf) return 0; + char c = reinterpret_cast<char*>(_rx_buf->payload)[_rx_buf_offset]; + _consume(1); + return c; + } - return reinterpret_cast<char*>(_rx_buf->payload)[_rx_buf_offset]; + size_t read(char* dst, size_t size) + { + if(!_rx_buf) { + return 0; } - size_t peekBytes(char *dst, size_t size) { - if(!_rx_buf) return 0; + size_t max_size = _rx_buf->tot_len - _rx_buf_offset; + size = (size < max_size) ? size : max_size; - size_t max_size = _rx_buf->tot_len - _rx_buf_offset; - size = (size < max_size) ? size : max_size; - - DEBUGV(":pd %d, %d, %d\r\n", size, _rx_buf->tot_len, _rx_buf_offset); + DEBUGV(":rd %d, %d, %d\r\n", size, _rx_buf->tot_len, _rx_buf_offset); + size_t size_read = 0; + while(size) { size_t buf_size = _rx_buf->len - _rx_buf_offset; size_t copy_size = (size < buf_size) ? size : buf_size; - DEBUGV(":rpi %d, %d\r\n", buf_size, copy_size); + DEBUGV(":rdi %d, %d\r\n", buf_size, copy_size); os_memcpy(dst, reinterpret_cast<char*>(_rx_buf->payload) + _rx_buf_offset, copy_size); - return copy_size; + dst += copy_size; + _consume(copy_size); + size -= copy_size; + size_read += copy_size; } + return size_read; + } - void flush() { - if(!_rx_buf) { - return; - } - if(_pcb) { - tcp_recved(_pcb, (size_t) _rx_buf->tot_len); - } - pbuf_free(_rx_buf); - _rx_buf = 0; - _rx_buf_offset = 0; + char peek() + { + if(!_rx_buf) { + return 0; } - uint8_t state() const { - if(!_pcb) return CLOSED; + return reinterpret_cast<char*>(_rx_buf->payload)[_rx_buf_offset]; + } - return _pcb->state; + size_t peekBytes(char *dst, size_t size) + { + if(!_rx_buf) { + return 0; } - size_t write(const char* data, size_t size) { - if(!_pcb) { - DEBUGV(":wr !_pcb\r\n"); - return 0; - } + size_t max_size = _rx_buf->tot_len - _rx_buf_offset; + size = (size < max_size) ? size : max_size; + + DEBUGV(":pd %d, %d, %d\r\n", size, _rx_buf->tot_len, _rx_buf_offset); + size_t buf_size = _rx_buf->len - _rx_buf_offset; + size_t copy_size = (size < buf_size) ? size : buf_size; + DEBUGV(":rpi %d, %d\r\n", buf_size, copy_size); + os_memcpy(dst, reinterpret_cast<char*>(_rx_buf->payload) + _rx_buf_offset, copy_size); + return copy_size; + } + + void flush() + { + if(!_rx_buf) { + return; + } + if(_pcb) { + tcp_recved(_pcb, (size_t) _rx_buf->tot_len); + } + pbuf_free(_rx_buf); + _rx_buf = 0; + _rx_buf_offset = 0; + } + + uint8_t state() const + { + if(!_pcb) { + return CLOSED; + } - if(size == 0) { - return 0; - } + return _pcb->state; + } - size_t room = tcp_sndbuf(_pcb); - size_t will_send = (room < size) ? room : size; - err_t err = tcp_write(_pcb, data, will_send, 0); - if(err != ERR_OK) { - DEBUGV(":wr !ERR_OK\r\n"); - return 0; - } - _size_sent = will_send; - DEBUGV(":wr\r\n"); - tcp_output( _pcb ); - _send_waiting = true; - delay(5000); // max send timeout - _send_waiting = false; - DEBUGV(":ww\r\n"); - return will_send - _size_sent; - } - - private: - - err_t _sent(tcp_pcb* pcb, uint16_t len) { - DEBUGV(":sent %d\r\n", len); - _size_sent -= len; - if(_size_sent == 0 && _send_waiting) esp_schedule(); - return ERR_OK; - } - - void _consume(size_t size) { - ptrdiff_t left = _rx_buf->len - _rx_buf_offset - size; - if(left > 0) { - _rx_buf_offset += size; - } else if(!_rx_buf->next) { - DEBUGV(":c0 %d, %d\r\n", size, _rx_buf->tot_len); - if(_pcb) tcp_recved(_pcb, _rx_buf->len); - pbuf_free(_rx_buf); - _rx_buf = 0; - _rx_buf_offset = 0; - } else { - DEBUGV(":c %d, %d, %d\r\n", size, _rx_buf->len, _rx_buf->tot_len); - auto head = _rx_buf; - _rx_buf = _rx_buf->next; - _rx_buf_offset = 0; - pbuf_ref(_rx_buf); - if(_pcb) tcp_recved(_pcb, head->len); - pbuf_free(head); - } + size_t write(const uint8_t* data, size_t size) + { + if (!_pcb) { + return 0; } + return _write_from_source(new BufferDataSource(data, size)); + } - recv_ret_t _recv(tcp_pcb* pcb, pbuf* pb, err_t err) { + size_t write(Stream& stream) + { + if (!_pcb) { + return 0; + } + return _write_from_source(new BufferedStreamDataSource<Stream>(stream, stream.available())); + } - if(pb == 0) // connection closed - { - DEBUGV(":rcl\r\n"); - if (_send_waiting) { - esp_schedule(); - } - abort(); - return ERR_ABRT; - } + size_t write_P(PGM_P buf, size_t size) + { + if (!_pcb) { + return 0; + } + ProgmemStream stream(buf, size); + return _write_from_source(new BufferedStreamDataSource<ProgmemStream>(stream, size)); + } + +protected: + + void _cancel_write() + { + if (_datasource) { + delete _datasource; + _datasource = nullptr; + esp_schedule(); + } + } + + size_t _write_from_source(DataSource* ds) + { + assert(_datasource == nullptr); + _datasource = ds; + _written = 0; + _write_some(); + while (_datasource && !_noblock) { + _send_waiting = true; + esp_yield(); + } + _send_waiting = false; + return _written; + } - if(_rx_buf) { - // there is some unread data - // chain the new pbuf to the existing one - DEBUGV(":rch %d, %d\r\n", _rx_buf->tot_len, pb->tot_len); - pbuf_cat(_rx_buf, pb); - } else { - DEBUGV(":rn %d\r\n", pb->tot_len); - _rx_buf = pb; - _rx_buf_offset = 0; - } - return ERR_OK; + + void _write_some() + { + if (!_datasource || !_pcb) { + return; } - void _error(err_t err) { - DEBUGV(":er %d %d %d\r\n", err, _size_sent, _send_waiting); - tcp_arg(_pcb, NULL); - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - _pcb = NULL; - if(_size_sent && _send_waiting) { - esp_schedule(); + size_t left = _datasource->available(); + size_t can_send = tcp_sndbuf(_pcb); + if (_pcb->snd_queuelen >= TCP_SND_QUEUELEN) { + can_send = 0; + } + size_t will_send = (can_send < left) ? can_send : left; + if (will_send) { + const uint8_t* buf = _datasource->get_buffer(will_send); + err_t err = tcp_write(_pcb, buf, will_send, TCP_WRITE_FLAG_COPY); + _datasource->release_buffer(buf, will_send); + if (err == ERR_OK) { + _written += will_send; + tcp_output(_pcb); } } - err_t _poll(tcp_pcb* pcb) { - return ERR_OK; + if (!_datasource->available() || _noblock) { + delete _datasource; + _datasource = nullptr; } + } - static recv_ret_t _s_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *pb, err_t err) { - return reinterpret_cast<ClientContext*>(arg)->_recv(tpcb, pb, err); + void _write_some_from_cb() + { + _write_some(); + if (!_datasource && _send_waiting) { + esp_schedule(); } - - static void _s_error(void *arg, err_t err) { - reinterpret_cast<ClientContext*>(arg)->_error(err); + } + + err_t _sent(tcp_pcb* pcb, uint16_t len) + { + DEBUGV(":sent %d\r\n", len); + _write_some_from_cb(); + return ERR_OK; + } + + void _consume(size_t size) + { + ptrdiff_t left = _rx_buf->len - _rx_buf_offset - size; + if(left > 0) { + _rx_buf_offset += size; + } else if(!_rx_buf->next) { + DEBUGV(":c0 %d, %d\r\n", size, _rx_buf->tot_len); + if(_pcb) { + tcp_recved(_pcb, _rx_buf->len); + } + pbuf_free(_rx_buf); + _rx_buf = 0; + _rx_buf_offset = 0; + } else { + DEBUGV(":c %d, %d, %d\r\n", size, _rx_buf->len, _rx_buf->tot_len); + auto head = _rx_buf; + _rx_buf = _rx_buf->next; + _rx_buf_offset = 0; + pbuf_ref(_rx_buf); + if(_pcb) { + tcp_recved(_pcb, head->len); + } + pbuf_free(head); } - - static err_t _s_poll(void *arg, struct tcp_pcb *tpcb) { - return reinterpret_cast<ClientContext*>(arg)->_poll(tpcb); + } + + recv_ret_t _recv(tcp_pcb* pcb, pbuf* pb, err_t err) + { + if(pb == 0) { // connection closed + DEBUGV(":rcl\r\n"); + _cancel_write(); + abort(); + return ERR_ABRT; } - static err_t _s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len) { - return reinterpret_cast<ClientContext*>(arg)->_sent(tpcb, len); + if(_rx_buf) { + DEBUGV(":rch %d, %d\r\n", _rx_buf->tot_len, pb->tot_len); + pbuf_cat(_rx_buf, pb); + } else { + DEBUGV(":rn %d\r\n", pb->tot_len); + _rx_buf = pb; + _rx_buf_offset = 0; } - - private: - tcp_pcb* _pcb; - - pbuf* _rx_buf; - size_t _rx_buf_offset; - - discard_cb_t _discard_cb; - void* _discard_cb_arg; - - int _refcnt; - ClientContext* _next; - - size_t _size_sent; - bool _send_waiting; + return ERR_OK; + } + + void _error(err_t err) + { + DEBUGV(":er %d %08x\r\n", err, (uint32_t) _datasource); + tcp_arg(_pcb, NULL); + tcp_sent(_pcb, NULL); + tcp_recv(_pcb, NULL); + tcp_err(_pcb, NULL); + _pcb = NULL; + _cancel_write(); + } + + err_t _poll(tcp_pcb*) + { + _write_some_from_cb(); + return ERR_OK; + } + + static recv_ret_t _s_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *pb, err_t err) + { + return reinterpret_cast<ClientContext*>(arg)->_recv(tpcb, pb, err); + } + + static void _s_error(void *arg, err_t err) + { + reinterpret_cast<ClientContext*>(arg)->_error(err); + } + + static err_t _s_poll(void *arg, struct tcp_pcb *tpcb) + { + return reinterpret_cast<ClientContext*>(arg)->_poll(tpcb); + } + + static err_t _s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len) + { + return reinterpret_cast<ClientContext*>(arg)->_sent(tpcb, len); + } + +private: + tcp_pcb* _pcb; + + pbuf* _rx_buf; + size_t _rx_buf_offset; + + discard_cb_t _discard_cb; + void* _discard_cb_arg; + + int _refcnt; + ClientContext* _next; + + DataSource* _datasource = nullptr; + size_t _written = 0; + bool _noblock = false; + bool _send_waiting = false; }; #endif//CLIENTCONTEXT_H diff --git a/libraries/ESP8266WiFi/src/include/DataSource.h b/libraries/ESP8266WiFi/src/include/DataSource.h new file mode 100644 index 0000000000..5775bc2e7e --- /dev/null +++ b/libraries/ESP8266WiFi/src/include/DataSource.h @@ -0,0 +1,113 @@ +/* DataSource.h - a read-only object similar to Stream, but with less methods + * Copyright (c) 2016 Ivan Grokhotkov. All rights reserved. + * This file is distributed under MIT license. + */ +#ifndef DATASOURCE_H +#define DATASOURCE_H + +#include <assert.h> + +class DataSource { +public: + virtual ~DataSource() {} + virtual size_t available() = 0; + virtual const uint8_t* get_buffer(size_t size) = 0; + virtual void release_buffer(const uint8_t* buffer, size_t size) = 0; + +}; + +class BufferDataSource : public DataSource { +public: + BufferDataSource(const uint8_t* data, size_t size) : + _data(data), + _size(size) + { + } + + size_t available() override + { + return _size - _pos; + } + + const uint8_t* get_buffer(size_t size) override + { + assert(_pos + size <= _size); + return _data + _pos; + } + + void release_buffer(const uint8_t* buffer, size_t size) override + { + assert(buffer == _data + _pos); + _pos += size; + } + +protected: + const uint8_t* _data; + const size_t _size; + size_t _pos = 0; +}; + +template<typename TStream> +class BufferedStreamDataSource : public DataSource { +public: + BufferedStreamDataSource(TStream& stream, size_t size) : + _stream(stream), + _size(size) + { + } + + size_t available() override + { + return _size - _pos; + } + + const uint8_t* get_buffer(size_t size) override + { + assert(_pos + size <= _size); + if (_bufferSize < size) { + _buffer.reset(new uint8_t[size]); + _bufferSize = size; + } + size_t cb = _stream.readBytes(reinterpret_cast<char*>(_buffer.get()), size); + assert(cb == size); + return _buffer.get(); + } + + void release_buffer(const uint8_t* buffer, size_t size) override + { + _pos += size; + } + +protected: + TStream& _stream; + std::unique_ptr<uint8_t[]> _buffer; + size_t _size; + size_t _pos = 0; + size_t _bufferSize = 0; +}; + +class ProgmemStream +{ +public: + ProgmemStream(PGM_P buf, size_t size) : + _buf(buf), + _left(size) + { + } + + size_t readBytes(char* dst, size_t size) + { + size_t will_read = (_left < size) ? _left : size; + memcpy_P((void*)dst, (PGM_VOID_P)_buf, will_read); + _left -= will_read; + _buf += will_read; + return will_read; + } + +protected: + PGM_P _buf; + size_t _left; +}; + + +#endif //DATASOURCE_H