Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly track (and mark) truncated store entries #909

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions src/FwdState.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ FwdState::FwdState(const Comm::ConnectionPointer &client, StoreEntry * e, HttpRe
start_t(squid_curtime),
n_tries(0),
destinations(new ResolvedPeers()),
pconnRace(raceImpossible)
pconnRace(raceImpossible),
storedWholeReply_(nullptr)
yadij marked this conversation as resolved.
Show resolved Hide resolved
{
debugs(17, 2, "Forwarding client request " << client << ", url=" << e->url());
HTTPMSGLOCK(request);
Expand Down Expand Up @@ -255,6 +256,23 @@ FwdState::selectPeerForIntercepted()
}
#endif

/// updates ALE when we finalize the transaction error (if any)
void
FwdState::updateAleWithFinalError()
{
if (!err || !al)
return;

LogTagsErrors lte;
lte.timedout = (err->xerrno == ETIMEDOUT || err->type == ERR_READ_TIMEOUT);
al->cache.code.err.update(lte);
if (!err->detail) {
static const auto d = MakeNamedErrorDetail("WITH_SERVER");
err->detailError(d);
}
al->updateError(Error(err->type, err->detail));
}

void
FwdState::completed()
{
Expand All @@ -279,20 +297,26 @@ FwdState::completed()

if (entry->store_status == STORE_PENDING) {
if (entry->isEmpty()) {
assert(!storedWholeReply_);
if (!err) // we quit (e.g., fd closed) before an error or content
fail(new ErrorState(ERR_READ_ERROR, Http::scBadGateway, request, al));
assert(err);
updateAleWithFinalError();
errorAppendEntry(entry, err);
err = NULL;
#if USE_OPENSSL
if (request->flags.sslPeek && request->clientConnectionManager.valid()) {
CallJobHere1(17, 4, request->clientConnectionManager, ConnStateData,
ConnStateData::httpsPeeked, ConnStateData::PinnedIdleContext(Comm::ConnectionPointer(nullptr), request));
// no flags.dont_retry: completed() is a post-reforward() act
}
#endif
} else {
entry->complete();
entry->releaseRequest();
updateAleWithFinalError(); // if any
if (storedWholeReply_)
entry->completeSuccessfully(storedWholeReply_);
else
entry->completeTruncated("FwdState default");
}
}

Expand Down Expand Up @@ -549,6 +573,7 @@ FwdState::complete()
serverConn = nullptr;
destinationReceipt = nullptr;

storedWholeReply_ = nullptr;
entry->reset();

useDestinations();
Expand All @@ -558,10 +583,8 @@ FwdState::complete()
debugs(17, 3, "server FD " << serverConnection()->fd << " not re-forwarding status " << replyStatus);
else
debugs(17, 3, "server (FD closed) not re-forwarding status " << replyStatus);
entry->complete();

if (!Comm::IsConnOpen(serverConn))
completed();
completed();

stopAndDestroy("forwarding completed");
}
Expand All @@ -573,6 +596,18 @@ FwdState::usingDestination() const
return encryptionWait || peerWait || Comm::IsConnOpen(serverConn);
}

void
FwdState::markStoredReplyAsWhole(const char * const whyWeAreSure)
{
debugs(17, 5, whyWeAreSure << " for " << *entry);

// the caller wrote everything to Store, but Store may silently abort writes
if (EBIT_TEST(entry->flags, ENTRY_ABORTED))
return;

storedWholeReply_ = whyWeAreSure;
}

void
FwdState::noteDestination(Comm::ConnectionPointer path)
{
Expand Down Expand Up @@ -1010,6 +1045,8 @@ FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
// [in ways that may affect logging?]. Consider informing
// ConnStateData about our tunnel or otherwise unifying tunnel
// establishment [side effects].
flags.dont_retry = true; // TunnelStateData took forwarding control
entry->abort();
complete(); // destroys us
return;
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
Expand Down Expand Up @@ -1203,9 +1240,12 @@ FwdState::dispatch()

#if USE_OPENSSL
if (request->flags.sslPeek) {
// we were just asked to peek at the server, and we did that
CallJobHere1(17, 4, request->clientConnectionManager, ConnStateData,
ConnStateData::httpsPeeked, ConnStateData::PinnedIdleContext(serverConnection(), request));
unregister(serverConn); // async call owns it now
flags.dont_retry = true; // we gave up forwarding control
entry->abort();
complete(); // destroys us
return;
}
Expand Down
11 changes: 11 additions & 0 deletions src/FwdState.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ class FwdState: public RefCountable, public PeerSelectionInitiator
void unregister(Comm::ConnectionPointer &conn);
void unregister(int fd);
void complete();

/// Mark reply as written to Store in its entirety, including the header and
/// any body. If the reply has a body, the entire body has to be stored.
void markStoredReplyAsWhole(const char *whyWeAreSure);

void handleUnregisteredServerEnd();
int reforward();
bool reforwardableStatus(const Http::StatusCode s) const;
Expand Down Expand Up @@ -159,6 +164,8 @@ class FwdState: public RefCountable, public PeerSelectionInitiator
void notifyConnOpener();
void reactToZeroSizeObject();

void updateAleWithFinalError();
yadij marked this conversation as resolved.
Show resolved Hide resolved

public:
StoreEntry *entry;
HttpRequest *request;
Expand Down Expand Up @@ -200,6 +207,10 @@ class FwdState: public RefCountable, public PeerSelectionInitiator
/// possible pconn race states
typedef enum { raceImpossible, racePossible, raceHappened } PconnRace;
PconnRace pconnRace; ///< current pconn race state

/// Whether the entire reply (including any body) was written to Store.
/// The string literal value is only used for debugging.
const char *storedWholeReply_;
};

