diff --git a/agentstress.ecl b/agentstress.ecl new file mode 100644 index 00000000000..cce54da7974 --- /dev/null +++ b/agentstress.ecl @@ -0,0 +1,19 @@ +rtl := SERVICE + unsigned4 sleep(unsigned4 _delay) : eclrtl,action,library='eclrtl',entrypoint='rtlSleep'; +END; + +s := 0 : STORED('s'); + +MyRec := RECORD + STRING5 Value1; +END; + +MyRec t1(unsigned c) := transform + SELF.value1 := 'HELL'+rtl.sleep(s); +END; + +ds := NOFOLD(DATASET(1, t1(COUNTER))); +a(STRING f) := allnodes(ds(Value1=f)); +b(STRING f) := a(f+'1')+a(f+'2')+a(f+'3')+a(f+'4')+a(f+'5')+a(f+'6')+a(f+'7')+a(f+'8')+a(f+'9')+a(f+'10'); +c := b('1')+b('2')+b('3')+b('4')+b('5')+b('6')+b('7')+b('8')+b('9')+b('10'); +c; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index d0af342b917..dd82bf9c11f 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -284,7 +284,11 @@ void init_signals() { // signal(SIGTERM, caughtSIGTERM); #ifndef _WIN32 +#ifdef __APPLE__ + signal(SIGPIPE, SIG_IGN); // Otherwise it seems semaphore gets interrupted +#else signal(SIGPIPE, caughtSIGPIPE); +#endif signal(SIGHUP, caughtSIGHUP); signal(SIGALRM, caughtSIGALRM); @@ -484,18 +488,23 @@ void readStaticTopology() unsigned numNodes = topology->getCount("./RoxieServerProcess"); if (!numNodes && oneShotRoxie) { - if (topology->getPropBool("expert/@addDummyNode", false)) + unsigned dummyNodes = topology->getPropInt("expert/@addDummyNode", 0); + if (dummyNodes) { // Special config for testing some multinode things on a single node + numNodes = dummyNodes + 1; topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", "."); - topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", "192.0.2.0"); // A non-existent machine (this address is reserved for documentation) - numNodes = 2; + for (unsigned dummyNo = 0; dummyNo < dummyNodes; dummyNo++) + { + VStringBuffer dummyIP("192.0.2.%u", dummyNo); // A non-existent machine (this address is reserved for documentation) + topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", dummyIP.str()); + } localAgent = false; - topology->setPropInt("@numChannels", 2); - numChannels = 2; - topology->setPropInt("@numDataCopies", 2); - topology->setPropInt("@channelsPerNode", 2); - topology->setProp("@agentConfig", "cyclic"); + topology->setPropInt("@numChannels", 1); + numChannels = 1; + topology->setPropInt("@numDataCopies", numNodes); + topology->setPropInt("@channelsPerNode", 1); + topology->setProp("@agentConfig", "simple"); } else if (oneShotRoxie) { @@ -506,6 +515,8 @@ void readStaticTopology() Owned roxieServers = topology->getElements("./RoxieServerProcess"); bool myNodeSet = false; + StringBuffer forceIP; + topology->getProp("expert/@forceIP", forceIP); unsigned calcNumChannels = 0; ForEach(*roxieServers) { @@ -514,11 +525,23 @@ void readStaticTopology() IpAddress ip(iptext); if (ip.isNull()) throw MakeStringException(ROXIE_UDP_ERROR, "Could not resolve address %s", iptext); - if (ip.isLocal() && !myNodeSet) + if (forceIP.length()) { - myNodeSet = true; - myNode.setIp(ip); - myAgentEP.set(ccdMulticastPort, myNode.getIpAddress()); + if (streq(iptext, forceIP) && !myNodeSet) + { + myNodeSet = true; + myNode.setIp(ip); + myAgentEP.set(ccdMulticastPort, myNode.getIpAddress()); + } + } + else + { + if (ip.isLocal() && !myNodeSet) + { + myNodeSet = true; + myNode.setIp(ip); + myAgentEP.set(ccdMulticastPort, myNode.getIpAddress()); + } } ForEachItemIn(idx, nodeTable) { diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index e0ad44803d8..1b00584e29f 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -794,8 +794,8 @@ class CSerializedRoxieQueryPacket : public CRoxieQueryPacketBase, implements ISe extern IRoxieQueryPacket *createRoxiePacket(void *_data, unsigned _len) { - if (!encryptInTransit) - return new CNocryptRoxieQueryPacket(_data, _len); + //if (!encryptInTransit) + // return new CNocryptRoxieQueryPacket(_data, _len); if ((unsigned short)_len != _len) { StringBuffer s; @@ -844,8 +844,13 @@ extern IRoxieQueryPacket *deserializeCallbackPacket(MemoryBuffer &m) extern ISerializedRoxieQueryPacket *createSerializedRoxiePacket(MemoryBuffer &m) { unsigned length = m.length(); // don't make assumptions about evaluation order of parameters... - if (encryptInTransit) - return new CSerializedRoxieQueryPacket(m.detachOwn(), length); + if (true || encryptInTransit) + { +// void *data = m.detachOwn(); + void *data = malloc(length); + memcpy(data, m.bufferBase(), length); + return new CSerializedRoxieQueryPacket(data, length); + } else return new CNocryptRoxieQueryPacket(m.detachOwn(), length); } @@ -2194,7 +2199,10 @@ class DelayedPacketQueue void append(ISerializedRoxieQueryPacket *packet, unsigned expires) { - // Goes on the end. But percolate the expiry time backwards + // Insert in the list at the appropriate point that the list stays in expiry order + // With a single buddy this is usually on the end, but with multiple buddies we will be inserting at various points. + // Should we consider a queue per buddy? + assert(GetCurrentThreadId()==roxiePacketReaderThread); packet->noteQueued(0); DelayedPacketEntry *newEntry = new DelayedPacketEntry(packet, expires); @@ -2203,6 +2211,8 @@ class DelayedPacketQueue StringBuffer s; DBGLOG("Adding delayed packet %s expires in %u ms", packet->queryHeader().toString(s).str(), expires - msTick()); } +#if 0 +// old code assumed only one buddy newEntry->prev = tail; if (tail) { @@ -2218,6 +2228,41 @@ class DelayedPacketQueue else head = newEntry; tail = newEntry; +#else + if (tail) + { + DelayedPacketEntry *last = nullptr; + DelayedPacketEntry *finger = tail; + while (finger != nullptr) + { + if ((int) (finger->waitExpires - expires) <= 0) + break; + last = finger; + finger = finger->prev; + } + if (last) + last->prev = newEntry; + else + tail = newEntry; + newEntry->next = last; + newEntry->prev = finger; + if (finger) + finger->next = newEntry; + else + head = newEntry; + } + else + { + head = newEntry; + tail = newEntry; + } +#endif + numEntries++; + if (numEntries > maxNumEntries) + { + maxNumEntries = numEntries; + DBGLOG("WorkerUdpReader: Max IBYTI queue length is now %u", numEntries); + } } // Move any that we are done waiting for our buddy onto the active queue @@ -2283,11 +2328,13 @@ class DelayedPacketQueue if (goer==tail) tail = goer->prev; delete goer; + numEntries--; } DelayedPacketEntry *head = nullptr; DelayedPacketEntry *tail = nullptr; - + unsigned numEntries = 0; + unsigned maxNumEntries = 0; }; //------------------------------------------------------------------------------------------------------------ @@ -2393,7 +2440,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase DelayedPacketQueueManager delayed; #endif - class WorkerUdpTracker : public TimeDivisionTracker<6, false> + class WorkerUdpTracker : public TimeDivisionTracker<13, false> { public: enum @@ -2403,17 +2450,31 @@ class RoxieSocketQueueManager : public RoxieReceiverBase allocating, processing, pushing, - checkingRunning + deferring, + checkingRunning, + checkingExpired, + decoding, + acknowledging, + retrying, + creatingPacket, + getTimeout }; - WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<6, false>(name, reportIntervalSeconds) + WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<13, false>(name, reportIntervalSeconds) { stateNames[other] = "other"; stateNames[waiting] = "waiting"; stateNames[allocating] = "allocating"; stateNames[processing] = "processing"; stateNames[pushing] = "pushing"; + stateNames[deferring] = "deferring"; stateNames[checkingRunning] = "checking running"; + stateNames[checkingExpired] = "checking expiry"; + stateNames[decoding] = "decoding"; + stateNames[acknowledging] = "acknowledging"; + stateNames[retrying] = "retrying"; + stateNames[creatingPacket] = "creating packet"; + stateNames[getTimeout] = "getting timeout"; } } timeTracker; @@ -2437,7 +2498,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase } readThread; public: - RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 60), readThread(*this) + RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 5), readThread(*this) { maxPacketSize = multicastSocket->get_max_send_size(); if ((maxPacketSize==0)||(maxPacketSize>65535)) @@ -2666,6 +2727,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase { // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow // DO NOT put tracing on this thread except at very high tracelevels! + WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::processing); if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0) doIbyti(header, queue); else @@ -2708,18 +2770,20 @@ class RoxieSocketQueueManager : public RoxieReceiverBase } else { + division.switchState(WorkerUdpTracker::decoding); + #ifdef SUBCHANNELS_IN_HEADER unsigned mySubchannel = header.mySubChannel(); #else Owned topology = getTopology(); unsigned mySubchannel = topology->queryChannelInfo(header.channel).subChannel(); #endif - Owned packet = createSerializedRoxiePacket(mb); unsigned retries = header.thisChannelRetries(mySubchannel); if (retries >= SUBCHANNEL_MASK) return; // I already failed unrecoverably on this request - ignore it if (acknowledgeAllRequests && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) { + division.switchState(WorkerUdpTracker::acknowledging); #ifdef DEBUG if (testAgentFailure & 0x1 && !retries) return; @@ -2736,6 +2800,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase if (retries) { // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread + division.switchState(WorkerUdpTracker::retrying); assertex(header.channel); // should never see a retry on channel 0 // Send back an out-of-band immediately, to let Roxie server know that channel is still active if (!(testAgentFailure & 0x800) && !acknowledgeAllRequests) @@ -2784,7 +2849,9 @@ class RoxieSocketQueueManager : public RoxieReceiverBase { StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str()); } - WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing); + division.switchState(WorkerUdpTracker::creatingPacket); + Owned packet = createSerializedRoxiePacket(mb); + division.switchState(WorkerUdpTracker::pushing); #ifdef NEW_IBYTI // It's debatable whether we should delay for the primary here - they had one chance already... // But then again, so did we, assuming the timeout is longer than the IBYTIdelay @@ -2795,15 +2862,22 @@ class RoxieSocketQueueManager : public RoxieReceiverBase delay += getIbytiDelay(header.subChannels[subChannel]); } if (delay) + { + division.switchState(WorkerUdpTracker::deferring); delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay); + } else #endif + { + division.switchState(WorkerUdpTracker::pushing); queue.enqueueUnique(packet.getClear(), mySubchannel, 0); + } } } else // first time (not a retry). { - WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing); + division.switchState(WorkerUdpTracker::creatingPacket); + Owned packet = createSerializedRoxiePacket(mb); #ifdef NEW_IBYTI unsigned delay = 0; if (mySubchannel != 0 && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) // i.e. I am not the primary here, and never delay special @@ -2812,10 +2886,16 @@ class RoxieSocketQueueManager : public RoxieReceiverBase delay += getIbytiDelay(header.subChannels[subChannel]); } if (delay) + { + division.switchState(WorkerUdpTracker::deferring); delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay); + } else #endif + { + division.switchState(WorkerUdpTracker::pushing); queue.enqueue(packet.getClear(), 0); + } } } } @@ -2831,12 +2911,12 @@ class RoxieSocketQueueManager : public RoxieReceiverBase WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::other); for (;;) { - mb.clear(); try { // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow // DO NOT put tracing on this thread except at very high tracelevels! #ifdef NEW_IBYTI + division.switchState(WorkerUdpTracker::getTimeout); unsigned timeout = delayed.timeout(msTick()); if (timeout>5000) timeout = 5000; @@ -2844,6 +2924,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase unsigned timeout = 5000; #endif division.switchState(WorkerUdpTracker::allocating); + mb.clear(); void * buffer = mb.reserve(maxPacketSize); division.switchState(WorkerUdpTracker::waiting); @@ -2904,11 +2985,18 @@ class RoxieSocketQueueManager : public RoxieReceiverBase } } #ifdef NEW_IBYTI - delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue); + division.switchState(WorkerUdpTracker::checkingExpired); + unsigned now = msTick(); + if (now != lastCheck) + { + lastCheck = now; + delayed.checkExpired(now, slaQueue, hiQueue, loQueue); + } #endif } return 0; } + unsigned lastCheck = 0; void start() {