From cfd665298756283c633ad77e4e6737e2f54e2999 Mon Sep 17 00:00:00 2001 From: Amos Jeffries Date: Wed, 19 May 2010 23:28:21 +1200 Subject: [PATCH] Comm restructure part 2 - outbound connections --- src/CommCalls.cc | 5 +- src/CommCalls.h | 8 +- src/HttpRequest.cc | 11 +- src/Makefile.am | 17 +- src/PeerSelectState.h | 34 +- src/adaptation/icap/Xaction.cc | 55 +- src/cache_cf.cc | 2 +- src/client_side.cc | 48 +- src/client_side.h | 2 - src/comm.cc | 322 +---------- src/comm.h | 33 +- src/comm/ConnectStateData.cc | 179 ++++++ src/comm/ConnectStateData.h | 76 +++ src/comm/Connection.cc | 34 ++ src/{ConnectionDetail.h => comm/Connection.h} | 46 +- src/comm/ListenStateData.cc | 40 +- src/comm/ListenStateData.h | 8 +- src/comm/Makefile.am | 14 +- src/comm/comm_err_t.h | 21 + src/defines.h | 6 - src/dns_internal.cc | 45 +- src/forward.cc | 534 ++++++++---------- src/forward.h | 22 +- src/fqdncache.cc | 1 + src/ftp.cc | 46 +- src/gopher.cc | 11 +- src/http.cc | 9 +- src/ident/AclIdent.cc | 6 +- src/ident/Ident.cc | 89 ++- src/ident/Ident.h | 5 +- src/ipcache.cc | 5 +- src/main.cc | 2 +- src/neighbors.cc | 71 +-- src/peer_select.cc | 109 +++- src/protos.h | 6 +- src/structs.h | 2 +- src/tunnel.cc | 188 +++--- src/typedefs.h | 2 - src/whois.cc | 2 +- 39 files changed, 1075 insertions(+), 1041 deletions(-) create mode 100644 src/comm/ConnectStateData.cc create mode 100644 src/comm/ConnectStateData.h create mode 100644 src/comm/Connection.cc rename src/{ConnectionDetail.h => comm/Connection.h} (61%) create mode 100644 src/comm/comm_err_t.h diff --git a/src/CommCalls.cc b/src/CommCalls.cc index 3f7da9996bc..d4b10a652b8 100644 --- a/src/CommCalls.cc +++ b/src/CommCalls.cc @@ -71,7 +71,6 @@ void CommConnectCbParams::print(std::ostream &os) const { CommCommonCbParams::print(os); - os << ", " << dns; } /* CommIoCbParams */ @@ -133,7 +132,7 @@ CommAcceptCbPtrFun::CommAcceptCbPtrFun(IOACB *aHandler, void CommAcceptCbPtrFun::dial() { - handler(params.fd, params.nfd, ¶ms.details, params.flag, params.xerrno, params.data); + handler(params.fd, params.nfd, params.details, params.flag, params.xerrno, params.data); } void @@ -157,7 +156,7 @@ CommConnectCbPtrFun::CommConnectCbPtrFun(CNCB *aHandler, void CommConnectCbPtrFun::dial() { - handler(params.fd, params.dns, params.flag, params.xerrno, params.data); + handler(params.conn, params.paths, params.flag, params.xerrno, params.data); } void diff --git a/src/CommCalls.h b/src/CommCalls.h index c5b39411a18..d732a7f153d 100644 --- a/src/CommCalls.h +++ b/src/CommCalls.h @@ -7,8 +7,7 @@ #define SQUID_COMMCALLS_H #include "comm.h" -#include "ConnectionDetail.h" -#include "DnsLookupDetails.h" +#include "comm/Connection.h" #include "base/AsyncCall.h" #include "base/AsyncJobCalls.h" @@ -69,7 +68,7 @@ class CommAcceptCbParams: public CommCommonCbParams void print(std::ostream &os) const; public: - ConnectionDetail details; + Comm::Connection *details; int nfd; // TODO: rename to fdNew or somesuch }; @@ -84,7 +83,8 @@ class CommConnectCbParams: public CommCommonCbParams void print(std::ostream &os) const; public: - DnsLookupDetails dns; + Comm::Connection *conn; + Vector *paths; }; // read/write (I/O) parameters diff --git a/src/HttpRequest.cc b/src/HttpRequest.cc index 40ccc86fe96..55004d6b8e2 100644 --- a/src/HttpRequest.cc +++ b/src/HttpRequest.cc @@ -35,15 +35,16 @@ */ #include "squid.h" -#include "HttpRequest.h" +#include "acl/FilledChecklist.h" +#if ICAP_CLIENT +#include "adaptation/icap/icap_log.h" +#endif #include "auth/UserRequest.h" +#include "DnsLookupDetails.h" +#include "HttpRequest.h" #include "HttpHeaderRange.h" #include "MemBuf.h" #include "Store.h" -#if ICAP_CLIENT -#include "adaptation/icap/icap_log.h" -#endif -#include "acl/FilledChecklist.h" HttpRequest::HttpRequest() : HttpMsg(hoRequest) { diff --git a/src/Makefile.am b/src/Makefile.am index af7b0ddbf64..a5df1f1756c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -289,7 +289,6 @@ squid_SOURCES = \ ConfigOption.cc \ ConfigParser.cc \ ConfigParser.h \ - ConnectionDetail.h \ debug.cc \ Debug.h \ defines.h \ @@ -532,7 +531,7 @@ nodist_squid_SOURCES = \ squid_LDADD = \ $(COMMON_LIBS) \ - comm/libcomm-listener.la \ + comm/libcomm.la \ eui/libeui.la \ icmp/libicmp.la icmp/libicmp-core.la \ log/liblog.la \ @@ -1234,10 +1233,10 @@ tests_testCacheManager_SOURCES = \ wordlist.cc nodist_tests_testCacheManager_SOURCES = \ $(BUILT_SOURCES) -# comm.cc only requires comm/libcomm-listener.la until fdc_table is dead. +# comm.cc only requires comm/libcomm.la until fdc_table is dead. tests_testCacheManager_LDADD = \ $(COMMON_LIBS) \ - comm/libcomm-listener.la \ + comm/libcomm.la \ icmp/libicmp.la icmp/libicmp-core.la \ log/liblog.la \ $(REPL_OBJS) \ @@ -1419,7 +1418,7 @@ nodist_tests_testEvent_SOURCES = \ tests_testEvent_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REPL_OBJS) \ ${ADAPTATION_LIBS} \ @@ -1573,7 +1572,7 @@ nodist_tests_testEventLoop_SOURCES = \ tests_testEventLoop_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REPL_OBJS) \ ${ADAPTATION_LIBS} \ @@ -1722,7 +1721,7 @@ nodist_tests_test_http_range_SOURCES = \ tests_test_http_range_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REPL_OBJS) \ ${ADAPTATION_LIBS} \ @@ -1876,7 +1875,7 @@ nodist_tests_testHttpRequest_SOURCES = \ tests_testHttpRequest_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REPL_OBJS) \ ${ADAPTATION_LIBS} \ @@ -2247,7 +2246,7 @@ nodist_tests_testURL_SOURCES = \ tests_testURL_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ - comm/libcomm-listener.la \ + comm/libcomm.la \ log/liblog.la \ $(REGEXLIB) \ $(REPL_OBJS) \ diff --git a/src/PeerSelectState.h b/src/PeerSelectState.h index 49e7386c763..6896a158255 100644 --- a/src/PeerSelectState.h +++ b/src/PeerSelectState.h @@ -33,9 +33,36 @@ #ifndef SQUID_PEERSELECTSTATE_H #define SQUID_PEERSELECTSTATE_H +#include "Array.h" #include "cbdata.h" -#include "PingData.h" +#include "comm/Connection.h" #include "ip/Address.h" +#include "PingData.h" + +class HttpRequest; +class StoreEntry; + +typedef void PSC(Vector *, void *); + +SQUIDCEXTERN void peerSelect(Vector *, HttpRequest *, StoreEntry *, PSC *, void *data); +SQUIDCEXTERN void peerSelectInit(void); + +/** + * A peer which has been selected as a possible destination. + * Listed as pointers here so as to prevent duplicates being added but will + * be converted to a set of IP address path options before handing back out + * to the caller. + * + * Certain connection flags and outgoing settings will also be looked up and + * set based on the received request and peer settings before handing back. + */ +class FwdServer +{ +public: + peer *_peer; /* NULL --> origin server */ + hier_code code; + FwdServer *next; +}; class ps_state { @@ -50,7 +77,10 @@ class ps_state int direct; PSC *callback; void *callback_data; - FwdServer *servers; + + Vector *paths; ///< the callers paths array. to be filled with our final results. + FwdServer *servers; ///< temporary linked list of peers we will pass back. + /* * Why are these Ip::Address instead of peer *? Because a * peer structure can become invalid during the peer selection diff --git a/src/adaptation/icap/Xaction.cc b/src/adaptation/icap/Xaction.cc index 6fbeb1fcc18..6356d052f2c 100644 --- a/src/adaptation/icap/Xaction.cc +++ b/src/adaptation/icap/Xaction.cc @@ -4,6 +4,7 @@ #include "squid.h" #include "comm.h" +#include "comm/ConnectStateData.h" #include "CommCalls.h" #include "HttpMsg.h" #include "adaptation/icap/Xaction.h" @@ -115,32 +116,21 @@ void Adaptation::Icap::Xaction::openConnection() disableRetries(); // we only retry pconn failures - Ip::Address outgoing; - connection = comm_open(SOCK_STREAM, 0, outgoing, - COMM_NONBLOCKING, s.cfg().uri.termedBuf()); + Comm::Connection *conn = new Comm::Connection; - if (connection < 0) - dieOnConnectionFailure(); // throws - - debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port); - - // TODO: service bypass status may differ from that of a transaction - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", - TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); - - commSetTimeout(connection, TheConfig.connect_timeout( - service().cfg().bypass), timeoutCall); - - typedef CommCbMemFunT CloseDialer; - closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", - CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); - comm_add_close_handler(connection, closer); + // TODO: where do we get the DNS info for the ICAP server host ?? + // Ip::Address will do a BLOCKING lookup if s.cfg().host is a hostname + conn->remote = s.cfg().host.termedBuf(); + conn->remote.SetPort(s.cfg().port); typedef CommCbMemFunT ConnectDialer; connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected)); - commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector); + + ConnectStateData *cs = new ConnectStateData(conn, connector); + cs->host = xstrdup(s.cfg().host.termedBuf()); + cs->connect_timeout = TheConfig.connect_timeout(service().cfg().bypass); + cs->connect(); } /* @@ -200,14 +190,35 @@ void Adaptation::Icap::Xaction::closeConnection() // connection with the ICAP service established void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io) { + if (io.flag == COMM_TIMEOUT) { + handleCommTimedout(); + return; + } + Must(connector != NULL); connector = NULL; if (io.flag != COMM_OK) dieOnConnectionFailure(); // throws - fd_table[connection].noteUse(icapPconnPool); + // TODO: do we still need the timeout handler set? + // there was no mention of un-setting it on success. + + // TODO: service bypass status may differ from that of a transaction + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", + TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); + + commSetTimeout(io.conn->fd, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall); + + typedef CommCbMemFunT CloseDialer; + closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", + CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); + comm_add_close_handler(io.conn->fd, closer); + + fd_table[io.conn->fd].noteUse(icapPconnPool); + connection = io.conn->fd; // TODO: maybe store the full Comm::Connection object handleCommConnected(); } diff --git a/src/cache_cf.cc b/src/cache_cf.cc index 2f8a1c1f1d5..a151b739f2d 100644 --- a/src/cache_cf.cc +++ b/src/cache_cf.cc @@ -1922,7 +1922,7 @@ parse_peer(peer ** head) p->icp.version = ICP_VERSION_CURRENT; - p->test_fd = -1; + p->testing_now = false; #if USE_CACHE_DIGESTS diff --git a/src/client_side.cc b/src/client_side.cc index 6059e220088..7a53da8454a 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -92,8 +92,8 @@ #include "ClientRequestContext.h" #include "clientStream.h" #include "comm.h" +#include "comm/Connection.h" #include "comm/ListenStateData.h" -#include "ConnectionDetail.h" #include "eui/Config.h" #include "fde.h" #include "HttpHdrContRange.h" @@ -3057,7 +3057,7 @@ connStateCreate(const Ip::Address &peer, const Ip::Address &me, int fd, http_por /** Handle a new connection on HTTP socket. */ void -httpAccept(int sock, int newfd, ConnectionDetail *details, +httpAccept(int sock, int newfd, Comm::Connection *details, comm_err_t flag, int xerrno, void *data) { http_port_list *s = (http_port_list *)data; @@ -3070,7 +3070,7 @@ httpAccept(int sock, int newfd, ConnectionDetail *details, debugs(33, 4, "httpAccept: FD " << newfd << ": accepted"); fd_note(newfd, "client http connect"); - connState = connStateCreate(&details->peer, &details->me, newfd, s); + connState = connStateCreate(&details->remote, &details->local, newfd, s); typedef CommCbMemFunT Dialer; AsyncCall::Pointer call = asyncCall(33, 5, "ConnStateData::connStateClosed", @@ -3078,7 +3078,7 @@ httpAccept(int sock, int newfd, ConnectionDetail *details, comm_add_close_handler(newfd, call); if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS); + fqdncache_gethostbyaddr(details->remote, FQDN_LOOKUP_IF_MISS); typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = asyncCall(33, 5, "ConnStateData::requestTimeout", @@ -3088,19 +3088,19 @@ httpAccept(int sock, int newfd, ConnectionDetail *details, #if USE_IDENT if (Ident::TheConfig.identLookup) { ACLFilledChecklist identChecklist(Ident::TheConfig.identLookup, NULL, NULL); - identChecklist.src_addr = details->peer; - identChecklist.my_addr = details->me; + identChecklist.src_addr = details->remote; + identChecklist.my_addr = details->local; if (identChecklist.fastCheck()) - Ident::Start(details->me, details->peer, clientIdentDone, connState); + Ident::Start(details, clientIdentDone, connState); } #endif #if USE_SQUID_EUI if (Eui::TheConfig.euiLookup) { - if (details->peer.IsIPv4()) { - connState->peer_eui48.lookup(details->peer); - } else if (details->peer.IsIPv6()) { - connState->peer_eui64.lookup(details->peer); + if (details->remote.IsIPv4()) { + connState->peer_eui48.lookup(details->remote); + } else if (details->remote.IsIPv6()) { + connState->peer_eui64.lookup(details->remote); } } #endif @@ -3111,7 +3111,7 @@ httpAccept(int sock, int newfd, ConnectionDetail *details, connState->readSomeData(); - clientdbEstablished(details->peer, 1); + clientdbEstablished(details->remote, 1); incoming_sockets_accepted++; } @@ -3120,7 +3120,7 @@ httpAccept(int sock, int newfd, ConnectionDetail *details, /** Create SSL connection structure and update fd_table */ static SSL * -httpsCreate(int newfd, ConnectionDetail *details, SSL_CTX *sslContext) +httpsCreate(int newfd, Comm::Connection *details, SSL_CTX *sslContext) { SSL *ssl = SSL_new(sslContext); @@ -3263,7 +3263,7 @@ clientNegotiateSSL(int fd, void *data) /** handle a new HTTPS connection */ static void -httpsAccept(int sock, int newfd, ConnectionDetail *details, +httpsAccept(int sock, int newfd, Comm::Connection *details, comm_err_t flag, int xerrno, void *data) { https_port_list *s = (https_port_list *)data; @@ -3281,7 +3281,7 @@ httpsAccept(int sock, int newfd, ConnectionDetail *details, debugs(33, 5, "httpsAccept: FD " << newfd << " accepted, starting SSL negotiation."); fd_note(newfd, "client https connect"); - ConnStateData *connState = connStateCreate(details->peer, details->me, + ConnStateData *connState = connStateCreate(details->remote, details->local, newfd, &s->http); typedef CommCbMemFunT Dialer; AsyncCall::Pointer call = asyncCall(33, 5, "ConnStateData::connStateClosed", @@ -3289,7 +3289,7 @@ httpsAccept(int sock, int newfd, ConnectionDetail *details, comm_add_close_handler(newfd, call); if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS); + fqdncache_gethostbyaddr(details->remote, FQDN_LOOKUP_IF_MISS); typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = asyncCall(33, 5, "ConnStateData::requestTimeout", @@ -3299,10 +3299,10 @@ httpsAccept(int sock, int newfd, ConnectionDetail *details, #if USE_IDENT if (Ident::TheConfig.identLookup) { ACLFilledChecklist identChecklist(Ident::TheConfig.identLookup, NULL, NULL); - identChecklist.src_addr = details->peer; - identChecklist.my_addr = details->me; + identChecklist.src_addr = details->remote; + identChecklist.my_addr = details->local; if (identChecklist.fastCheck()) - Ident::Start(details->me, details->peer, clientIdentDone, connState); + Ident::Start(details, clientIdentDone, connState); } #endif @@ -3312,7 +3312,7 @@ httpsAccept(int sock, int newfd, ConnectionDetail *details, commSetSelect(newfd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0); - clientdbEstablished(details->peer, 1); + clientdbEstablished(details->remote, 1); incoming_sockets_accepted++; } @@ -3329,10 +3329,10 @@ ConnStateData::switchToHttps() debugs(33, 5, HERE << "converting FD " << fd << " to SSL"); - // fake a ConnectionDetail object; XXX: make ConnState a ConnectionDetail? - ConnectionDetail detail; - detail.me = me; - detail.peer = peer; + // fake a Comm::Connection object; XXX: make ConnState a Comm::Connection? + Comm::Connection detail; + detail.local = me; + detail.remote = peer; SSL_CTX *sslContext = port->sslContext; SSL *ssl = NULL; diff --git a/src/client_side.h b/src/client_side.h index 4162562ab3d..7f80760ba4c 100644 --- a/src/client_side.h +++ b/src/client_side.h @@ -128,8 +128,6 @@ class ClientSocketContext : public RefCountable }; -class ConnectionDetail; - /** A connection to a socket */ class ConnStateData : public BodyProducer/*, public RefCountable*/ { diff --git a/src/comm.cc b/src/comm.cc index 00026672d52..c6536bac4cc 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -33,16 +33,17 @@ */ #include "squid.h" +#include "base/AsyncCall.h" #include "StoreIOBuffer.h" #include "comm.h" #include "event.h" #include "fde.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" +#include "comm/Connection.h" #include "comm/ListenStateData.h" #include "CommIO.h" #include "CommRead.h" -#include "ConnectionDetail.h" #include "MemBuf.h" #include "pconn.h" #include "SquidTime.h" @@ -195,38 +196,6 @@ commio_call_callback(comm_io_callback_t *ccb) { } -class ConnectStateData -{ - -public: - void *operator new (size_t); - void operator delete (void *); - static void Connect (int fd, void *me); - void connect(); - void callCallback(comm_err_t status, int xerrno); - void defaults(); - -// defaults given by client - char *host; - u_short default_port; - Ip::Address default_addr; - // NP: CANNOT store the default addr:port together as it gets set/reset differently. - - DnsLookupDetails dns; ///< host lookup details - Ip::Address S; - AsyncCall::Pointer callback; - - int fd; - int tries; - int addrcount; - int connstart; - -private: - int commResetFD(); - int commRetryConnect(); - CBDATA_CLASS(ConnectStateData); -}; - /* STATIC */ static DescriptorSet *TheHalfClosed = NULL; /// the set of half-closed FDs @@ -241,7 +210,6 @@ static void commSetNoLinger(int); static void commSetTcpNoDelay(int); #endif static void commSetTcpRcvbuf(int, int); -static PF commConnectFree; static PF commHandleWrite; static IPH commConnectDnsHandle; @@ -818,23 +786,10 @@ comm_openex(int sock_type, return new_socket; } -CBDATA_CLASS_INIT(ConnectStateData); - -void * -ConnectStateData::operator new (size_t size) -{ - CBDATA_INIT_TYPE(ConnectStateData); - return cbdataAlloc(ConnectStateData); -} - -void -ConnectStateData::operator delete (void *address) -{ - cbdataFree(address); -} - - - +#if 0 +// AYJ: this API is dead. alter the caller which is using this to do its own DNS lookups +// and generate a Vector of possible destinations. +// do the rest of this itself... void commConnectStart(int fd, const char *host, u_short port, AsyncCall::Pointer &cb) { @@ -848,10 +803,12 @@ commConnectStart(int fd, const char *host, u_short port, AsyncCall::Pointer &cb) cs->default_port = port; cs->callback = cb; - comm_add_close_handler(fd, commConnectFree, cs); + comm_add_close_handler(fd, ConnectStateData::Free, cs); ipcache_nbgethostbyname(host, commConnectDnsHandle, cs); } +#endif +#if 0 // TODO: Remove this and similar callback registration functions by replacing // (callback,data) parameters with an AsyncCall so that we do not have to use // a generic call name and debug level when creating an AsyncCall. This will @@ -864,63 +821,7 @@ commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void * "SomeCommConnectHandler", CommConnectCbPtrFun(callback, data)); commConnectStart(fd, host, port, call); } - -static void -commConnectDnsHandle(const ipcache_addrs *ia, const DnsLookupDetails &details, void *data) -{ - ConnectStateData *cs = (ConnectStateData *)data; - cs->dns = details; - - if (ia == NULL) { - debugs(5, 3, "commConnectDnsHandle: Unknown host: " << cs->host); - cs->callCallback(COMM_ERR_DNS, 0); - return; - } - - assert(ia->cur < ia->count); - - cs->default_addr = ia->in_addrs[ia->cur]; - - if (Config.onoff.balance_on_multiple_ip) - ipcacheCycleAddr(cs->host, NULL); - - cs->addrcount = ia->count; - - cs->connstart = squid_curtime; - - cs->connect(); -} - -void -ConnectStateData::callCallback(comm_err_t status, int xerrno) -{ - debugs(5, 3, "commConnectCallback: FD " << fd); - - comm_remove_close_handler(fd, commConnectFree, this); - commSetTimeout(fd, -1, NULL, NULL); - - typedef CommConnectCbParams Params; - Params ¶ms = GetCommParams(callback); - params.fd = fd; - params.dns = dns; - params.flag = status; - params.xerrno = xerrno; - ScheduleCallHere(callback); - callback = NULL; - - commConnectFree(fd, this); -} - -static void -commConnectFree(int fd, void *data) -{ - ConnectStateData *cs = (ConnectStateData *)data; - debugs(5, 3, "commConnectFree: FD " << fd); -// delete cs->callback; - cs->callback = NULL; - safe_free(cs->host); - delete cs; -} +#endif static void copyFDFlags(int to, fde *F) @@ -942,192 +843,6 @@ copyFDFlags(int to, fde *F) commSetTcpRcvbuf(to, Config.tcpRcvBufsz); } -/* Reset FD so that we can connect() again */ -int -ConnectStateData::commResetFD() -{ - struct addrinfo *AI = NULL; - Ip::Address nul; - int new_family = AF_UNSPEC; - -// XXX: do we have to check this? -// -// if (!cbdataReferenceValid(callback.data)) -// return 0; - - statCounter.syscalls.sock.sockets++; - - /* setup a bare-bones addrinfo */ - /* TODO INET6: for WinXP we may need to check the local_addr type and setup the family properly. */ - nul.GetAddrInfo(AI); - new_family = AI->ai_family; - - int fd2 = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol); - - nul.FreeAddrInfo(AI); - - if (fd2 < 0) { - debugs(5, DBG_CRITICAL, HERE << "WARNING: FD " << fd2 << " socket failed to allocate: " << xstrerror()); - - if (ENFILE == errno || EMFILE == errno) - fdAdjustReserved(); - - return 0; - } - -#ifdef _SQUID_MSWIN_ - - /* On Windows dup2() can't work correctly on Sockets, the */ - /* workaround is to close the destination Socket before call them. */ - close(fd); - -#endif - - if (dup2(fd2, fd) < 0) { - debugs(5, DBG_CRITICAL, HERE << "WARNING: dup2(FD " << fd2 << ", FD " << fd << ") failed: " << xstrerror()); - - if (ENFILE == errno || EMFILE == errno) - fdAdjustReserved(); - - close(fd2); - - return 0; - } - commResetSelect(fd); - - close(fd2); - fde *F = &fd_table[fd]; - - debugs(50, 3, "commResetFD: Reset socket FD " << fd << "->" << fd2 << " : family=" << new_family ); - - /* INET6: copy the new sockets family type to the FDE table */ - fd_table[fd].sock_family = new_family; - - fd_table[fd].flags.called_connect = 0; - /* - * yuck, this has assumptions about comm_open() arguments for - * the original socket - */ - - /* MUST be done before binding or face OS Error: "(99) Cannot assign requested address"... */ - if ( F->flags.transparent ) { - comm_set_transparent(fd); - } - - AI = NULL; - F->local_addr.GetAddrInfo(AI); - - if (commBind(fd, *AI) != COMM_OK) { - debugs(5, DBG_CRITICAL, "WARNING: Reset of FD " << fd << " for " << F->local_addr << " failed to bind: " << xstrerror()); - F->local_addr.FreeAddrInfo(AI); - return 0; - } - F->local_addr.FreeAddrInfo(AI); - - if (F->tos) - comm_set_tos(fd, F->tos); - -#if IPV6_SPECIAL_SPLITSTACK - if ( F->local_addr.IsIPv6() ) - comm_set_v6only(fd, 1); -#endif - - copyFDFlags(fd, F); - - return 1; -} - -int -ConnectStateData::commRetryConnect() -{ - assert(addrcount > 0); - - if (addrcount == 1) { - if (tries >= Config.retry.maxtries) - return 0; - - if (squid_curtime - connstart > Config.Timeout.connect) - return 0; - } else { - if (tries > addrcount) - return 0; - } - - return commResetFD(); -} - -static void -commReconnect(void *data) -{ - ConnectStateData *cs = (ConnectStateData *)data; - ipcache_nbgethostbyname(cs->host, commConnectDnsHandle, cs); -} - -/** Connect SOCK to specified DEST_PORT at DEST_HOST. */ -void -ConnectStateData::Connect(int fd, void *me) -{ - ConnectStateData *cs = (ConnectStateData *)me; - assert (cs->fd == fd); - cs->connect(); -} - -void -ConnectStateData::defaults() -{ - S = default_addr; - S.SetPort(default_port); -} - -void -ConnectStateData::connect() -{ - if (S.IsAnyAddr()) - defaults(); - - debugs(5,5, HERE << "to " << S); - - switch (comm_connect_addr(fd, S) ) { - - case COMM_INPROGRESS: - debugs(5, 5, HERE << "FD " << fd << ": COMM_INPROGRESS"); - commSetSelect(fd, COMM_SELECT_WRITE, ConnectStateData::Connect, this, 0); - break; - - case COMM_OK: - debugs(5, 5, HERE << "FD " << fd << ": COMM_OK - connected"); - ipcacheMarkGoodAddr(host, S); - callCallback(COMM_OK, 0); - break; - -#if USE_IPV6 - case COMM_ERR_PROTOCOL: - /* problem using the desired protocol over this socket. - * count the connection attempt, reset the socket, and immediately try again */ - tries++; - commResetFD(); - connect(); - break; -#endif - - default: - debugs(5, 5, HERE "FD " << fd << ": * - try again"); - tries++; - ipcacheMarkBadAddr(host, S); - -#if USE_ICMP - if (Config.onoff.test_reachability) - netdbDeleteAddrNetwork(S); -#endif - - if (commRetryConnect()) { - eventAdd("commReconnect", commReconnect, this, this->addrcount == 1 ? 0.05 : 0.0, 0); - } else { - debugs(5, 5, HERE << "FD " << fd << ": * - ERR tried too many times already."); - callCallback(COMM_ERR_CONNECT, errno); - } - } -} /* int commSetTimeout_old(int fd, int timeout, PF * handler, void *data) @@ -1169,7 +884,8 @@ commSetTimeout(int fd, int timeout, PF * handler, void *data) } -int commSetTimeout(int fd, int timeout, AsyncCall::Pointer &callback) +int +commSetTimeout(int fd, int timeout, AsyncCall::Pointer &callback) { debugs(5, 3, HERE << "FD " << fd << " timeout " << timeout); assert(fd >= 0); @@ -1465,6 +1181,16 @@ comm_close_complete(int fd, void *data) Comm::AcceptLimiter::Instance().kick(); } +/* + * Close the socket fd in use by a connection. + */ +void +_comm_close(Comm::Connection *conn, char const *file, int line) +{ + _comm_close(conn->fd, file, line); + conn->fd = -1; +} + /* * Close the socket fd. * @@ -2374,10 +2100,6 @@ DeferredRead::markCancelled() cancelled = true; } -ConnectionDetail::ConnectionDetail() : me(), peer() -{ -} - int CommSelectEngine::checkEvents(int timeout) { diff --git a/src/comm.h b/src/comm.h index 426b9af6bf1..f14c3a755df 100644 --- a/src/comm.h +++ b/src/comm.h @@ -2,34 +2,18 @@ #define __COMM_H__ #include "squid.h" +#include "Array.h" #include "AsyncEngine.h" #include "base/AsyncCall.h" -#include "StoreIOBuffer.h" -#include "Array.h" +#include "comm/comm_err_t.h" +#include "comm/Connection.h" #include "ip/Address.h" +#include "StoreIOBuffer.h" #define COMMIO_FD_READCB(fd) (&commfd_table[(fd)].readcb) #define COMMIO_FD_WRITECB(fd) (&commfd_table[(fd)].writecb) -typedef enum { - COMM_OK = 0, - COMM_ERROR = -1, - COMM_NOMESSAGE = -3, - COMM_TIMEOUT = -4, - COMM_SHUTDOWN = -5, - COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */ - COMM_INPROGRESS = -7, - COMM_ERR_CONNECT = -8, - COMM_ERR_DNS = -9, - COMM_ERR_CLOSING = -10, -#if USE_IPV6 - COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */ -#endif - COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */ -} comm_err_t; - -class DnsLookupDetails; -typedef void CNCB(int fd, const DnsLookupDetails &dns, comm_err_t status, int xerrno, void *data); +typedef void CNCB(Comm::Connection *conn, Vector *paths, comm_err_t status, int xerrno, void *data); typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data); @@ -42,7 +26,8 @@ SQUIDCEXTERN int commUnsetNonBlocking(int fd); SQUIDCEXTERN void commSetCloseOnExec(int fd); SQUIDCEXTERN void commSetTcpKeepalive(int fd, int idle, int interval, int timeout); extern void _comm_close(int fd, char const *file, int line); -#define comm_close(fd) (_comm_close((fd), __FILE__, __LINE__)) +extern void _comm_close(Comm::Connection *conn, char const *file, int line); +#define comm_close(x) (_comm_close((x), __FILE__, __LINE__)) SQUIDCEXTERN void comm_reset_close(int fd); #if LINGERING_CLOSE SQUIDCEXTERN void comm_lingering_close(int fd); @@ -100,8 +85,8 @@ SQUIDCEXTERN void comm_select_init(void); SQUIDCEXTERN comm_err_t comm_select(int); SQUIDCEXTERN void comm_quick_poll_required(void); -class ConnectionDetail; -typedef void IOACB(int fd, int nfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data); +#include "comm/Connection.h" +typedef void IOACB(int fd, int nfd, Comm::Connection *details, comm_err_t flag, int xerrno, void *data); extern void comm_add_close_handler(int fd, PF *, void *); extern void comm_add_close_handler(int fd, AsyncCall::Pointer &); extern void comm_remove_close_handler(int fd, PF *, void *); diff --git a/src/comm/ConnectStateData.cc b/src/comm/ConnectStateData.cc new file mode 100644 index 00000000000..7adfd5d096e --- /dev/null +++ b/src/comm/ConnectStateData.cc @@ -0,0 +1,179 @@ +#include "config.h" +#include "comm/ConnectStateData.h" +#include "comm.h" +#include "CommCalls.h" +#include "icmp/net_db.h" +#include "SquidTime.h" + +CBDATA_CLASS_INIT(ConnectStateData); + +ConnectStateData::ConnectStateData(Vector *paths, AsyncCall::Pointer handler) : + host(NULL), + connect_timeout(Config.Timeout.connect), + paths(paths), + solo(NULL), + callback(handler), + total_tries(0), + fail_retries(0), + connstart(0) +{} + +ConnectStateData::ConnectStateData(Comm::Connection *c, AsyncCall::Pointer handler) : + host(NULL), + connect_timeout(Config.Timeout.connect), + paths(paths), + solo(c), + callback(handler), + total_tries(0), + fail_retries(0), + connstart(0) +{} + +void * +ConnectStateData::operator new(size_t size) +{ + CBDATA_INIT_TYPE(ConnectStateData); + return cbdataAlloc(ConnectStateData); +} + +void +ConnectStateData::operator delete(void *address) +{ + cbdataFree(address); +} + +void +ConnectStateData::callCallback(comm_err_t status, int xerrno) +{ + assert(paths != NULL); + + int fd = -1; + if (paths->size() > 0) { + fd = (*paths)[0]->fd; + debugs(5, 3, HERE << "FD " << fd); + comm_remove_close_handler(fd, ConnectStateData::EarlyAbort, this); + commSetTimeout(fd, -1, NULL, NULL); + } + + typedef CommConnectCbParams Params; + Params ¶ms = GetCommParams(callback); + if (solo != NULL) { + params.conn = solo; + } else { + params.paths = paths; + if (paths->size() > 0) + params.conn = (*paths)[0]; + } + params.flag = status; + params.xerrno = xerrno; + ScheduleCallHere(callback); + + callback = NULL; + safe_free(host); + delete this; +} + +void +ConnectStateData::connect() +{ + Comm::Connection *active; + + /* handle connecting to one single path */ + /* mainly used by components other than forwarding */ + + /* handle connecting to one of multiple paths */ + /* mainly used by forwarding */ + + if (solo) { + active = solo; + } else if (paths) { + Vector::iterator i = paths->begin(); + + if (connstart == 0) { + connstart = squid_curtime; + } + + /* find some socket we can use. will also bind the local address to it if needed. */ + while(paths->size() > 0 && (*i)->fd <= 0) { + (*i)->fd = comm_openex(SOCK_STREAM, IPPROTO_TCP, (*i)->local, (*i)->flags, (*i)->tos, host); + if ((*i)->fd <= 0) { + debugs(5 , 2, HERE << "Unable to connect " << (*i)->local << " -> " << (*i)->remote << " for " << host); + paths->shift(); + i = paths->begin(); + } + // else success will terminate the loop with: i->fd >0 + } + + /* we have nowhere left to try connecting */ + if (paths->size() < 1) { + callCallback(COMM_ERR_CONNECT, 0); + return; + } + + active = (*i); + } + + total_tries++; + + switch (comm_connect_addr(active->fd, active->remote) ) { + + case COMM_INPROGRESS: + debugs(5, 5, HERE << "FD " << active->fd << ": COMM_INPROGRESS"); + commSetSelect(active->fd, COMM_SELECT_WRITE, ConnectStateData::ConnectRetry, this, 0); + break; + + case COMM_OK: + debugs(5, 5, HERE << "FD " << active->fd << ": COMM_OK - connected"); + + /* + * stats.conn_open is used to account for the number of + * connections that we have open to the peer, so we can limit + * based on the max-conn option. We need to increment here, + * even if the connection may fail. + */ + if (active->_peer) + active->_peer->stats.conn_open++; + + ipcacheMarkGoodAddr(host, active->remote); + callCallback(COMM_OK, 0); + break; + + default: + debugs(5, 5, HERE "FD " << active->fd << ": * - try again"); + fail_retries++; + ipcacheMarkBadAddr(host, active->remote); + +#if USE_ICMP + if (Config.onoff.test_reachability) + netdbDeleteAddrNetwork(active->remote); +#endif + + // TODO: do the re-try logic with some sane bounds for handling many paths and retries. + if (fail_retries < Config.retry.maxtries) + eventAdd("ConnectStateData::Connect", ConnectStateData::Connect, this, 0.5, 0); + else if(squid_curtime - connstart > connect_timeout) { + debugs(5, 5, HERE << "FD " << active->fd << ": * - ERR took too long already."); + callCallback(COMM_TIMEOUT, errno); + } else if (paths && paths->size() > 0) { + paths->shift(); + fail_retries = 0; + eventAdd("ConnectStateData::Connect", ConnectStateData::Connect, this, 0.0, 0); + } else { + debugs(5, 5, HERE << "FD " << active->fd << ": * - ERR tried too many times already."); + callCallback(COMM_ERR_CONNECT, errno); + } + } +} + +void +ConnectStateData::EarlyAbort(int fd, void *data) +{ + ConnectStateData *cs = static_cast(data); + debugs(5, 3, HERE << "FD " << fd); + cs->callCallback(COMM_ERR_CLOSING, errno); // NP: is closing or shutdown better? + + /* TODO split cases: + * remote end rejecting the connection is normal and one of the other paths may be taken. + * squid shutting down or forcing abort on the connection attempt(s) are the only real fatal cases. + */ +} diff --git a/src/comm/ConnectStateData.h b/src/comm/ConnectStateData.h new file mode 100644 index 00000000000..eece8d15f04 --- /dev/null +++ b/src/comm/ConnectStateData.h @@ -0,0 +1,76 @@ +#ifndef _SQUID_SRC_COMM_CONNECTSTATEDATA_H +#define _SQUID_SRC_COMM_CONNECTSTATEDATA_H + +#include "Array.h" +#include "base/AsyncCall.h" +#include "cbdata.h" +#include "comm/comm_err_t.h" +#include "comm/Connection.h" + +/** + * State engine handling the opening of a remote outbound connection + * to one of multiple destinations. + * + * Create with a list of possible links and a handler callback to call when connected. + */ +class ConnectStateData +{ +public: + /** open first working of a set of connections */ + ConnectStateData(Vector *paths, AsyncCall::Pointer handler); + /** attempt to open one connection. */ + ConnectStateData(Comm::Connection *, AsyncCall::Pointer handler); + + void *operator new(size_t); + void operator delete(void *); + + /** + * Wrapper to start the connection attempts happening. + */ + static void Connect(void *data) { + ConnectStateData *cs = static_cast(data); + cs->connect(); + }; + static void ConnectRetry(int fd, void *data) { + ConnectStateData *cs = static_cast(data); + cs->connect(); + }; + + /** + * Temporary close handler used during connect. + * Handles the case(s) when a partially setup connection gets closed early. + */ + static void EarlyAbort(int fd, void *data); + + /** + * Actual connect start function. + */ + void connect(); + + /** + * Connection attempt are completed. One way or the other. + * Pass the results back to the external handler. + */ + void callCallback(comm_err_t status, int xerrno); + + char *host; ///< domain name we are trying to connect to. + + /** + * time at which to abandone the connection. + * the connection-done callback will be passed COMM_TIMEOUT + */ + time_t connect_timeout; + +private: + Vector *paths; ///< forwarding paths to be tried. front of the list is the current being opened. + Comm::Connection *solo; ///< single connection currently being opened. + AsyncCall::Pointer callback; ///< handler to be called on connection completion. + + int total_tries; ///< total number of connection attempts over all destinations so far. + int fail_retries; ///< number of retries current destination has been tried. + time_t connstart; ///< time at which this series of connection attempts was started. + + CBDATA_CLASS(ConnectStateData); +}; + +#endif /* _SQUID_SRC_COMM_CONNECTSTATEDATA_H */ diff --git a/src/comm/Connection.cc b/src/comm/Connection.cc new file mode 100644 index 00000000000..3306c495173 --- /dev/null +++ b/src/comm/Connection.cc @@ -0,0 +1,34 @@ +#include "config.h" +#include "cbdata.h" +#include "comm.h" +#include "comm/Connection.h" + +Comm::Connection::Connection() : + local(), + remote(), + _peer(NULL), + peer_type(HIER_NONE), + fd(-1), + tos(0), + flags(COMM_NONBLOCKING) +{} + +Comm::Connection::Connection(Comm::Connection &c) : + local(c.local), + remote(c.remote), + _peer(c._peer), + peer_type(c.peer_type), + fd(c.fd), + tos(c.tos), + flags(c.flags) +{} + +Comm::Connection::~Connection() +{ + if (fd >= 0) { + comm_close(fd); + } + if (_peer) { + cbdataReferenceDone(_peer); + } +} diff --git a/src/ConnectionDetail.h b/src/comm/Connection.h similarity index 61% rename from src/ConnectionDetail.h rename to src/comm/Connection.h index 393fb2aebd2..4b9600c7619 100644 --- a/src/ConnectionDetail.h +++ b/src/comm/Connection.h @@ -35,18 +35,54 @@ #ifndef _SQUIDCONNECTIONDETAIL_H_ #define _SQUIDCONNECTIONDETAIL_H_ +#include "hier_code.h" #include "ip/Address.h" -class ConnectionDetail -{ +class peer; + +namespace Comm { + +/** COMM flags */ +/* TODO: make these a struct of boolean flags instead of a bitmap. */ +#define COMM_UNSET 0x00 +#define COMM_NONBLOCKING 0x01 +#define COMM_NOCLOEXEC 0x02 +#define COMM_REUSEADDR 0x04 +#define COMM_TRANSPARENT 0x08 +#define COMM_DOBIND 0x10 +class Connection +{ public: + Connection(); + Connection(Connection &c); + ~Connection(); + + /** Address/Port for the Squid end of a TCP link. */ + Ip::Address local; - ConnectionDetail(); + /** Address for the Remote end of a TCP link. */ + Ip::Address remote; - Ip::Address me; + /** cache_peer data object (if any) */ + peer *_peer; - Ip::Address peer; + /** Hierarchy code for this connection link */ + hier_code peer_type; + + /** + * Socket used by this connection. + * -1 if no socket has been opened. + */ + int fd; + + /** Quality of Service TOS values curtrently sent on this connection */ + int tos; + + /** COMM flags set on this connection */ + int flags; }; +}; // namespace Comm + #endif diff --git a/src/comm/ListenStateData.cc b/src/comm/ListenStateData.cc index 8f6e481d4f9..75222b60c93 100644 --- a/src/comm/ListenStateData.cc +++ b/src/comm/ListenStateData.cc @@ -35,9 +35,9 @@ #include "squid.h" #include "CommCalls.h" #include "comm/AcceptLimiter.h" +#include "comm/Connection.h" #include "comm/comm_internal.h" #include "comm/ListenStateData.h" -#include "ConnectionDetail.h" #include "fde.h" #include "protos.h" #include "SquidTime.h" @@ -151,8 +151,8 @@ Comm::ListenStateData::acceptOne() */ /* Accept a new connection */ - ConnectionDetail connDetails; - int newfd = oldAccept(connDetails); + Connection *connDetails = new Connection(); + int newfd = oldAccept(*connDetails); /* Check for errors */ if (newfd < 0) { @@ -171,7 +171,7 @@ Comm::ListenStateData::acceptOne() } debugs(5, 5, HERE << "accepted: FD " << fd << - " newfd: " << newfd << " from: " << connDetails.peer << + " newfd: " << newfd << " from: " << connDetails->remote << " handler: " << *theCallback); notify(newfd, COMM_OK, 0, connDetails); return true; @@ -186,7 +186,7 @@ Comm::ListenStateData::acceptNext() } void -Comm::ListenStateData::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails) +Comm::ListenStateData::notify(int newfd, comm_err_t errcode, int xerrno, Comm::Connection *connDetails) { // listener socket handlers just abandon the port with COMM_ERR_CLOSING // it should only happen when this object is deleted... @@ -213,17 +213,17 @@ Comm::ListenStateData::notify(int newfd, comm_err_t errcode, int xerrno, const C * Wait for an incoming connection on FD. */ int -Comm::ListenStateData::oldAccept(ConnectionDetail &details) +Comm::ListenStateData::oldAccept(Comm::Connection &details) { PROF_start(comm_accept); statCounter.syscalls.sock.accepts++; int sock; struct addrinfo *gai = NULL; - details.me.InitAddrInfo(gai); + details.local.InitAddrInfo(gai); if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) { - details.me.FreeAddrInfo(gai); + details.local.FreeAddrInfo(gai); PROF_stop(comm_accept); @@ -239,21 +239,21 @@ Comm::ListenStateData::oldAccept(ConnectionDetail &details) } } - details.peer = *gai; + details.remote = *gai; if ( Config.client_ip_max_connections >= 0) { - if (clientdbEstablished(details.peer, 0) > Config.client_ip_max_connections) { - debugs(50, DBG_IMPORTANT, "WARNING: " << details.peer << " attempting more than " << Config.client_ip_max_connections << " connections."); - details.me.FreeAddrInfo(gai); + if (clientdbEstablished(details.remote, 0) > Config.client_ip_max_connections) { + debugs(50, DBG_IMPORTANT, "WARNING: " << details.remote << " attempting more than " << Config.client_ip_max_connections << " connections."); + details.local.FreeAddrInfo(gai); return COMM_ERROR; } } - details.me.InitAddrInfo(gai); + details.local.InitAddrInfo(gai); - details.me.SetEmpty(); + details.local.SetEmpty(); getsockname(sock, gai->ai_addr, &gai->ai_addrlen); - details.me = *gai; + details.local = *gai; commSetCloseOnExec(sock); @@ -264,15 +264,15 @@ Comm::ListenStateData::oldAccept(ConnectionDetail &details) fdd_table[sock].close_line = 0; fde *F = &fd_table[sock]; - details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN); - F->remote_port = details.peer.GetPort(); - F->local_addr.SetPort(details.me.GetPort()); + details.remote.NtoA(F->ipaddr,MAX_IPSTRLEN); + F->remote_port = details.remote.GetPort(); + F->local_addr.SetPort(details.local.GetPort()); #if USE_IPV6 F->sock_family = AF_INET; #else - F->sock_family = details.me.IsIPv4()?AF_INET:AF_INET6; + F->sock_family = details.local.IsIPv4()?AF_INET:AF_INET6; #endif - details.me.FreeAddrInfo(gai); + details.local.FreeAddrInfo(gai); commSetNonBlocking(sock); diff --git a/src/comm/ListenStateData.h b/src/comm/ListenStateData.h index c7bdc7be2dd..d436adfc7d1 100644 --- a/src/comm/ListenStateData.h +++ b/src/comm/ListenStateData.h @@ -8,11 +8,11 @@ #include #endif -class ConnectionDetail; - namespace Comm { +class Connection; + class ListenStateData { @@ -23,7 +23,7 @@ class ListenStateData void subscribe(AsyncCall::Pointer &call); void acceptNext(); - void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &); + void notify(int newfd, comm_err_t, int xerrno, Comm::Connection *); int fd; @@ -42,7 +42,7 @@ class ListenStateData static void doAccept(int fd, void *data); bool acceptOne(); - int oldAccept(ConnectionDetail &details); + int oldAccept(Comm::Connection &details); AsyncCall::Pointer theCallback; bool mayAcceptMore; diff --git a/src/comm/Makefile.am b/src/comm/Makefile.am index 09cb1c10764..9487b38503c 100644 --- a/src/comm/Makefile.am +++ b/src/comm/Makefile.am @@ -1,13 +1,21 @@ include $(top_srcdir)/src/Common.am include $(top_srcdir)/src/TestHeaders.am -noinst_LTLIBRARIES = libcomm-listener.la +noinst_LTLIBRARIES = libcomm.la -## Library holding listener comm socket handlers -libcomm_listener_la_SOURCES= \ +## First group are listener comm socket handlers +## Second group are outbound connection setup handlers +## Third group are misc shared comm objects +libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ ListenStateData.cc \ ListenStateData.h \ \ + ConnectStateData.cc \ + ConnectStateData.h \ + \ + Connection.cc \ + Connection.h \ + comm_err_t.h \ comm_internal.h diff --git a/src/comm/comm_err_t.h b/src/comm/comm_err_t.h new file mode 100644 index 00000000000..1cad2a36b97 --- /dev/null +++ b/src/comm/comm_err_t.h @@ -0,0 +1,21 @@ +#ifndef _SQUID_COMM_COMM_ERR_T_H +#define _SQUID_COMM_COMM_ERR_T_H + +#include "config.h" + +typedef enum { + COMM_OK = 0, + COMM_ERROR = -1, + COMM_NOMESSAGE = -3, + COMM_TIMEOUT = -4, + COMM_SHUTDOWN = -5, + COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */ + COMM_INPROGRESS = -7, + COMM_ERR_CONNECT = -8, + COMM_ERR_DNS = -9, + COMM_ERR_CLOSING = -10, + COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */ + COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */ +} comm_err_t; + +#endif /* _SQUID_COMM_COMM_ERR_T_H */ diff --git a/src/defines.h b/src/defines.h index a6ac3eeaecd..b6725511eff 100644 --- a/src/defines.h +++ b/src/defines.h @@ -62,12 +62,6 @@ #define COMM_SELECT_READ (0x1) #define COMM_SELECT_WRITE (0x2) -#define COMM_NONBLOCKING 0x01 -#define COMM_NOCLOEXEC 0x02 -#define COMM_REUSEADDR 0x04 -#define COMM_TRANSPARENT 0x08 -#define COMM_DOBIND 0x10 - #define safe_free(x) if (x) { xxfree(x); x = NULL; } #define DISK_OK (0) diff --git a/src/dns_internal.cc b/src/dns_internal.cc index f7b2bdf8214..73eaf56d13a 100644 --- a/src/dns_internal.cc +++ b/src/dns_internal.cc @@ -33,16 +33,15 @@ * */ -#include "config.h" #include "squid.h" -#include "event.h" #include "CacheManager.h" -#include "SquidTime.h" -#include "Store.h" +#include "comm/ConnectStateData.h" #include "comm.h" +#include "event.h" #include "fde.h" #include "MemBuf.h" - +#include "SquidTime.h" +#include "Store.h" #include "wordlist.h" #if HAVE_ARPA_NAMESER_H @@ -176,6 +175,7 @@ static void idnsParseWIN32SearchList(const char *); #endif static void idnsCacheQuery(idns_query * q); static void idnsSendQuery(idns_query * q); +static CNCB idnsInitVCConnected; static IOCB idnsReadVCHeader; static void idnsDoSendQueryVC(nsvc *vc); @@ -186,6 +186,7 @@ static PF idnsRead; static EVH idnsCheckQueue; static void idnsTickleQueue(void); static void idnsRcodeCount(int, int); +static void idnsVCClosed(int fd, void *data); static void idnsAddNameserver(const char *buf) @@ -698,18 +699,21 @@ idnsDoSendQueryVC(nsvc *vc) } static void -idnsInitVCConnected(int fd, const DnsLookupDetails &, comm_err_t status, int xerrno, void *data) +idnsInitVCConnected(Comm::Connection *conn, Vector *unused, comm_err_t status, int xerrno, void *data) { nsvc * vc = (nsvc *)data; - if (status != COMM_OK) { + if (status != COMM_OK || !conn) { char buf[MAX_IPSTRLEN]; - debugs(78, 1, "idnsInitVCConnected: Failed to connect to nameserver " << nameservers[vc->ns].S.NtoA(buf,MAX_IPSTRLEN) << " using TCP!"); - comm_close(fd); + debugs(78, DBG_IMPORTANT, "Failed to connect to nameserver " << nameservers[vc->ns].S.NtoA(buf,MAX_IPSTRLEN) << " using TCP!"); + delete conn; return; } - comm_read(fd, (char *)&vc->msglen, 2 , idnsReadVCHeader, vc); + vc->fd = conn->fd; // TODO: make the vc store the conn instead? + + comm_add_close_handler(conn->fd, idnsVCClosed, vc); + comm_read(conn->fd, (char *)&vc->msglen, 2 , idnsReadVCHeader, vc); vc->busy = 0; idnsDoSendQueryVC(vc); } @@ -741,23 +745,18 @@ idnsInitVC(int ns) addr = Config.Addrs.udp_incoming; vc->queue = new MemBuf; - vc->msg = new MemBuf; + vc->busy = 1; - vc->fd = comm_open(SOCK_STREAM, - IPPROTO_TCP, - addr, - COMM_NONBLOCKING, - "DNS TCP Socket"); - - if (vc->fd < 0) - fatal("Could not create a DNS socket"); - - comm_add_close_handler(vc->fd, idnsVCClosed, vc); + Comm::Connection *conn = new Comm::Connection; + conn->local = addr; + conn->remote = nameservers[ns].S; - vc->busy = 1; + AsyncCall::Pointer call = commCbCall(78,3, "idnsInitVCConnected", CommConnectCbPtrFun(idnsInitVCConnected, vc)); - commConnectStart(vc->fd, nameservers[ns].S.NtoA(buf,MAX_IPSTRLEN), nameservers[ns].S.GetPort(), idnsInitVCConnected, vc); + ConnectStateData *cs = new ConnectStateData(conn, call); + cs->host = xstrdup("DNS TCP Socket"); + cs->connect(); } static void diff --git a/src/forward.cc b/src/forward.cc index 7da32962784..4a45a39f7b5 100644 --- a/src/forward.cc +++ b/src/forward.cc @@ -32,34 +32,42 @@ #include "squid.h" -#include "forward.h" #include "acl/FilledChecklist.h" #include "acl/Gadgets.h" #include "CacheManager.h" +#include "comm/ConnectStateData.h" +#include "CommCalls.h" #include "event.h" #include "errorpage.h" #include "fde.h" +#include "forward.h" #include "hier_code.h" #include "HttpReply.h" #include "HttpRequest.h" #include "MemObject.h" #include "pconn.h" +#include "PeerSelectState.h" #include "SquidTime.h" #include "Store.h" #include "icmp/net_db.h" #include "ip/Intercept.h" + static PSC fwdStartCompleteWrapper; static PF fwdServerClosedWrapper; #if USE_SSL static PF fwdNegotiateSSLWrapper; #endif +#if 0 static PF fwdConnectTimeoutWrapper; static EVH fwdConnectStartWrapper; +#endif static CNCB fwdConnectDoneWrapper; static OBJH fwdStats; +#if 0 static void fwdServerFree(FwdServer * fs); +#endif #define MAX_FWD_STATS_IDX 9 static int FwdReplyCodes[MAX_FWD_STATS_IDX + 1][HTTP_INVALID_HEADER + 1]; @@ -78,9 +86,8 @@ FwdState::abort(void* d) FwdState* fwd = (FwdState*)d; Pointer tmp = fwd; // Grab a temporary pointer to keep the object alive during our scope. - if (fwd->server_fd >= 0) { - comm_close(fwd->server_fd); - fwd->server_fd = -1; + if (fwd->paths[0]->fd >= 0) { + comm_close(fwd->paths[0]); } fwd->self = NULL; @@ -92,7 +99,6 @@ FwdState::FwdState(int fd, StoreEntry * e, HttpRequest * r) { entry = e; client_fd = fd; - server_fd = -1; request = HTTPMSGLOCK(r); start_t = squid_curtime; @@ -113,10 +119,7 @@ void FwdState::start(Pointer aSelf) // Otherwise we are going to leak our object. entry->registerAbort(FwdState::abort, this); - peerSelect(request, entry, fwdStartCompleteWrapper, this); - - // TODO: set self _after_ the peer is selected because we do not need - // self until we start talking to some Server. + peerSelect(&paths, request, entry, fwdStartCompleteWrapper, this); } void @@ -162,8 +165,6 @@ FwdState::~FwdState() if (! flags.forward_completed) completed(); - serversFree(&servers); - HTTPMSGUNLOCK(request); if (err) @@ -175,15 +176,14 @@ FwdState::~FwdState() entry = NULL; - int fd = server_fd; - - if (fd > -1) { - server_fd = -1; - comm_remove_close_handler(fd, fwdServerClosedWrapper, this); - debugs(17, 3, "fwdStateFree: closing FD " << fd); - comm_close(fd); + if (paths[0]->fd > -1) { + comm_remove_close_handler(paths[0]->fd, fwdServerClosedWrapper, this); + debugs(17, 3, HERE << "closing FD " << paths[0]->fd); + comm_close(paths[0]); } + paths.clean(); + debugs(17, 3, HERE << "FwdState destructor done"); } @@ -226,7 +226,7 @@ FwdState::fwdStart(int client_fd, StoreEntry *entry, HttpRequest *request) } } - debugs(17, 3, "FwdState::start() '" << entry->url() << "'"); + debugs(17, 3, HERE << "'" << entry->url() << "'"); /* * This seems like an odd place to bind mem_obj and request. * Might want to assert that request is NULL at this point @@ -260,13 +260,6 @@ FwdState::fwdStart(int client_fd, StoreEntry *entry, HttpRequest *request) default: FwdState::Pointer fwd = new FwdState(client_fd, entry, request); - - /* If we need to transparently proxy the request - * then we need the client source protocol, address and port */ - if (request->flags.spoof_client_ip) { - fwd->src = request->client_addr; - } - fwd->start(fwd); return; } @@ -274,6 +267,22 @@ FwdState::fwdStart(int client_fd, StoreEntry *entry, HttpRequest *request) /* NOTREACHED */ } +void +FwdState::startComplete() +{ + debugs(17, 3, HERE << entry->url() ); + + if (paths.size() > 0) { + connectStart(); + } else { + debugs(17, 3, HERE << entry->url() ); + ErrorState *anErr = errorCon(ERR_CANNOT_FORWARD, HTTP_SERVICE_UNAVAILABLE, request); + anErr->xerrno = errno; + fail(anErr); + self = NULL; // refcounted + } +} + void FwdState::fail(ErrorState * errorState) { @@ -295,10 +304,9 @@ void FwdState::unregister(int fd) { debugs(17, 3, HERE << entry->url() ); - assert(fd == server_fd); + assert(fd == paths[0]->fd); assert(fd > -1); comm_remove_close_handler(fd, fwdServerClosedWrapper, this); - server_fd = -1; } /** @@ -310,9 +318,8 @@ FwdState::unregister(int fd) void FwdState::complete() { - StoreEntry *e = entry; assert(entry->store_status == STORE_PENDING); - debugs(17, 3, HERE << e->url() << "\n\tstatus " << entry->getReply()->sline.status ); + debugs(17, 3, HERE << entry->url() << "\n\tstatus " << entry->getReply()->sline.status ); #if URL_CHECKSUM_DEBUG entry->mem_obj->checkUrlChecksum(); @@ -321,20 +328,28 @@ FwdState::complete() logReplyStatus(n_tries, entry->getReply()->sline.status); if (reforward()) { - debugs(17, 3, "fwdComplete: re-forwarding " << entry->getReply()->sline.status << " " << e->url()); + debugs(17, 3, HERE << "re-forwarding " << entry->getReply()->sline.status << " " << entry->url()); - if (server_fd > -1) - unregister(server_fd); + if (paths[0]->fd > -1) + unregister(paths[0]->fd); - e->reset(); + entry->reset(); + + /* the call to reforward() has already dropped the last path off the + * selection list. all we have now are the next path(s) to be tried. + */ - startComplete(servers); + AsyncCall::Pointer call = commCbCall(17,3, "fwdConnectDoneWrapper", CommConnectCbPtrFun(fwdConnectDoneWrapper, this)); + ConnectStateData *cs = new ConnectStateData(&paths, call); + cs->host = xstrdup(entry->url()); + cs->connect_timeout = Config.Timeout.connect; + cs->connect(); } else { - debugs(17, 3, "fwdComplete: server FD " << server_fd << " not re-forwarding status " << entry->getReply()->sline.status); + debugs(17, 3, HERE << "server FD " << paths[0]->fd << " not re-forwarding status " << entry->getReply()->sline.status); EBIT_CLR(entry->flags, ENTRY_FWD_HDR_WAIT); entry->complete(); - if (server_fd < 0) + if (paths[0]->fd < 0) completed(); self = NULL; // refcounted @@ -345,10 +360,10 @@ FwdState::complete() /**** CALLBACK WRAPPERS ************************************************************/ static void -fwdStartCompleteWrapper(FwdServer * servers, void *data) +fwdStartCompleteWrapper(Vector *unused, void *data) { FwdState *fwd = (FwdState *) data; - fwd->startComplete(servers); + fwd->startComplete(); } static void @@ -372,31 +387,13 @@ fwdNegotiateSSLWrapper(int fd, void *data) FwdState *fwd = (FwdState *) data; fwd->negotiateSSL(fd); } - #endif -static void -fwdConnectDoneWrapper(int server_fd, const DnsLookupDetails &dns, comm_err_t status, int xerrno, void *data) -{ - FwdState *fwd = (FwdState *) data; - fwd->connectDone(server_fd, dns, status, xerrno); -} - -static void -fwdConnectTimeoutWrapper(int fd, void *data) +void +fwdConnectDoneWrapper(Comm::Connection *conn, Vector *paths, comm_err_t status, int xerrno, void *data) { FwdState *fwd = (FwdState *) data; - fwd->connectTimeout(fd); -} - -/* - * Accounts for closed persistent connections - */ -static void -fwdPeerClosed(int fd, void *data) -{ - peer *p = (peer *)data; - p->stats.conn_open--; + fwd->connectDone(conn, paths, status, xerrno); } /**** PRIVATE *****************************************************************/ @@ -487,9 +484,12 @@ FwdState::checkRetriable() void FwdState::serverClosed(int fd) { - debugs(17, 2, "fwdServerClosed: FD " << fd << " " << entry->url()); - assert(server_fd == fd); - server_fd = -1; + debugs(17, 2, HERE << "FD " << fd << " " << entry->url()); + assert(paths[0]->fd == fd); + + if (paths[0]->_peer) { + paths[0]->_peer->stats.conn_open--; + } retryOrBail(); } @@ -504,36 +504,26 @@ FwdState::retryOrBail() } if (checkRetry()) { - int originserver = (servers->_peer == NULL); - debugs(17, 3, "fwdServerClosed: re-forwarding (" << n_tries << " tries, " << (squid_curtime - start_t) << " secs)"); - - if (servers->next) { - /* use next, or cycle if origin server isn't last */ - FwdServer *fs = servers; - FwdServer **T, *T2 = NULL; - servers = fs->next; - - for (T = &servers; *T; T2 = *T, T = &(*T)->next); - if (T2 && T2->_peer) { - /* cycle */ - *T = fs; - fs->next = NULL; - } else { - /* Use next. The last "direct" entry is retried multiple times */ - servers = fs->next; - fwdServerFree(fs); - originserver = 0; - } - } + debugs(17, 3, HERE << "re-forwarding (" << n_tries << " tries, " << (squid_curtime - start_t) << " secs)"); - /* Ditch error page if it was created before. - * A new one will be created if there's another problem */ - err = NULL; + paths.shift(); // last one failed. try another. - /* use eventAdd to break potential call sequence loops and to slow things down a little */ - eventAdd("fwdConnectStart", fwdConnectStartWrapper, this, originserver ? 0.05 : 0.005, 0); + if (paths.size() > 0) { + /* Ditch error page if it was created before. + * A new one will be created if there's another problem */ + err = NULL; - return; + AsyncCall::Pointer call = commCbCall(17,3,"fwdConnectDoneWrapper", CommConnectCbPtrFun(fwdConnectDoneWrapper, this)); + ConnectStateData *cs = new ConnectStateData(&paths, call); + cs->host = xstrdup(entry->url()); + cs->connect_timeout = Config.Timeout.connect; + cs->connect(); + + /* use eventAdd to break potential call sequence loops and to slow things down a little */ + eventAdd("fwdConnectStart", fwdConnectStartWrapper, this, (paths[0]->_peer == NULL) ? 0.05 : 0.005, 0); + return; + } + // else bail. no more paths possible to try. } if (!err && shutting_down) { @@ -547,9 +537,8 @@ FwdState::retryOrBail() void FwdState::handleUnregisteredServerEnd() { - debugs(17, 2, "handleUnregisteredServerEnd: self=" << self << - " err=" << err << ' ' << entry->url()); - assert(server_fd < 0); + debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url()); + assert(paths[0]->fd < 0); retryOrBail(); } @@ -557,7 +546,6 @@ FwdState::handleUnregisteredServerEnd() void FwdState::negotiateSSL(int fd) { - FwdServer *fs = servers; SSL *ssl = fd_table[fd].ssl; int ret; @@ -589,21 +577,21 @@ FwdState::negotiateSSL(int fd) fail(anErr); - if (fs->_peer) { - peerConnectFailed(fs->_peer); - fs->_peer->stats.conn_open--; + if (paths[0]->_peer) { + peerConnectFailed(paths[0]->_peer); + paths[0]->_peer->stats.conn_open--; } - comm_close(fd); + comm_close(paths[0]); return; } } - if (fs->_peer && !SSL_session_reused(ssl)) { - if (fs->_peer->sslSession) - SSL_SESSION_free(fs->_peer->sslSession); + if (paths[0]->_peer && !SSL_session_reused(ssl)) { + if (paths[0]->_peer->sslSession) + SSL_SESSION_free(paths[0]->_peer->sslSession); - fs->_peer->sslSession = SSL_get1_session(ssl); + paths[0]->_peer->sslSession = SSL_get1_session(ssl); } dispatch(); @@ -612,11 +600,10 @@ FwdState::negotiateSSL(int fd) void FwdState::initiateSSL() { - FwdServer *fs = servers; - int fd = server_fd; SSL *ssl; SSL_CTX *sslContext = NULL; - peer *peer = fs->_peer; + peer *peer = paths[0]->_peer; + int fd = paths[0]->fd; if (peer) { assert(peer->use_ssl); @@ -676,16 +663,15 @@ FwdState::initiateSSL() #endif void -FwdState::connectDone(int aServerFD, const DnsLookupDetails &dns, comm_err_t status, int xerrno) +FwdState::connectDone(Comm::Connection *conn, Vector *result_paths, comm_err_t status, int xerrno) { - FwdServer *fs = servers; - assert(server_fd == aServerFD); - - request->recordLookup(dns); + assert(result_paths == &paths); - if (Config.onoff.log_ip_on_direct && status != COMM_ERR_DNS && fs->code == HIER_DIRECT) + if (Config.onoff.log_ip_on_direct && /* status != COMM_ERR_DNS &&*/ (paths[0])->peer_type == HIER_DIRECT) updateHierarchyInfo(); +#if 0 // we no longer are limited to handling this here. + // the selectForwardingPaths shoudl handle things like this now. if (status == COMM_ERR_DNS) { /* * Only set the dont_retry flag if the DNS lookup fails on @@ -696,7 +682,7 @@ FwdState::connectDone(int aServerFD, const DnsLookupDetails &dns, comm_err_t sta if (NULL == fs->_peer) flags.dont_retry = 1; - debugs(17, 4, "fwdConnectDone: Unknown host: " << request->GetHost()); + debugs(17, 4, "Unknown host: " << request->GetHost()); ErrorState *anErr = errorCon(ERR_DNS_FAIL, HTTP_SERVICE_UNAVAILABLE, request); @@ -705,31 +691,33 @@ FwdState::connectDone(int aServerFD, const DnsLookupDetails &dns, comm_err_t sta fail(anErr); comm_close(server_fd); - } else if (status != COMM_OK) { - assert(fs); + } else +#endif + if (status != COMM_OK) { ErrorState *anErr = errorCon(ERR_CONNECT_FAIL, HTTP_SERVICE_UNAVAILABLE, request); anErr->xerrno = xerrno; fail(anErr); - if (fs->_peer) - peerConnectFailed(fs->_peer); + if (paths[0]->_peer) + peerConnectFailed(paths[0]->_peer); - comm_close(server_fd); + comm_close(paths[0]); } else { - debugs(17, 3, "fwdConnectDone: FD " << server_fd << ": '" << entry->url() << "'" ); + debugs(17, 3, "FD " << paths[0]->fd << ": '" << entry->url() << "'" ); + + comm_add_close_handler(conn->fd, fwdServerClosedWrapper, this); - if (fs->_peer) - peerConnectSucceded(fs->_peer); + if (paths[0]->_peer) + peerConnectSucceded(paths[0]->_peer); #if USE_SSL - if ((fs->_peer && fs->_peer->use_ssl) || - (!fs->_peer && request->protocol == PROTO_HTTPS)) { + if ((paths[0]->_peer && paths[0]->_peer->use_ssl) || + (!paths[0]->_peer && request->protocol == PROTO_HTTPS)) { initiateSSL(); return; } - #endif dispatch(); } @@ -738,124 +726,113 @@ FwdState::connectDone(int aServerFD, const DnsLookupDetails &dns, comm_err_t sta void FwdState::connectTimeout(int fd) { - FwdServer *fs = servers; - debugs(17, 2, "fwdConnectTimeout: FD " << fd << ": '" << entry->url() << "'" ); - assert(fd == server_fd); + assert(fd == paths[0]->fd); - if (Config.onoff.log_ip_on_direct && fs->code == HIER_DIRECT && fd_table[fd].ipaddr[0]) + if (Config.onoff.log_ip_on_direct && paths[0]->peer_type == HIER_DIRECT) updateHierarchyInfo(); if (entry->isEmpty()) { ErrorState *anErr = errorCon(ERR_CONNECT_FAIL, HTTP_GATEWAY_TIMEOUT, request); anErr->xerrno = ETIMEDOUT; fail(anErr); - /* - * This marks the peer DOWN ... - */ - if (servers) - if (servers->_peer) - peerConnectFailed(servers->_peer); + /* This marks the peer DOWN ... */ + if (paths.size() > 0) + if (paths[0]->_peer) + peerConnectFailed(paths[0]->_peer); } - comm_close(fd); + comm_close(paths[0]); } +/** + * Called after Forwarding path selection (via peer select) has taken place + * We have a vector of possible paths now ready to start being connected. + */ void FwdState::connectStart() { - const char *url = entry->url(); - int fd = -1; - FwdServer *fs = servers; - const char *host; - unsigned short port; - int ctimeout; - int ftimeout = Config.Timeout.forward - (squid_curtime - start_t); - - Ip::Address outgoing; - unsigned short tos; - Ip::Address client_addr; - assert(fs); - assert(server_fd == -1); - debugs(17, 3, "fwdConnectStart: " << url); + debugs(17, 3, "fwdConnectStart: " << entry->url()); if (n_tries == 0) // first attempt request->hier.first_conn_start = current_time; - if (fs->_peer) { - ctimeout = fs->_peer->connect_timeout > 0 ? fs->_peer->connect_timeout - : Config.Timeout.peer_connect; + Comm::Connection *conn = paths[0]; + + /* connection timeout */ + int ctimeout; + if (conn->_peer) { + ctimeout = conn->_peer->connect_timeout > 0 ? conn->_peer->connect_timeout : Config.Timeout.peer_connect; } else { ctimeout = Config.Timeout.connect; } - if (request->flags.spoof_client_ip) { - if (!fs->_peer || !fs->_peer->options.no_tproxy) - client_addr = request->client_addr; - // else no tproxy today ... - } - + /* calculate total forwarding timeout ??? */ + int ftimeout = Config.Timeout.forward - (squid_curtime - start_t); if (ftimeout < 0) ftimeout = 5; if (ftimeout < ctimeout) ctimeout = ftimeout; - request->flags.pinned = 0; - if (fs->code == PINNED) { + if (conn->peer_type == PINNED) { ConnStateData *pinned_connection = request->pinnedConnection(); assert(pinned_connection); - fd = pinned_connection->validatePinnedConnection(request, fs->_peer); - if (fd >= 0) { + conn->fd = pinned_connection->validatePinnedConnection(request, conn->_peer); + if (conn->fd >= 0) { pinned_connection->unpinConnection(); #if 0 - if (!fs->_peer) - fs->code = HIER_DIRECT; + if (!conn->_peer) + conn->peer_type = HIER_DIRECT; #endif - server_fd = fd; n_tries++; request->flags.pinned = 1; if (pinned_connection->pinnedAuth()) request->flags.auth = 1; - comm_add_close_handler(fd, fwdServerClosedWrapper, this); updateHierarchyInfo(); - connectDone(fd, DnsLookupDetails(), COMM_OK, 0); + FwdState::connectDone(conn, &paths, COMM_OK, 0); return; } /* Failure. Fall back on next path */ debugs(17,2,HERE << " Pinned connection " << pinned_connection << " not valid. Releasing."); request->releasePinnedConnection(); - servers = fs->next; - fwdServerFree(fs); + paths.shift(); + delete conn; connectStart(); return; } - if (fs->_peer) { - host = fs->_peer->host; - port = fs->_peer->http_port; - fd = fwdPconnPool->pop(fs->_peer->name, fs->_peer->http_port, request->GetHost(), client_addr, checkRetriable()); +// TODO: now that we are dealing with actual IP->IP links. should we still anchor pconn on hostname? +// or on the remote IP+port? +// that could reduce the pconns per virtual server a fair amount +// and prevent crossover between servers hosting the one domain + + const char *host; + int port; + if (conn->_peer) { + host = conn->_peer->host; + port = conn->_peer->http_port; + conn->fd = fwdPconnPool->pop(conn->_peer->name, conn->_peer->http_port, request->GetHost(), conn->local, checkRetriable()); } else { host = request->GetHost(); port = request->port; - fd = fwdPconnPool->pop(host, port, NULL, client_addr, checkRetriable()); + conn->fd = fwdPconnPool->pop(host, port, NULL, conn->local, checkRetriable()); } - if (fd >= 0) { - debugs(17, 3, "fwdConnectStart: reusing pconn FD " << fd); - server_fd = fd; + conn->remote.SetPort(port); + + if (conn->fd >= 0) { + debugs(17, 3, HERE << "reusing pconn FD " << conn->fd); n_tries++; - if (!fs->_peer) + if (!conn->_peer) origin_tries++; updateHierarchyInfo(); - comm_add_close_handler(fd, fwdServerClosedWrapper, this); - + comm_add_close_handler(conn->fd, fwdServerClosedWrapper, this); dispatch(); - return; } @@ -863,96 +840,41 @@ FwdState::connectStart() entry->mem_obj->checkUrlChecksum(); #endif - outgoing = getOutgoingAddr(request, fs->_peer); - - tos = getOutgoingTOS(request); - - debugs(17, 3, "fwdConnectStart: got outgoing addr " << outgoing << ", tos " << tos); - - int commFlags = COMM_NONBLOCKING; - if (request->flags.spoof_client_ip) { - if (!fs->_peer || !fs->_peer->options.no_tproxy) - commFlags |= COMM_TRANSPARENT; - // else no tproxy today ... - } - - fd = comm_openex(SOCK_STREAM, IPPROTO_TCP, outgoing, commFlags, tos, url); - - debugs(17, 3, "fwdConnectStart: got TCP FD " << fd); - - if (fd < 0) { - debugs(50, 4, "fwdConnectStart: " << xstrerror()); - ErrorState *anErr = errorCon(ERR_SOCKET_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request); - anErr->xerrno = errno; - fail(anErr); - self = NULL; // refcounted - return; - } - - server_fd = fd; - n_tries++; - - if (!fs->_peer) - origin_tries++; - - /* - * stats.conn_open is used to account for the number of - * connections that we have open to the peer, so we can limit - * based on the max-conn option. We need to increment here, - * even if the connection may fail. - */ - - if (fs->_peer) { - fs->_peer->stats.conn_open++; - comm_add_close_handler(fd, fwdPeerClosed, fs->_peer); - } - - comm_add_close_handler(fd, fwdServerClosedWrapper, this); - - commSetTimeout(fd, ctimeout, fwdConnectTimeoutWrapper, this); - updateHierarchyInfo(); - commConnectStart(fd, host, port, fwdConnectDoneWrapper, this); -} - -void -FwdState::startComplete(FwdServer * theServers) -{ - debugs(17, 3, "fwdStartComplete: " << entry->url() ); - if (theServers != NULL) { - servers = theServers; - connectStart(); - } else { - startFail(); - } + AsyncCall::Pointer call = commCbCall(17,3, "fwdConnectDoneWrapper", CommConnectCbPtrFun(fwdConnectDoneWrapper, this)); + ConnectStateData *cs = new ConnectStateData(&paths, call); + cs->host = xstrdup(host); + cs->connect_timeout = ctimeout; + cs->connect(); } +#if DEAD void FwdState::startFail() { - debugs(17, 3, "fwdStartFail: " << entry->url() ); + debugs(17, 3, HERE << entry->url() ); ErrorState *anErr = errorCon(ERR_CANNOT_FORWARD, HTTP_SERVICE_UNAVAILABLE, request); anErr->xerrno = errno; fail(anErr); - self = NULL; // refcounted + self = NULL; // refcounted } +#endif void FwdState::dispatch() { - peer *p = NULL; debugs(17, 3, "fwdDispatch: FD " << client_fd << ": Fetching '" << RequestMethodStr(request->method) << " " << entry->url() << "'" ); /* * Assert that server_fd is set. This is to guarantee that fwdState * is attached to something and will be deallocated when server_fd * is closed. */ - assert(server_fd > -1); + assert(paths.size() > 0 && paths[0]->fd > -1); - fd_note(server_fd, entry->url()); + fd_note(paths[0]->fd, entry->url()); - fd_table[server_fd].noteUse(fwdPconnPool); + fd_table[paths[0]->fd].noteUse(fwdPconnPool); /*assert(!EBIT_TEST(entry->flags, ENTRY_DISPATCHED)); */ assert(entry->ping_status != PING_WAITING); @@ -975,10 +897,10 @@ FwdState::dispatch() int tos = 1; int tos_len = sizeof(tos); clientFde->upstreamTOS = 0; - if (setsockopt(server_fd,SOL_IP,IP_RECVTOS,&tos,tos_len)==0) { + if (setsockopt(paths[0]->fd,SOL_IP,IP_RECVTOS,&tos,tos_len)==0) { unsigned char buf[512]; int len = 512; - if (getsockopt(server_fd,SOL_IP,IP_PKTOPTIONS,buf,(socklen_t*)&len) == 0) { + if (getsockopt(paths[0]->fd,SOL_IP,IP_PKTOPTIONS,buf,(socklen_t*)&len) == 0) { /* Parse the PKTOPTIONS structure to locate the TOS data message * prepared in the kernel by the ZPH incoming TCP TOS preserving * patch. @@ -997,18 +919,18 @@ FwdState::dispatch() pbuf += CMSG_LEN(o->cmsg_len); } } else { - debugs(33, 1, "ZPH: error in getsockopt(IP_PKTOPTIONS) on FD "<fd << " " << xstrerror()); } } else { - debugs(33, 1, "ZPH: error in setsockopt(IP_RECVTOS) on FD "<fd << " " << xstrerror()); } } #endif - if (servers && (p = servers->_peer)) { - p->stats.fetches++; - request->peer_login = p->login; - request->peer_domain = p->domain; + if (paths.size() > 0 && paths[0]->_peer != NULL) { + paths[0]->_peer->stats.fetches++; + request->peer_login = paths[0]->_peer->login; + request->peer_domain = paths[0]->_peer->domain; httpStart(this); } else { request->peer_login = NULL; @@ -1063,7 +985,7 @@ FwdState::dispatch() * transient (network) error; its a bug. */ flags.dont_retry = 1; - comm_close(server_fd); + comm_close(paths[0]); break; } } @@ -1081,7 +1003,6 @@ int FwdState::reforward() { StoreEntry *e = entry; - FwdServer *fs = servers; http_status s; assert(e->store_status == STORE_PENDING); assert(e->mem_obj); @@ -1090,10 +1011,10 @@ FwdState::reforward() e->mem_obj->checkUrlChecksum(); #endif - debugs(17, 3, "fwdReforward: " << e->url() << "?" ); + debugs(17, 3, HERE << e->url() << "?" ); if (!EBIT_TEST(e->flags, ENTRY_FWD_HDR_WAIT)) { - debugs(17, 3, "fwdReforward: No, ENTRY_FWD_HDR_WAIT isn't set"); + debugs(17, 3, HERE << "No, ENTRY_FWD_HDR_WAIT isn't set"); return 0; } @@ -1106,19 +1027,15 @@ FwdState::reforward() if (request->bodyNibbled()) return 0; - assert(fs); - - servers = fs->next; - - fwdServerFree(fs); + paths.shift(); - if (servers == NULL) { - debugs(17, 3, "fwdReforward: No forward-servers left"); + if (paths.size() > 0) { + debugs(17, 3, HERE << "No alternative forwarding paths left"); return 0; } s = e->getReply()->sline.status; - debugs(17, 3, "fwdReforward: status " << s); + debugs(17, 3, HERE << "status " << s); return reforwardableStatus(s); } @@ -1201,8 +1118,6 @@ FwdState::pconnPush(int fd, const peer *_peer, const HttpRequest *req, const cha void FwdState::initModule() { - memDataInit(MEM_FWD_SERVER, "FwdServer", sizeof(FwdServer), 0); - #if WIP_FWD_LOG if (logfile) @@ -1240,17 +1155,6 @@ FwdState::logReplyStatus(int tries, http_status status) FwdReplyCodes[tries][status]++; } -void -FwdState::serversFree(FwdServer ** FSVR) -{ - FwdServer *fs; - - while ((fs = *FSVR)) { - *FSVR = fs->next; - fwdServerFree(fs); - } -} - /** From Comment #5 by Henrik Nordstrom made at http://www.squid-cache.org/bugs/show_bug.cgi?id=2391 on 2008-09-19 @@ -1269,38 +1173,29 @@ FwdState::updateHierarchyInfo() { assert(request); - FwdServer *fs = servers; - assert(fs); + assert(paths.size() > 0); - const char *nextHop = NULL; + char nextHop[256]; // - if (fs->_peer) { + if (paths[0]->_peer) { // went to peer, log peer host name - nextHop = fs->_peer->name; + snprintf(nextHop,256,"%s", paths[0]->_peer->name); } else { // went DIRECT, must honor log_ip_on_direct - - // XXX: or should we use request->host_addr here? how? - assert(server_fd >= 0); - nextHop = fd_table[server_fd].ipaddr; - if (!Config.onoff.log_ip_on_direct || !nextHop[0]) - nextHop = request->GetHost(); // domain name + if (!Config.onoff.log_ip_on_direct) + snprintf(nextHop,256,"%s",request->GetHost()); // domain name + else + paths[0]->remote.NtoA(nextHop, 256); } - assert(nextHop); - hierarchyNote(&request->hier, fs->code, nextHop); + assert(nextHop[0]); + hierarchyNote(&request->hier, paths[0]->peer_type, nextHop); } /**** PRIVATE NON-MEMBER FUNCTIONS ********************************************/ -static void -fwdServerFree(FwdServer * fs) -{ - cbdataReferenceDone(fs->_peer); - memFree(fs, MEM_FWD_SERVER); -} - +#if DEAD static Ip::Address aclMapAddr(acl_address * head, ACLChecklist * ch) { @@ -1316,6 +1211,7 @@ aclMapAddr(acl_address * head, ACLChecklist * ch) addr.SetAnyAddr(); return addr; } +#endif /* * DPW 2007-05-19 @@ -1334,21 +1230,34 @@ aclMapTOS(acl_tos * head, ACLChecklist * ch) return 0; } -Ip::Address -getOutgoingAddr(HttpRequest * request, struct peer *dst_peer) +void +getOutgoingAddress(HttpRequest * request, Comm::Connection *conn) { + /* skip if an outgoing address is already set. */ + if (!conn->local.IsAnyAddr()) return; + + // maybe use TPROXY client address if (request && request->flags.spoof_client_ip) { - if (!dst_peer || !dst_peer->options.no_tproxy) - return request->client_addr; + if (!conn->_peer || !conn->_peer->options.no_tproxy) { + conn->local = request->client_addr; + // some flags need setting on the socket to use this address + conn->flags |= COMM_DOBIND; + conn->flags |= COMM_TRANSPARENT; + return; + } // else no tproxy today ... } if (!Config.accessList.outgoing_address) { - return Ip::Address(); // anything will do. + return; // anything will do. } ACLFilledChecklist ch(NULL, request, NULL); - ch.dst_peer = dst_peer; + ch.dst_peer = conn->_peer; + ch.dst_addr = conn->remote; + + // TODO use the connection details in ACL. + // needs a bit of rework in ACLFilledChecklist to use Comm::Connection instead of ConnStateData if (request) { #if FOLLOW_X_FORWARDED_FOR @@ -1360,7 +1269,18 @@ getOutgoingAddr(HttpRequest * request, struct peer *dst_peer) ch.my_addr = request->my_addr; } - return aclMapAddr(Config.accessList.outgoing_address, &ch); + acl_address *l; + for (l = Config.accessList.outgoing_address; l; l = l->next) { + + /* check if the outgoing address is usable to the destination */ + if (conn->remote.IsIPv4() != l->addr.IsIPv4()) continue; + + /* check ACLs for this outgoing address */ + if (!l->aclList || ch.matchAclListFast(l->aclList)) { + conn->local = l->addr; + return; + } + } } unsigned long diff --git a/src/forward.h b/src/forward.h index 0ac5ae90c6a..804b239e900 100644 --- a/src/forward.h +++ b/src/forward.h @@ -7,9 +7,12 @@ class ErrorState; class HttpRequest; #include "comm.h" -#include "hier_code.h" +#include "comm/Connection.h" +//#include "hier_code.h" #include "ip/Address.h" +#include "Array.h" +#if 0 // replaced by vector of extended Comm::Connection objects (paths) class FwdServer { public: @@ -18,6 +21,10 @@ class FwdServer FwdServer *next; }; +typedef void PSC(FwdServer *, void *); + +#endif + class FwdState : public RefCountable { public: @@ -26,8 +33,8 @@ class FwdState : public RefCountable static void initModule(); static void fwdStart(int fd, StoreEntry *, HttpRequest *); - void startComplete(FwdServer *); - void startFail(); + void startComplete(); +// void startFail(); void fail(ErrorState *err); void unregister(int fd); void complete(); @@ -36,7 +43,7 @@ class FwdState : public RefCountable bool reforwardableStatus(http_status s); void serverClosed(int fd); void connectStart(); - void connectDone(int server_fd, const DnsLookupDetails &dns, comm_err_t status, int xerrno); + void connectDone(Comm::Connection *conn, Vector *paths, comm_err_t status, int xerrno); void connectTimeout(int fd); void initiateSSL(); void negotiateSSL(int fd); @@ -53,7 +60,7 @@ class FwdState : public RefCountable void ftpPasvFailed(bool val) { flags.ftp_pasv_failed = val; } - static void serversFree(FwdServer **); + Comm::Connection *conn() const { return paths[0]; }; private: // hidden for safer management of self; use static fwdStart @@ -76,8 +83,6 @@ class FwdState : public RefCountable public: StoreEntry *entry; HttpRequest *request; - int server_fd; - FwdServer *servers; static void abort(void*); private: @@ -98,7 +103,8 @@ class FwdState : public RefCountable unsigned int forward_completed:1; } flags; - Ip::Address src; /* Client address for this connection. Needed for transparent operations. */ + /** possible paths which may be tried (in sequence stored) */ + Vector paths; // NP: keep this last. It plays with private/public CBDATA_CLASS2(FwdState); diff --git a/src/fqdncache.cc b/src/fqdncache.cc index 573c7925ee7..49998ba82d9 100644 --- a/src/fqdncache.cc +++ b/src/fqdncache.cc @@ -34,6 +34,7 @@ #include "squid.h" #include "cbdata.h" +#include "DnsLookupDetails.h" #include "event.h" #include "CacheManager.h" #include "SquidTime.h" diff --git a/src/ftp.cc b/src/ftp.cc index eff4ddc7f6b..1f3dea861d6 100644 --- a/src/ftp.cc +++ b/src/ftp.cc @@ -34,9 +34,9 @@ #include "squid.h" #include "comm.h" +#include "comm/ConnectStateData.h" #include "comm/ListenStateData.h" #include "compat/strtoll.h" -#include "ConnectionDetail.h" #include "errorpage.h" #include "fde.h" #include "forward.h" @@ -480,7 +480,7 @@ FtpStateData::FtpStateData(FwdState *theFwdState) : AsyncJob("FtpStateData"), Se typedef CommCbMemFunT Dialer; AsyncCall::Pointer closer = asyncCall(9, 5, "FtpStateData::ctrlClosed", Dialer(this, &FtpStateData::ctrlClosed)); - ctrl.opened(theFwdState->server_fd, closer); + ctrl.opened(theFwdState->conn()->fd, closer); if (request->method == METHOD_PUT) flags.put = 1; @@ -2406,7 +2406,15 @@ ftpReadEPSV(FtpStateData* ftpState) debugs(9, 3, HERE << "connecting to " << ftpState->data.host << ", port " << ftpState->data.port); - commConnectStart(fd, ftpState->data.host, port, FtpStateData::ftpPasvCallback, ftpState); + Comm::Connection *conn = new Comm::Connection; + conn->remote = fd_table[ftpState->ctrl.fd].ipaddr; // TODO: do we have a better info source than fd_table? + conn->remote.SetPort(port); + conn->fd = fd; + + AsyncCall::Pointer call = commCbCall(9,3, "FtpStateData::ftpPasvCallback", CommConnectCbPtrFun(FtpStateData::ftpPasvCallback, ftpState)); + ConnectStateData *cs = new ConnectStateData(conn, call); + cs->host = xstrdup(fd_table[ftpState->ctrl.fd].ipaddr); + cs->connect(); } /** \ingroup ServerProtocolFTPInternal @@ -2539,10 +2547,11 @@ ftpSendPassive(FtpStateData * ftpState) /** Otherwise, Open data channel with the same local address as control channel (on a new random port!) */ addr.SetPort(0); - int fd = comm_open(SOCK_STREAM, + int fd = comm_openex(SOCK_STREAM, IPPROTO_TCP, addr, COMM_NONBLOCKING, + 0, ftpState->entry->url()); debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd << " from " << addr); @@ -2682,15 +2691,24 @@ ftpReadPasv(FtpStateData * ftpState) debugs(9, 3, HERE << "connecting to " << ftpState->data.host << ", port " << ftpState->data.port); - commConnectStart(fd, ipaddr, port, FtpStateData::ftpPasvCallback, ftpState); + Comm::Connection *conn = new Comm::Connection; + conn->remote = ipaddr; + conn->remote.SetPort(port); + conn->fd = ftpState->data.fd; + + AsyncCall::Pointer call = commCbCall(9,3, "FtpStateData::ftpPasvCallback", CommConnectCbPtrFun(FtpStateData::ftpPasvCallback, ftpState)); + ConnectStateData *cs = new ConnectStateData(conn, call); + cs->host = xstrdup(ftpState->data.host); + cs->connect_timeout = Config.Timeout.connect; + cs->connect(); } void -FtpStateData::ftpPasvCallback(int fd, const DnsLookupDetails &dns, comm_err_t status, int xerrno, void *data) +FtpStateData::ftpPasvCallback(Comm::Connection *conn, Vector *unused, comm_err_t status, int xerrno, void *data) { FtpStateData *ftpState = (FtpStateData *)data; debugs(9, 3, HERE); - ftpState->request->recordLookup(dns); +// TODO: dead? ftpState->request->recordLookup(dns); if (status != COMM_OK) { debugs(9, 2, HERE << "Failed to connect. Retrying without PASV."); @@ -2931,16 +2949,16 @@ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) * This prevents third-party hacks, but also third-party load balancing handshakes. */ if (Config.Ftp.sanitycheck) { - io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN); + io.details->remote.NtoA(ntoapeer,MAX_IPSTRLEN); if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) { debugs(9, DBG_IMPORTANT, "FTP data connection from unexpected server (" << - io.details.peer << "), expecting " << + io.details->remote << "), expecting " << fd_table[ctrl.fd].ipaddr); - /* close the bad soures connection down ASAP. */ - comm_close(io.nfd); + /* close the bad sources connection down ASAP. */ + comm_close(io.details); /* we are ony accepting once, so need to re-open the listener socket. */ typedef CommCbMemFunT acceptDialer; @@ -2962,11 +2980,11 @@ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) * Replace the Listen socket with the accepted data socket */ data.close(); data.opened(io.nfd, dataCloser()); - data.port = io.details.peer.GetPort(); - io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN); + data.port = io.details->remote.GetPort(); + io.details->remote.NtoA(data.host,SQUIDHOSTNAMELEN); debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " << - "FD " << io.nfd << " to " << io.details.peer << " FD table says: " << + "FD " << io.nfd << " to " << io.details->remote << " FD table says: " << "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " << "data-peer= " << fd_table[data.fd].ipaddr); diff --git a/src/gopher.cc b/src/gopher.cc index 293059ee46d..3e7b257ce08 100644 --- a/src/gopher.cc +++ b/src/gopher.cc @@ -990,7 +990,6 @@ CBDATA_TYPE(GopherStateData); void gopherStart(FwdState * fwd) { - int fd = fwd->server_fd; StoreEntry *entry = fwd->entry; GopherStateData *gopherState; CBDATA_INIT_TYPE(GopherStateData); @@ -1012,7 +1011,7 @@ gopherStart(FwdState * fwd) gopher_request_parse(fwd->request, &gopherState->type_id, gopherState->request); - comm_add_close_handler(fd, gopherStateFree, gopherState); + comm_add_close_handler(fwd->conn()->fd, gopherStateFree, gopherState); if (((gopherState->type_id == GOPHER_INDEX) || (gopherState->type_id == GOPHER_CSO)) && (strchr(gopherState->request, '?') == NULL)) { @@ -1032,12 +1031,12 @@ gopherStart(FwdState * fwd) gopherToHTML(gopherState, (char *) NULL, 0); fwd->complete(); - comm_close(fd); + comm_close(fwd->conn()); return; } - gopherState->fd = fd; + gopherState->fd = fwd->conn()->fd; // TODO: save the conn() in gopher instead of the FD gopherState->fwd = fwd; - gopherSendRequest(fd, gopherState); - commSetTimeout(fd, Config.Timeout.read, gopherTimeout, gopherState); + gopherSendRequest(fwd->conn()->fd, gopherState); + commSetTimeout(fwd->conn()->fd, Config.Timeout.read, gopherTimeout, gopherState); } diff --git a/src/http.cc b/src/http.cc index d10bea993f2..40123f58aa8 100644 --- a/src/http.cc +++ b/src/http.cc @@ -86,7 +86,7 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : AsyncJob("HttpStateData"), debugs(11,5,HERE << "HttpStateData " << this << " created"); ignoreCacheControl = false; surrogateNoStore = false; - fd = fwd->server_fd; + fd = fwd->conn()->fd; // TODO: store Comm::Connection instead of FD readBuf = new MemBuf; readBuf->init(); orig_request = HTTPMSGLOCK(fwd->request); @@ -95,8 +95,8 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : AsyncJob("HttpStateData"), orig_request->hier.peer_http_request_sent.tv_sec = 0; orig_request->hier.peer_http_request_sent.tv_usec = 0; - if (fwd->servers) - _peer = fwd->servers->_peer; /* might be NULL */ + if (fwd->conn()) + _peer = fwd->conn()->_peer; /* might be NULL */ if (_peer) { const char *url; @@ -106,8 +106,7 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : AsyncJob("HttpStateData"), else url = entry->url(); - HttpRequest * proxy_req = new HttpRequest(orig_request->method, - orig_request->protocol, url); + HttpRequest * proxy_req = new HttpRequest(orig_request->method, orig_request->protocol, url); proxy_req->SetHost(_peer->host); diff --git a/src/ident/AclIdent.cc b/src/ident/AclIdent.cc index ba9e49c8e33..216c40c7d24 100644 --- a/src/ident/AclIdent.cc +++ b/src/ident/AclIdent.cc @@ -129,7 +129,11 @@ IdentLookup::checkForAsync(ACLChecklist *cl)const if (checklist->conn() != NULL) { debugs(28, 3, HERE << "Doing ident lookup" ); checklist->asyncInProgress(true); - Ident::Start(checklist->conn()->me, checklist->conn()->peer, LookupDone, checklist); + // TODO: store a Comm::Connection in either checklist or ConnStateData one day. + Comm::Connection cc; // IDENT will clone it's own copy for alterations. + cc.local = checklist->conn()->me; + cc.remote = checklist->conn()->peer; + Ident::Start(&cc, LookupDone, checklist); } else { debugs(28, DBG_IMPORTANT, "IdentLookup::checkForAsync: Can't start ident lookup. No client connection" ); checklist->currentAnswer(ACCESS_DENIED); diff --git a/src/ident/Ident.cc b/src/ident/Ident.cc index 50f7a734e04..a34990a9a04 100644 --- a/src/ident/Ident.cc +++ b/src/ident/Ident.cc @@ -37,6 +37,8 @@ #if USE_IDENT #include "comm.h" +#include "comm/ConnectStateData.h" +#include "CommCalls.h" #include "ident/Config.h" #include "ident/Ident.h" #include "MemBuf.h" @@ -56,10 +58,7 @@ typedef struct _IdentClient { typedef struct _IdentStateData { hash_link hash; /* must be first */ - int fd; /* IDENT fd */ - - Ip::Address me; - Ip::Address my_peer; + Comm::Connection conn; IdentClient *clients; char buf[4096]; } IdentStateData; @@ -103,7 +102,7 @@ Ident::Close(int fdnotused, void *data) { IdentStateData *state = (IdentStateData *)data; identCallback(state, NULL); - comm_close(state->fd); + comm_close(&(state->conn)); hash_remove_link(ident_hash, (hash_link *) state); xfree(state->hash.key); cbdataFree(state); @@ -113,26 +112,28 @@ void Ident::Timeout(int fd, void *data) { IdentStateData *state = (IdentStateData *)data; - debugs(30, 3, "identTimeout: FD " << fd << ", " << state->my_peer); - - comm_close(fd); + debugs(30, 3, HERE << "FD " << fd << ", " << state->conn.remote); + comm_close(&(state->conn)); } void -Ident::ConnectDone(int fd, const DnsLookupDetails &, comm_err_t status, int xerrno, void *data) +Ident::ConnectDone(Comm::Connection *conn, Vector *unused, comm_err_t status, int xerrno, void *data) { IdentStateData *state = (IdentStateData *)data; - IdentClient *c; if (status != COMM_OK) { - /* Failed to connect */ - comm_close(fd); + if (status == COMM_TIMEOUT) { + debugs(30, 3, "IDENT connection timeout to " << state->conn.remote); + } return; } + assert(conn != NULL && conn == &(state->conn)); + /* * see if any of our clients still care */ + IdentClient *c; for (c = state->clients; c; c = c->next) { if (cbdataReferenceValid(c->callback_data)) break; @@ -140,18 +141,20 @@ Ident::ConnectDone(int fd, const DnsLookupDetails &, comm_err_t status, int xerr if (c == NULL) { /* no clients care */ - comm_close(fd); + comm_close(conn); return; } + comm_add_close_handler(conn->fd, Ident::Close, state); + MemBuf mb; mb.init(); mb.Printf("%d, %d\r\n", - state->my_peer.GetPort(), - state->me.GetPort()); - comm_write_mbuf(fd, &mb, NULL, state); - comm_read(fd, state->buf, BUFSIZ, Ident::ReadReply, state); - commSetTimeout(fd, Ident::TheConfig.timeout, Ident::Timeout, state); + conn->remote.GetPort(), + conn->local.GetPort()); + comm_write_mbuf(conn->fd, &mb, NULL, state); + comm_read(conn->fd, state->buf, BUFSIZ, Ident::ReadReply, state); + commSetTimeout(conn->fd, Ident::TheConfig.timeout, Ident::Timeout, state); } void @@ -161,10 +164,11 @@ Ident::ReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, voi char *ident = NULL; char *t = NULL; - assert (buf == state->buf); + assert(buf == state->buf); + assert(fd == state->conn.fd); if (flag != COMM_OK || len <= 0) { - comm_close(fd); + comm_close(&(state->conn)); return; } @@ -181,7 +185,7 @@ Ident::ReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, voi if ((t = strchr(buf, '\n'))) *t = '\0'; - debugs(30, 5, "identReadReply: FD " << fd << ": Read '" << buf << "'"); + debugs(30, 5, HERE << "FD " << fd << ": Read '" << buf << "'"); if (strstr(buf, "USERID")) { if ((ident = strrchr(buf, ':'))) { @@ -190,7 +194,7 @@ Ident::ReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, voi } } - comm_close(fd); + comm_close(&(state->conn)); } void @@ -213,17 +217,15 @@ CBDATA_TYPE(IdentStateData); * start a TCP connection to the peer host on port 113 */ void -Ident::Start(Ip::Address &me, Ip::Address &my_peer, IDCB * callback, void *data) +Ident::Start(Comm::Connection *conn, IDCB * callback, void *data) { IdentStateData *state; - int fd; char key1[IDENT_KEY_SZ]; char key2[IDENT_KEY_SZ]; char key[IDENT_KEY_SZ]; - char ntoabuf[MAX_IPSTRLEN]; - me.ToURL(key1, IDENT_KEY_SZ); - my_peer.ToURL(key2, IDENT_KEY_SZ); + conn->local.ToURL(key1, IDENT_KEY_SZ); + conn->remote.ToURL(key2, IDENT_KEY_SZ); snprintf(key, IDENT_KEY_SZ, "%s,%s", key1, key2); if (!ident_hash) { @@ -234,33 +236,22 @@ Ident::Start(Ip::Address &me, Ip::Address &my_peer, IDCB * callback, void *data) return; } - Ip::Address addr = me; - addr.SetPort(0); // NP: use random port for secure outbound to IDENT_PORT - - fd = comm_open_listener(SOCK_STREAM, - IPPROTO_TCP, - addr, - COMM_NONBLOCKING, - "ident"); - - if (fd == COMM_ERROR) { - /* Failed to get a local socket */ - callback(NULL, data); - return; - } - CBDATA_INIT_TYPE(IdentStateData); state = cbdataAlloc(IdentStateData); state->hash.key = xstrdup(key); - state->fd = fd; - state->me = me; - state->my_peer = my_peer; + /* clone the conn. we are about to destroy the conn + * for re-use of the addresses etc by IDENT. */ + state->conn = *conn; + state->conn.local.SetPort(0); // NP: use random port for secure outbound to IDENT_PORT + state->conn.flags |= COMM_NONBLOCKING; + ClientAdd(state, callback, data); hash_join(ident_hash, &state->hash); - comm_add_close_handler(fd, Ident::Close, state); - commSetTimeout(fd, Ident::TheConfig.timeout, Ident::Timeout, state); - state->my_peer.NtoA(ntoabuf,MAX_IPSTRLEN); - commConnectStart(fd, ntoabuf, IDENT_PORT, Ident::ConnectDone, state); + + AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state)); + ConnectStateData *cs = new ConnectStateData(&(state->conn), call); + cs->connect_timeout = Ident::TheConfig.timeout; + cs->connect(); } void diff --git a/src/ident/Ident.h b/src/ident/Ident.h index 7253a4736d2..b7cbb9f34bd 100644 --- a/src/ident/Ident.h +++ b/src/ident/Ident.h @@ -14,8 +14,7 @@ #if USE_IDENT #include "cbdata.h" - -#include "ip/forward.h" +#include "comm/Connection.h" namespace Ident { @@ -28,7 +27,7 @@ namespace Ident * Self-registers with a global ident lookup manager, * will call Ident::Init() itself if the manager has not been initialized already. */ -void Start(Ip::Address &me, Ip::Address &my_peer, IDCB * callback, void *cbdata); +void Start(Comm::Connection *conn, IDCB * callback, void *cbdata); /** \ingroup IdentAPI diff --git a/src/ipcache.cc b/src/ipcache.cc index 73bfe9bb3b1..8aea873340e 100644 --- a/src/ipcache.cc +++ b/src/ipcache.cc @@ -32,12 +32,13 @@ #include "squid.h" #include "cbdata.h" -#include "event.h" #include "CacheManager.h" +#include "DnsLookupDetails.h" +#include "event.h" +#include "ip/Address.h" #include "SquidTime.h" #include "Store.h" #include "wordlist.h" -#include "ip/Address.h" /** \defgroup IPCacheAPI IP Cache API diff --git a/src/main.cc b/src/main.cc index 761acc045ed..3779bd68f27 100644 --- a/src/main.cc +++ b/src/main.cc @@ -74,7 +74,7 @@ #include "MemPool.h" #include "icmp/IcmpSquid.h" #include "icmp/net_db.h" - +#include "PeerSelectState.h" #if USE_LOADABLE_MODULES #include "LoadableModules.h" #endif diff --git a/src/neighbors.cc b/src/neighbors.cc index e0f0717fe03..b7d9aafb014 100644 --- a/src/neighbors.cc +++ b/src/neighbors.cc @@ -46,6 +46,7 @@ #include "Store.h" #include "icmp/net_db.h" #include "ip/Address.h" +#include "comm/ConnectStateData.h" /* count mcast group peers every 15 minutes */ #define MCAST_COUNT_RATE 900 @@ -60,7 +61,7 @@ static void neighborAliveHtcp(peer *, const MemObject *, const htcpReplyData *); static void neighborCountIgnored(peer *); static void peerRefreshDNS(void *); static IPH peerDNSConfigure; -static int peerProbeConnect(peer *); +static bool peerProbeConnect(peer *); static CNCB peerProbeConnectDone; static void peerCountMcastPeersDone(void *data); static void peerCountMcastPeersStart(void *data); @@ -1342,68 +1343,45 @@ peerConnectSucceded(peer * p) p->tcp_up = p->connect_fail_limit; } -/// called by Comm when test_fd is closed while connect is in progress -static void -peerProbeClosed(int fd, void *data) -{ - peer *p = (peer*)data; - p->test_fd = -1; - // it is a failure because we failed to connect - peerConnectFailedSilent(p); -} - -static void -peerProbeConnectTimeout(int fd, void *data) -{ - peer * p = (peer *)data; - comm_remove_close_handler(fd, &peerProbeClosed, p); - comm_close(fd); - p->test_fd = -1; - peerConnectFailedSilent(p); -} - /* * peerProbeConnect will be called on dead peers by neighborUp */ -static int +static bool peerProbeConnect(peer * p) { - int fd; - time_t ctimeout = p->connect_timeout > 0 ? p->connect_timeout - : Config.Timeout.peer_connect; - int ret = squid_curtime - p->stats.last_connect_failure > ctimeout * 10; + time_t ctimeout = p->connect_timeout > 0 ? p->connect_timeout : Config.Timeout.peer_connect; + bool ret = (squid_curtime - p->stats.last_connect_failure) > (ctimeout * 10); - if (p->test_fd != -1) + if (p->testing_now) return ret;/* probe already running */ if (squid_curtime - p->stats.last_connect_probe == 0) return ret;/* don't probe to often */ - Ip::Address temp(getOutgoingAddr(NULL,p)); - - fd = comm_open(SOCK_STREAM, IPPROTO_TCP, temp, COMM_NONBLOCKING, p->host); - - if (fd < 0) - return ret; - - comm_add_close_handler(fd, &peerProbeClosed, p); - commSetTimeout(fd, ctimeout, peerProbeConnectTimeout, p); - - p->test_fd = fd; + /* for each IP address of this peer. find one that we can connect to and probe it. */ + Vector *paths = new Vector; + for (int i = 0; i < p->n_addresses; i++) { + Comm::Connection *conn = new Comm::Connection; + conn->remote = p->addresses[i]; + conn->remote.SetPort(p->http_port); + getOutgoingAddress(NULL, conn); + paths->push_back(conn); + } + p->testing_now = true; p->stats.last_connect_probe = squid_curtime; - commConnectStart(p->test_fd, - p->host, - p->http_port, - peerProbeConnectDone, - p); + AsyncCall::Pointer call = commCbCall(15,3, "peerProbeConnectDone", CommConnectCbPtrFun(peerProbeConnectDone, p)); + ConnectStateData *cs = new ConnectStateData(paths, call); + cs->connect_timeout = ctimeout; + cs->host = xstrdup(p->host); + cs->connect(); return ret; } static void -peerProbeConnectDone(int fd, const DnsLookupDetails &, comm_err_t status, int xerrno, void *data) +peerProbeConnectDone(Comm::Connection *conn, Vector *unused, comm_err_t status, int xerrno, void *data) { peer *p = (peer*)data; @@ -1413,9 +1391,8 @@ peerProbeConnectDone(int fd, const DnsLookupDetails &, comm_err_t status, int xe peerConnectFailedSilent(p); } - comm_remove_close_handler(fd, &peerProbeClosed, p); - comm_close(fd); - p->test_fd = -1; + comm_close(conn); + p->testing_now = false; return; } diff --git a/src/peer_select.cc b/src/peer_select.cc index 4cdc429f58e..9380c4a670f 100644 --- a/src/peer_select.cc +++ b/src/peer_select.cc @@ -33,17 +33,18 @@ */ #include "squid.h" +#include "acl/FilledChecklist.h" +#include "DnsLookupDetails.h" #include "event.h" -#include "PeerSelectState.h" -#include "Store.h" +#include "forward.h" #include "hier_code.h" -#include "ICP.h" -#include "HttpRequest.h" -#include "acl/FilledChecklist.h" #include "htcp.h" -#include "forward.h" -#include "SquidTime.h" +#include "HttpRequest.h" #include "icmp/net_db.h" +#include "ICP.h" +#include "PeerSelectState.h" +#include "SquidTime.h" +#include "Store.h" static struct { int timeouts; @@ -74,6 +75,8 @@ static void peerGetSomeParent(ps_state *); static void peerGetAllParents(ps_state *); static void peerAddFwdServer(FwdServer **, peer *, hier_code); static void peerSelectPinned(ps_state * ps); +static void peerSelectDnsResults(const ipcache_addrs *ia, const DnsLookupDetails &details, void *data); + CBDATA_CLASS_INIT(ps_state); @@ -121,7 +124,8 @@ peerSelectIcpPing(HttpRequest * request, int direct, StoreEntry * entry) void -peerSelect(HttpRequest * request, +peerSelect(Vector *paths, + HttpRequest * request, StoreEntry * entry, PSC * callback, void *callback_data) @@ -139,6 +143,8 @@ peerSelect(HttpRequest * request, psstate->entry = entry; + psstate->paths = paths; + psstate->callback = callback; psstate->callback_data = cbdataReference(callback_data); @@ -182,8 +188,6 @@ peerSelectCallback(ps_state * psstate) { StoreEntry *entry = psstate->entry; FwdServer *fs = psstate->servers; - PSC *callback; - void *cbdata; if (entry) { debugs(44, 3, "peerSelectCallback: " << entry->url() ); @@ -203,17 +207,86 @@ peerSelectCallback(ps_state * psstate) psstate->ping.stop = current_time; psstate->request->hier.ping = psstate->ping; +} + +void +peerSelectDnsPaths(ps_state *psstate) +{ + FwdServer *fs = psstate->servers; + + // TODO enforce Config.forward_max_tries and/or Config.retry.maxtries + // the maximum number of paths we are allowed to try... + + // convert the list of FwdServer destinations into destinations IP addresses + if (fs) { + // send the next one off for DNS lookup. + const char *host = fs->_peer ? fs->_peer->host : psstate->request->GetHost(); + ipcache_nbgethostbyname(host, peerSelectDnsResults, psstate); + return; + } + + // done with DNS lookups. pass back to caller + PSC *callback; + callback = psstate->callback; psstate->callback = NULL; + void *cbdata; if (cbdataReferenceValidDone(psstate->callback_data, &cbdata)) { - psstate->servers = NULL; - callback(fs, cbdata); + callback(psstate->paths, cbdata); } peerSelectStateFree(psstate); } +static void +peerSelectDnsResults(const ipcache_addrs *ia, const DnsLookupDetails &details, void *data) +{ + ps_state *psstate = (ps_state *)data; + + psstate->request->recordLookup(details); + + FwdServer *fs = psstate->servers; + if (ia != NULL) { + + assert(ia->cur < ia->count); + + // loop over each result address, adding to the possible destinations. + Comm::Connection *p; + int ip = ia->cur; + for (int n = 0; n < ia->count; n++, ip++) { + if (ip >= ia->count) ip = 0; // looped back to zero. + + // for TPROXY we must skip unusable addresses. + if (psstate->request->flags.spoof_client_ip && !(fs->_peer && fs->_peer->options.no_tproxy) ) { + if(ia->in_addrs[n].IsIPv4() != psstate->request->client_addr.IsIPv4()) { + // we CAN'T spoof the address on this link. find another. + continue; + } + } + + p = new Comm::Connection(); + p->remote = ia->in_addrs[n]; + p->peer_type = fs->code; + + // check for a configured outgoing address for this destination... + getOutgoingAddress(psstate->request, p); + p->tos = getOutgoingTOS(psstate->request); + + psstate->paths->push_back(p); + } + } else { + debugs(44, 3, HERE << "Unknown host: " << fs->_peer ? fs->_peer->host : psstate->request->GetHost()); + } + + psstate->servers = fs->next; + cbdataReferenceDone(fs->_peer); + memFree(fs, MEM_FWD_SERVER); + + // see if more paths can be found + peerSelectDnsPaths(psstate); +} + static int peerCheckNetdbDirect(ps_state * psstate) { @@ -265,7 +338,7 @@ peerSelectFoo(ps_state * ps) HttpRequest *request = ps->request; debugs(44, 3, "peerSelectFoo: '" << RequestMethodStr(request->method) << " " << request->GetHost() << "'"); - /** If we don't known whether DIRECT is permitted ... */ + /** If we don't know whether DIRECT is permitted ... */ if (ps->direct == DIRECT_UNKNOWN) { if (ps->always_direct == 0 && Config.accessList.AlwaysDirect) { /** check always_direct; */ @@ -347,12 +420,13 @@ peerSelectFoo(ps_state * ps) peerSelectCallback(ps); } -/* +int peerAllowedToUse(const peer * p, HttpRequest * request); + +/** * peerSelectPinned * - * Selects a pinned connection + * Selects a pinned connection. */ -int peerAllowedToUse(const peer * p, HttpRequest * request); static void peerSelectPinned(ps_state * ps) { @@ -374,7 +448,7 @@ peerSelectPinned(ps_state * ps) } } -/* +/** * peerGetSomeNeighbor * * Selects a neighbor (parent or sibling) based on one of the @@ -599,6 +673,7 @@ void peerSelectInit(void) { memset(&PeerStats, '\0', sizeof(PeerStats)); + memDataInit(MEM_FWD_SERVER, "FwdServer", sizeof(FwdServer), 0); } static void diff --git a/src/protos.h b/src/protos.h index d54dc3c4bfe..aef1f58c1c3 100644 --- a/src/protos.h +++ b/src/protos.h @@ -397,9 +397,6 @@ SQUIDCEXTERN int peerHTTPOkay(const peer *, HttpRequest *); SQUIDCEXTERN peer *whichPeer(const Ip::Address &from); -SQUIDCEXTERN void peerSelect(HttpRequest *, StoreEntry *, PSC *, void *data); -SQUIDCEXTERN void peerSelectInit(void); - /* peer_digest.c */ class PeerDigest; SQUIDCEXTERN PeerDigest *peerDigestCreate(peer * p); @@ -407,7 +404,8 @@ SQUIDCEXTERN void peerDigestNeeded(PeerDigest * pd); SQUIDCEXTERN void peerDigestNotePeerGone(PeerDigest * pd); SQUIDCEXTERN void peerDigestStatsReport(const PeerDigest * pd, StoreEntry * e); -extern Ip::Address getOutgoingAddr(HttpRequest * request, struct peer *dst_peer); +#include "comm/Connection.h" +extern void getOutgoingAddress(HttpRequest * request, Comm::Connection *conn); unsigned long getOutgoingTOS(HttpRequest * request); SQUIDCEXTERN void urnStart(HttpRequest *, StoreEntry *); diff --git a/src/structs.h b/src/structs.h index 4217ff81d35..cdf1d897abc 100644 --- a/src/structs.h +++ b/src/structs.h @@ -908,7 +908,7 @@ struct peer { int n_addresses; int rr_count; peer *next; - int test_fd; + bool testing_now; struct { unsigned int hash; diff --git a/src/tunnel.cc b/src/tunnel.cc index a602b92a5ef..e945eb785e0 100644 --- a/src/tunnel.cc +++ b/src/tunnel.cc @@ -1,4 +1,3 @@ - /* * $Id$ * @@ -34,18 +33,22 @@ */ #include "squid.h" -#include "errorpage.h" -#include "HttpRequest.h" -#include "fde.h" +#include "acl/FilledChecklist.h" +#include "Array.h" #include "comm.h" +#include "comm/Connection.h" +#include "comm/ConnectStateData.h" +#include "client_side.h" #include "client_side_request.h" -#include "acl/FilledChecklist.h" #if DELAY_POOLS #include "DelayId.h" #endif -#include "client_side.h" -#include "MemBuf.h" +#include "errorpage.h" +#include "fde.h" +#include "HttpRequest.h" #include "http.h" +#include "MemBuf.h" +#include "PeerSelectState.h" class TunnelStateData { @@ -65,7 +68,7 @@ class TunnelStateData char *host; /* either request->host or proxy host */ u_short port; HttpRequest *request; - FwdServer *servers; + Vector *paths; class Connection { @@ -173,7 +176,7 @@ tunnelStateFree(TunnelStateData * tunnelState) assert(tunnelState != NULL); assert(tunnelState->noConnections()); safe_free(tunnelState->url); - FwdState::serversFree(&tunnelState->servers); + if (tunnelState->paths) tunnelState->paths->clean(); tunnelState->host = NULL; HTTPMSGUNLOCK(tunnelState->request); delete tunnelState; @@ -181,7 +184,7 @@ tunnelStateFree(TunnelStateData * tunnelState) TunnelStateData::Connection::~Connection() { - safe_free (buf); + safe_free(buf); } int @@ -463,6 +466,7 @@ TunnelStateData::copyRead(Connection &from, IOCB *completion) comm_read(from.fd(), from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), completion, this); } +#if UNUSED //? static void tunnelConnectTimeout(int fd, void *data) { @@ -470,18 +474,18 @@ tunnelConnectTimeout(int fd, void *data) HttpRequest *request = tunnelState->request; ErrorState *err = NULL; - if (tunnelState->servers) { - if (tunnelState->servers->_peer) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - tunnelState->servers->_peer->host); + if (tunnelState->paths != NULL && tunnelState->paths->size() > 0) { + if ((*(tunnelState->paths))[0]->_peer) + hierarchyNote(&tunnelState->request->hier, (*(tunnelState->paths))[0]->peer_type, + (*(tunnelState->paths))[0]->_peer->host); else if (Config.onoff.log_ip_on_direct) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, + hierarchyNote(&tunnelState->request->hier, (*(tunnelState->paths))[0]->peer_type, fd_table[tunnelState->server.fd()].ipaddr); else - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, + hierarchyNote(&tunnelState->request->hier, (*(tunnelState->paths))[0]->peer_type, tunnelState->host); } else - debugs(26, 1, "tunnelConnectTimeout(): tunnelState->servers is NULL"); + debugs(26, DBG_IMPORTANT, "tunnelConnectTimeout(): no forwarding destinations available."); err = errorCon(ERR_CONNECT_FAIL, HTTP_SERVICE_UNAVAILABLE, request); @@ -498,6 +502,7 @@ tunnelConnectTimeout(int fd, void *data) errorSend(tunnelState->client.fd(), err); comm_close(fd); } +#endif static void tunnelConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) @@ -553,52 +558,63 @@ tunnelErrorComplete(int fdnotused, void *data, size_t sizenotused) static void -tunnelConnectDone(int fdnotused, const DnsLookupDetails &dns, comm_err_t status, int xerrno, void *data) +tunnelConnectDone(Comm::Connection *unused, Vector *paths, comm_err_t status, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; HttpRequest *request = tunnelState->request; ErrorState *err = NULL; + Comm::Connection *conn = (*paths)[0]; + + assert(tunnelState->paths == paths); - request->recordLookup(dns); +#if DELAY_POOLS + /* no point using the delayIsNoDelay stuff since tunnel is nice and simple */ + if (conn->_peer && conn->_peer->options.no_delay) + tunnelState->server.setDelayId(DelayId()); +#endif - if (tunnelState->servers->_peer) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - tunnelState->servers->_peer->host); + if (conn != NULL && conn->_peer) + hierarchyNote(&tunnelState->request->hier, conn->peer_type, conn->_peer->host); else if (Config.onoff.log_ip_on_direct) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - fd_table[tunnelState->server.fd()].ipaddr); + hierarchyNote(&tunnelState->request->hier, conn->peer_type, fd_table[conn->fd].ipaddr); else - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - tunnelState->host); - - if (status == COMM_ERR_DNS) { - debugs(26, 4, "tunnelConnect: Unknown host: " << tunnelState->host); - err = errorCon(ERR_DNS_FAIL, HTTP_NOT_FOUND, request); - *tunnelState->status_ptr = HTTP_NOT_FOUND; - err->dnsError = dns.error; - err->callback = tunnelErrorComplete; - err->callback_data = tunnelState; - errorSend(tunnelState->client.fd(), err); - } else if (status != COMM_OK) { + hierarchyNote(&tunnelState->request->hier, conn->peer_type, tunnelState->host); + + if (status != COMM_OK) { err = errorCon(ERR_CONNECT_FAIL, HTTP_SERVICE_UNAVAILABLE, request); *tunnelState->status_ptr = HTTP_SERVICE_UNAVAILABLE; err->xerrno = xerrno; - err->port = tunnelState->port; + // on timeout is this still: err->xerrno = ETIMEDOUT; + err->port = conn->remote.GetPort(); err->callback = tunnelErrorComplete; err->callback_data = tunnelState; errorSend(tunnelState->client.fd(), err); + return; + } + + tunnelState->server.fd(conn->fd); + comm_add_close_handler(tunnelState->server.fd(), tunnelServerClosed, tunnelState); + + // TODO: hold the conn. drop these fields. + tunnelState->host = conn->_peer ? conn->_peer->host : xstrdup(request->GetHost()); + request->peer_host = conn->_peer ? conn->_peer->host : NULL; + tunnelState->port = conn->remote.GetPort(); + + if (conn->_peer) { + tunnelState->request->peer_login = conn->_peer->login; + tunnelState->request->flags.proxying = 1; } else { - if (tunnelState->servers->_peer) - tunnelProxyConnected(tunnelState->server.fd(), tunnelState); - else { - tunnelConnected(tunnelState->server.fd(), tunnelState); - } + tunnelState->request->peer_login = NULL; + tunnelState->request->flags.proxying = 0; + } - commSetTimeout(tunnelState->server.fd(), - Config.Timeout.read, - tunnelTimeout, - tunnelState); + if (conn->_peer) + tunnelProxyConnected(tunnelState->server.fd(), tunnelState); + else { + tunnelConnected(tunnelState->server.fd(), tunnelState); } + + commSetTimeout(tunnelState->server.fd(), Config.Timeout.read, tunnelTimeout, tunnelState); } void @@ -606,7 +622,6 @@ tunnelStart(ClientHttpRequest * http, int64_t * size_ptr, int *status_ptr) { /* Create state structure. */ TunnelStateData *tunnelState = NULL; - int sock; ErrorState *err = NULL; int answer; int fd = http->getConn()->fd; @@ -639,43 +654,16 @@ tunnelStart(ClientHttpRequest * http, int64_t * size_ptr, int *status_ptr) debugs(26, 3, "tunnelStart: '" << RequestMethodStr(request->method) << " " << url << "'"); statCounter.server.all.requests++; statCounter.server.other.requests++; - /* Create socket. */ - Ip::Address temp = getOutgoingAddr(request,NULL); - int flags = COMM_NONBLOCKING; - if (request->flags.spoof_client_ip) { - flags |= COMM_TRANSPARENT; - } - sock = comm_openex(SOCK_STREAM, - IPPROTO_TCP, - temp, - flags, - getOutgoingTOS(request), - url); - - if (sock == COMM_ERROR) { - debugs(26, 4, "tunnelStart: Failed because we're out of sockets."); - err = errorCon(ERR_SOCKET_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request); - *status_ptr = HTTP_INTERNAL_SERVER_ERROR; - err->xerrno = errno; - errorSend(fd, err); - return; - } tunnelState = new TunnelStateData; #if DELAY_POOLS - tunnelState->server.setDelayId(DelayId::DelayClient(http)); #endif - tunnelState->url = xstrdup(url); tunnelState->request = HTTPMSGLOCK(request); tunnelState->server.size_ptr = size_ptr; tunnelState->status_ptr = status_ptr; tunnelState->client.fd(fd); - tunnelState->server.fd(sock); - comm_add_close_handler(tunnelState->server.fd(), - tunnelServerClosed, - tunnelState); comm_add_close_handler(tunnelState->client.fd(), tunnelClientClosed, tunnelState); @@ -683,14 +671,12 @@ tunnelStart(ClientHttpRequest * http, int64_t * size_ptr, int *status_ptr) Config.Timeout.lifetime, tunnelTimeout, tunnelState); - commSetTimeout(tunnelState->server.fd(), - Config.Timeout.connect, - tunnelConnectTimeout, - tunnelState); - peerSelect(request, + + peerSelect(tunnelState->paths, request, NULL, tunnelPeerSelectComplete, tunnelState); + /* * Disable the client read handler until peer selection is complete * Take control away from client_side.c. @@ -727,13 +713,12 @@ tunnelProxyConnected(int fd, void *data) } static void -tunnelPeerSelectComplete(FwdServer * fs, void *data) +tunnelPeerSelectComplete(Vector *peer_paths, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; HttpRequest *request = tunnelState->request; - peer *g = NULL; - if (fs == NULL) { + if (peer_paths == NULL || peer_paths->size() < 1) { ErrorState *err; err = errorCon(ERR_CANNOT_FORWARD, HTTP_SERVICE_UNAVAILABLE, request); *tunnelState->status_ptr = HTTP_SERVICE_UNAVAILABLE; @@ -743,40 +728,11 @@ tunnelPeerSelectComplete(FwdServer * fs, void *data) return; } - tunnelState->servers = fs; - tunnelState->host = fs->_peer ? fs->_peer->host : xstrdup(request->GetHost()); - request->peer_host = fs->_peer ? fs->_peer->host : NULL; - - if (fs->_peer == NULL) { - tunnelState->port = request->port; - } else if (fs->_peer->http_port != 0) { - tunnelState->port = fs->_peer->http_port; - } else if ((g = peerFindByName(fs->_peer->host))) { - tunnelState->port = g->http_port; - } else { - tunnelState->port = CACHE_HTTP_PORT; - } - - if (fs->_peer) { - tunnelState->request->peer_login = fs->_peer->login; - tunnelState->request->flags.proxying = 1; - } else { - tunnelState->request->peer_login = NULL; - tunnelState->request->flags.proxying = 0; - } - -#if DELAY_POOLS - /* no point using the delayIsNoDelay stuff since tunnel is nice and simple */ - if (g && g->options.no_delay) - tunnelState->server.setDelayId(DelayId()); - -#endif - - commConnectStart(tunnelState->server.fd(), - tunnelState->host, - tunnelState->port, - tunnelConnectDone, - tunnelState); + AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone", CommConnectCbPtrFun(tunnelConnectDone, tunnelState)); + ConnectStateData *cs = new ConnectStateData(tunnelState->paths, call); + cs->host = xstrdup(tunnelState->url); + cs->connect_timeout = Config.Timeout.connect; + cs->connect(); } CBDATA_CLASS_INIT(TunnelStateData); diff --git a/src/typedefs.h b/src/typedefs.h index b316c47a42d..6a51b13ad33 100644 --- a/src/typedefs.h +++ b/src/typedefs.h @@ -200,8 +200,6 @@ typedef void IDCB(const char *ident, void *data); typedef void IPH(const ipcache_addrs *, const DnsLookupDetails &details, void *); typedef void IRCB(struct peer *, peer_t, protocol_t, void *, void *data); -class FwdServer; -typedef void PSC(FwdServer *, void *); typedef void RH(void *data, char *); /* in wordlist.h */ diff --git a/src/whois.cc b/src/whois.cc index d7297c48581..b148d3ad6bf 100644 --- a/src/whois.cc +++ b/src/whois.cc @@ -81,7 +81,7 @@ void whoisStart(FwdState * fwd) { WhoisState *p; - int fd = fwd->server_fd; + int fd = fwd->conn()->fd; char *buf; size_t l; CBDATA_INIT_TYPE(WhoisState);