class acl_tos;
Expand Down
16 changes: 14 additions & 2 deletions src/Store.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,18 @@ class StoreEntry : public hash_link, public Packable
bool isEmpty() const { return mem().endOffset() == 0; }
bool isAccepting() const;
size_t bytesWanted(Range<size_t> const aRange, bool ignoreDelayPool = false) const;
/// flags [truncated or too big] entry with ENTRY_BAD_LENGTH and releases it
void lengthWentBad(const char *reason);

/// Signals that the entire response has been stored and no more append()
/// calls should be expected; cf. completeTruncated().
void completeSuccessfully(const char *whyWeAreSureWeStoredTheWholeReply);

/// Signals that a partial response (if any) has been stored but no more
/// append() calls should be expected; cf. completeSuccessfully().
void completeTruncated(const char *whyWeConsiderTheReplyTruncated);

/// \deprecated use either completeSuccessfully() or completeTruncated() instead
void complete();

store_client_t storeClientType() const;
/// \returns a malloc()ed buffer containing a length-long packed swap header
const char *getSerialisedMetaData(size_t &length) const;
Expand Down Expand Up @@ -308,6 +317,9 @@ class StoreEntry : public hash_link, public Packable
StoreEntry *adjustVary();
const cache_key *calcPublicKey(const KeyScope keyScope);

/// flags [truncated or too big] entry with ENTRY_BAD_LENGTH and releases it
void lengthWentBad(const char *reason);

static MemAllocator *pool;

unsigned short lock_count; /* Assume < 65536! */
Expand Down
10 changes: 9 additions & 1 deletion src/client_side.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3993,8 +3993,16 @@ ConnStateData::unpinConnection(const bool andClose)
}

void
ConnStateData::terminateAll(const Error &error, const LogTagsErrors &lte)
ConnStateData::terminateAll(const Error &rawError, const LogTagsErrors &lte)
{
auto error = rawError; // (cheap) copy so that we can detail
// We detail even ERR_NONE: There should be no transactions left, and
// detailed ERR_NONE will be unused. Otherwise, this detail helps in triage.
if (!error.detail) {
static const auto d = MakeNamedErrorDetail("WITH_CLIENT");
error.detail = d;
}

debugs(33, 3, pipeline.count() << '/' << pipeline.nrequests << " after " << error);

if (pipeline.empty()) {
Expand Down
15 changes: 14 additions & 1 deletion src/client_side_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ ClientHttpRequest::ClientHttpRequest(ConnStateData * aConn) :
, sslBumpNeed_(Ssl::bumpEnd)
#endif
#if USE_ADAPTATION
, receivedWholeAdaptedReply(false)
, request_satisfaction_mode(false)
, request_satisfaction_offset(0)
#endif
Expand Down Expand Up @@ -2113,6 +2114,11 @@ void
ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer)
{
assert(!virginHeadSource);

// distinguish this code path from future noteBodyProducerAborted() that
// would continue storing/delivering (truncated) reply if necessary (TODO)
receivedWholeAdaptedReply = true;

// should we end request satisfaction now?
if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
endRequestSatisfaction();
Expand All @@ -2126,7 +2132,14 @@ ClientHttpRequest::endRequestSatisfaction()
stopConsumingFrom(adaptedBodySource);

// TODO: anything else needed to end store entry formation correctly?
storeEntry()->complete();
if (receivedWholeAdaptedReply) {
// We received the entire reply per receivedWholeAdaptedReply.
// We are called when we consumed everything received (per our callers).
// We consume only what we store per noteMoreBodyDataAvailable().
storeEntry()->completeSuccessfully("received, consumed, and, hence, stored the entire REQMOD reply");
} else {
storeEntry()->completeTruncated("REQMOD request satisfaction default");
}
}

void
Expand Down
3 changes: 3 additions & 0 deletions src/client_side_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ class ClientHttpRequest
CbcPointer<Adaptation::Initiate> virginHeadSource;
BodyPipe::Pointer adaptedBodySource;

/// noteBodyProductionEnded() was called
bool receivedWholeAdaptedReply;

bool request_satisfaction_mode;
int64_t request_satisfaction_offset;
#endif
Expand Down
34 changes: 33 additions & 1 deletion src/clients/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,27 @@ Client::setFinalReply(HttpReply *rep)
return theFinalReply;
}

