Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing #19191

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft

Testing #19191

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions agentstress.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
rtl := SERVICE
unsigned4 sleep(unsigned4 _delay) : eclrtl,action,library='eclrtl',entrypoint='rtlSleep';
END;

s := 0 : STORED('s');

MyRec := RECORD
STRING5 Value1;
END;

MyRec t1(unsigned c) := transform
SELF.value1 := 'HELL'+rtl.sleep(s);
END;

ds := NOFOLD(DATASET(1, t1(COUNTER)));
a(STRING f) := allnodes(ds(Value1=f));
b(STRING f) := a(f+'1')+a(f+'2')+a(f+'3')+a(f+'4')+a(f+'5')+a(f+'6')+a(f+'7')+a(f+'8')+a(f+'9')+a(f+'10');
c := b('1')+b('2')+b('3')+b('4')+b('5')+b('6')+b('7')+b('8')+b('9')+b('10');
c;
47 changes: 35 additions & 12 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,11 @@ void init_signals()
{
// signal(SIGTERM, caughtSIGTERM);
#ifndef _WIN32
#ifdef __APPLE__
signal(SIGPIPE, SIG_IGN); // Otherwise it seems semaphore gets interrupted
#else
signal(SIGPIPE, caughtSIGPIPE);
#endif
signal(SIGHUP, caughtSIGHUP);
signal(SIGALRM, caughtSIGALRM);

Expand Down Expand Up @@ -484,18 +488,23 @@ void readStaticTopology()
unsigned numNodes = topology->getCount("./RoxieServerProcess");
if (!numNodes && oneShotRoxie)
{
if (topology->getPropBool("expert/@addDummyNode", false))
unsigned dummyNodes = topology->getPropInt("expert/@addDummyNode", 0);
if (dummyNodes)
{
// Special config for testing some multinode things on a single node
numNodes = dummyNodes + 1;
topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", ".");
topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", "192.0.2.0"); // A non-existent machine (this address is reserved for documentation)
numNodes = 2;
for (unsigned dummyNo = 0; dummyNo < dummyNodes; dummyNo++)
{
VStringBuffer dummyIP("192.0.2.%u", dummyNo); // A non-existent machine (this address is reserved for documentation)
topology->addPropTree("RoxieServerProcess")->setProp("@netAddress", dummyIP.str());
}
localAgent = false;
topology->setPropInt("@numChannels", 2);
numChannels = 2;
topology->setPropInt("@numDataCopies", 2);
topology->setPropInt("@channelsPerNode", 2);
topology->setProp("@agentConfig", "cyclic");
topology->setPropInt("@numChannels", 1);
numChannels = 1;
topology->setPropInt("@numDataCopies", numNodes);
topology->setPropInt("@channelsPerNode", 1);
topology->setProp("@agentConfig", "simple");
}
else if (oneShotRoxie)
{
Expand All @@ -506,6 +515,8 @@ void readStaticTopology()
Owned<IPropertyTreeIterator> roxieServers = topology->getElements("./RoxieServerProcess");

bool myNodeSet = false;
StringBuffer forceIP;
topology->getProp("expert/@forceIP", forceIP);
unsigned calcNumChannels = 0;
ForEach(*roxieServers)
{
Expand All @@ -514,11 +525,23 @@ void readStaticTopology()
IpAddress ip(iptext);
if (ip.isNull())
throw MakeStringException(ROXIE_UDP_ERROR, "Could not resolve address %s", iptext);
if (ip.isLocal() && !myNodeSet)
if (forceIP.length())
{
myNodeSet = true;
myNode.setIp(ip);
myAgentEP.set(ccdMulticastPort, myNode.getIpAddress());
if (streq(iptext, forceIP) && !myNodeSet)
{
myNodeSet = true;
myNode.setIp(ip);
myAgentEP.set(ccdMulticastPort, myNode.getIpAddress());
}
}
else
{
if (ip.isLocal() && !myNodeSet)
{
myNodeSet = true;
myNode.setIp(ip);
myAgentEP.set(ccdMulticastPort, myNode.getIpAddress());
}
}
ForEachItemIn(idx, nodeTable)
{
Expand Down
118 changes: 103 additions & 15 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,8 +794,8 @@ class CSerializedRoxieQueryPacket : public CRoxieQueryPacketBase, implements ISe

extern IRoxieQueryPacket *createRoxiePacket(void *_data, unsigned _len)
{
if (!encryptInTransit)
return new CNocryptRoxieQueryPacket(_data, _len);
//if (!encryptInTransit)
// return new CNocryptRoxieQueryPacket(_data, _len);
if ((unsigned short)_len != _len)
{
StringBuffer s;
Expand Down Expand Up @@ -844,8 +844,13 @@ extern IRoxieQueryPacket *deserializeCallbackPacket(MemoryBuffer &m)
extern ISerializedRoxieQueryPacket *createSerializedRoxiePacket(MemoryBuffer &m)
{
unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
if (encryptInTransit)
return new CSerializedRoxieQueryPacket(m.detachOwn(), length);
if (true || encryptInTransit)
{
// void *data = m.detachOwn();
void *data = malloc(length);
memcpy(data, m.bufferBase(), length);
return new CSerializedRoxieQueryPacket(data, length);
}
else
return new CNocryptRoxieQueryPacket(m.detachOwn(), length);
}
Expand Down Expand Up @@ -2194,7 +2199,10 @@ class DelayedPacketQueue

void append(ISerializedRoxieQueryPacket *packet, unsigned expires)
{
// Goes on the end. But percolate the expiry time backwards
// Insert in the list at the appropriate point that the list stays in expiry order
// With a single buddy this is usually on the end, but with multiple buddies we will be inserting at various points.
// Should we consider a queue per buddy?

assert(GetCurrentThreadId()==roxiePacketReaderThread);
packet->noteQueued(0);
DelayedPacketEntry *newEntry = new DelayedPacketEntry(packet, expires);
Expand All @@ -2203,6 +2211,8 @@ class DelayedPacketQueue
StringBuffer s;
DBGLOG("Adding delayed packet %s expires in %u ms", packet->queryHeader().toString(s).str(), expires - msTick());
}
#if 0
// old code assumed only one buddy
newEntry->prev = tail;
if (tail)
{
Expand All @@ -2218,6 +2228,41 @@ class DelayedPacketQueue
else
head = newEntry;
tail = newEntry;
#else
if (tail)
{
DelayedPacketEntry *last = nullptr;
DelayedPacketEntry *finger = tail;
while (finger != nullptr)
{
if ((int) (finger->waitExpires - expires) <= 0)
break;
last = finger;
finger = finger->prev;
}
if (last)
last->prev = newEntry;
else
tail = newEntry;
newEntry->next = last;
newEntry->prev = finger;
if (finger)
finger->next = newEntry;
else
head = newEntry;
}
else
{
head = newEntry;
tail = newEntry;
}
#endif
numEntries++;
if (numEntries > maxNumEntries)
{
maxNumEntries = numEntries;
DBGLOG("WorkerUdpReader: Max IBYTI queue length is now %u", numEntries);
}
}

// Move any that we are done waiting for our buddy onto the active queue
Expand Down Expand Up @@ -2283,11 +2328,13 @@ class DelayedPacketQueue
if (goer==tail)
tail = goer->prev;
delete goer;
numEntries--;
}

DelayedPacketEntry *head = nullptr;
DelayedPacketEntry *tail = nullptr;

unsigned numEntries = 0;
unsigned maxNumEntries = 0;
};

//------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -2393,7 +2440,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
DelayedPacketQueueManager delayed;
#endif

class WorkerUdpTracker : public TimeDivisionTracker<6, false>
class WorkerUdpTracker : public TimeDivisionTracker<13, false>
{
public:
enum
Expand All @@ -2403,17 +2450,31 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
allocating,
processing,
pushing,
checkingRunning
deferring,
checkingRunning,
checkingExpired,
decoding,
acknowledging,
retrying,
creatingPacket,
getTimeout
};

WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<6, false>(name, reportIntervalSeconds)
WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<13, false>(name, reportIntervalSeconds)
{
stateNames[other] = "other";
stateNames[waiting] = "waiting";
stateNames[allocating] = "allocating";
stateNames[processing] = "processing";
stateNames[pushing] = "pushing";
stateNames[deferring] = "deferring";
stateNames[checkingRunning] = "checking running";
stateNames[checkingExpired] = "checking expiry";
stateNames[decoding] = "decoding";
stateNames[acknowledging] = "acknowledging";
stateNames[retrying] = "retrying";
stateNames[creatingPacket] = "creating packet";
stateNames[getTimeout] = "getting timeout";
}

} timeTracker;
Expand All @@ -2437,7 +2498,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
} readThread;

public:
RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 60), readThread(*this)
RoxieSocketQueueManager(unsigned _numWorkers) : RoxieReceiverBase(_numWorkers), logctx("RoxieSocketQueueManager"), timeTracker("WorkerUdpReader", 5), readThread(*this)
{
maxPacketSize = multicastSocket->get_max_send_size();
if ((maxPacketSize==0)||(maxPacketSize>65535))
Expand Down Expand Up @@ -2666,6 +2727,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
{
// NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
// DO NOT put tracing on this thread except at very high tracelevels!
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::processing);
if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
doIbyti(header, queue);
else
Expand Down Expand Up @@ -2708,18 +2770,20 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
}
else
{
division.switchState(WorkerUdpTracker::decoding);

#ifdef SUBCHANNELS_IN_HEADER
unsigned mySubchannel = header.mySubChannel();
#else
Owned<const ITopologyServer> topology = getTopology();
unsigned mySubchannel = topology->queryChannelInfo(header.channel).subChannel();
#endif
Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
unsigned retries = header.thisChannelRetries(mySubchannel);
if (retries >= SUBCHANNEL_MASK)
return; // I already failed unrecoverably on this request - ignore it
if (acknowledgeAllRequests && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST)
{
division.switchState(WorkerUdpTracker::acknowledging);
#ifdef DEBUG
if (testAgentFailure & 0x1 && !retries)
return;
Expand All @@ -2736,6 +2800,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
if (retries)
{
// MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
division.switchState(WorkerUdpTracker::retrying);
assertex(header.channel); // should never see a retry on channel 0
// Send back an out-of-band immediately, to let Roxie server know that channel is still active
if (!(testAgentFailure & 0x800) && !acknowledgeAllRequests)
Expand Down Expand Up @@ -2784,7 +2849,9 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
{
StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str());
}
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing);
division.switchState(WorkerUdpTracker::creatingPacket);
Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
division.switchState(WorkerUdpTracker::pushing);
#ifdef NEW_IBYTI
// It's debatable whether we should delay for the primary here - they had one chance already...
// But then again, so did we, assuming the timeout is longer than the IBYTIdelay
Expand All @@ -2795,15 +2862,22 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
delay += getIbytiDelay(header.subChannels[subChannel]);
}
if (delay)
{
division.switchState(WorkerUdpTracker::deferring);
delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay);
}
else
#endif
{
division.switchState(WorkerUdpTracker::pushing);
queue.enqueueUnique(packet.getClear(), mySubchannel, 0);
}
}
}
else // first time (not a retry).
{
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing);
division.switchState(WorkerUdpTracker::creatingPacket);
Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
#ifdef NEW_IBYTI
unsigned delay = 0;
if (mySubchannel != 0 && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) // i.e. I am not the primary here, and never delay special
Expand All @@ -2812,10 +2886,16 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
delay += getIbytiDelay(header.subChannels[subChannel]);
}
if (delay)
{
division.switchState(WorkerUdpTracker::deferring);
delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay);
}
else
#endif
{
division.switchState(WorkerUdpTracker::pushing);
queue.enqueue(packet.getClear(), 0);
}
}
}
}
Expand All @@ -2831,19 +2911,20 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::other);
for (;;)
{
mb.clear();
try
{
// NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
// DO NOT put tracing on this thread except at very high tracelevels!
#ifdef NEW_IBYTI
division.switchState(WorkerUdpTracker::getTimeout);
unsigned timeout = delayed.timeout(msTick());
if (timeout>5000)
timeout = 5000;
#else
unsigned timeout = 5000;
#endif
division.switchState(WorkerUdpTracker::allocating);
mb.clear();
void * buffer = mb.reserve(maxPacketSize);

division.switchState(WorkerUdpTracker::waiting);
Expand Down Expand Up @@ -2904,11 +2985,18 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
}
}
#ifdef NEW_IBYTI
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue);
division.switchState(WorkerUdpTracker::checkingExpired);
unsigned now = msTick();
if (now != lastCheck)
{
lastCheck = now;
delayed.checkExpired(now, slaQueue, hiQueue, loQueue);
}
#endif
}
return 0;
}
unsigned lastCheck = 0;

void start()
{
Expand Down
Loading