void
Client::markParsedVirginReplyAsWhole(const char *reasonWeAreSure)
{
assert(reasonWeAreSure);
debugs(11, 3, reasonWeAreSure);

// The code storing adapted reply takes care of markStoredReplyAsWhole().
// We need to take care of the remaining regular network-to-store case.
#if USE_ADAPTATION
if (startedAdaptation) {
debugs(11, 5, "adaptation handles markStoredReplyAsWhole()");
return;
}
#endif

// Convert the "parsed whole virgin reply" event into the "stored..." event
// because, without adaptation, we store everything we parse: There is no
// buffer for parsed content; addVirginReplyBody() stores every parsed byte.
fwd->markStoredReplyAsWhole(reasonWeAreSure);
}

// called when no more server communication is expected; may quit
void
Client::serverComplete()
Expand Down Expand Up @@ -722,6 +743,7 @@ Client::handleAdaptedHeader(Http::Message *msg)
assert(result);
} else {
// no body
fwd->markStoredReplyAsWhole("setFinalReply() stored header-only adapted reply");
if (doneWithAdaptation()) // we may still be sending virgin response
handleAdaptationCompleted();
}
Expand Down Expand Up @@ -796,6 +818,9 @@ Client::handleAdaptedBodyProductionEnded()
if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
return;

// distinguish this code path from handleAdaptedBodyProducerAborted()
receivedWholeAdaptedReply = true;

// end consumption if we consumed everything
if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
endAdaptedBodyConsumption();
Expand All @@ -806,6 +831,14 @@ void
Client::endAdaptedBodyConsumption()
{
stopConsumingFrom(adaptedBodySource);

if (receivedWholeAdaptedReply) {
// We received the entire adapted reply per receivedWholeAdaptedReply.
// We are called when we consumed everything received (per our callers).
// We consume only what we store per handleMoreAdaptedBodyAvailable().
fwd->markStoredReplyAsWhole("received,consumed=>stored the entire RESPMOD reply");
}

handleAdaptationCompleted();
}

Expand All @@ -826,7 +859,6 @@ void Client::handleAdaptedBodyProducerAborted()
if (handledEarlyAdaptationAbort())
return;

entry->lengthWentBad("body adaptation aborted");
handleAdaptationCompleted(); // the user should get a truncated response
}

Expand Down
7 changes: 7 additions & 0 deletions src/clients/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ class Client:
public: // should be protected
void serverComplete(); /**< call when no server communication is expected */

/// remember that the received virgin reply was parsed in its entirety,
/// including its body (if any)
void markParsedVirginReplyAsWhole(const char *reasonWeAreSure);

private:
void serverComplete2(); /**< Continuation of serverComplete */
bool completed = false; /**< serverComplete() has been called */
Expand Down Expand Up @@ -176,6 +180,9 @@ class Client:

bool adaptationAccessCheckPending = false;
bool startedAdaptation = false;

/// handleAdaptedBodyProductionEnded() was called
bool receivedWholeAdaptedReply = false;
#endif
bool receivedWholeRequestBody = false; ///< handleRequestBodyProductionEnded called

Expand Down
1 change: 1 addition & 0 deletions src/clients/FtpGateway.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2233,6 +2233,7 @@ ftpReadTransferDone(Ftp::Gateway * ftpState)
ftpState->completedListing();
/* QUIT operation handles sending the reply to client */
}
ftpState->markParsedVirginReplyAsWhole("ftpReadTransferDone code 226 or 250");
ftpSendQuit(ftpState);
} else { /* != 226 */
debugs(9, DBG_IMPORTANT, HERE << "Got code " << code << " after reading data");
Expand Down
8 changes: 7 additions & 1 deletion src/clients/FtpRelay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ Ftp::Relay::forwardReply()
reply->sources |= Http::Message::srcFtp;

setVirginReply(reply);
markParsedVirginReplyAsWhole("Ftp::Relay::handleControlReply() does not forward partial replies");
adaptOrFinalizeReply();

serverComplete();
Expand Down Expand Up @@ -719,7 +720,12 @@ Ftp::Relay::readTransferDoneReply()
{
debugs(9, 3, status());

if (ctrl.replycode != 226 && ctrl.replycode != 250) {
// RFC 959 says that code 226 may indicate a successful response to a file
// transfer and file abort commands, but since we do not send abort
// commands, let's assume it was a successful file transfer.
if (ctrl.replycode == 226 || ctrl.replycode == 250) {
markParsedVirginReplyAsWhole("Ftp::Relay::readTransferDoneReply() code 226 or 250");
} else {
debugs(9, DBG_IMPORTANT, "got FTP code " << ctrl.replycode <<
" after reading response data");
}
Expand Down
Loading