Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chapman <[email protected]>
  • Loading branch information
richardkchapman committed Oct 9, 2024
1 parent 4f99d1e commit 73571f6
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,12 @@ extern ISerializedRoxieQueryPacket *createSerializedRoxiePacket(MemoryBuffer &m)
{
unsigned length = m.length(); // don't make assumptions about evaluation order of parameters...
if (true || encryptInTransit)
return new CSerializedRoxieQueryPacket(m.detachOwn(), length);
{
// void *data = m.detachOwn();
void *data = malloc(length);
memcpy(data, m.bufferBase(), length);
return new CSerializedRoxieQueryPacket(data, length);
}
else
return new CNocryptRoxieQueryPacket(m.detachOwn(), length);
}
Expand Down Expand Up @@ -2393,7 +2398,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
DelayedPacketQueueManager delayed;
#endif

class WorkerUdpTracker : public TimeDivisionTracker<11, false>
class WorkerUdpTracker : public TimeDivisionTracker<12, false>
{
public:
enum
Expand All @@ -2408,10 +2413,11 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
decoding,
acknowledging,
retrying,
creatingPacket
creatingPacket,
getTimeout
};

WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<11, false>(name, reportIntervalSeconds)
WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<12, false>(name, reportIntervalSeconds)
{
stateNames[other] = "other";
stateNames[waiting] = "waiting";
Expand All @@ -2424,6 +2430,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
stateNames[acknowledging] = "acknowledging";
stateNames[retrying] = "retrying";
stateNames[creatingPacket] = "creating packet";
stateNames[getTimeout] = "getting timeout";
}

} timeTracker;
Expand Down Expand Up @@ -2799,7 +2806,9 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
{
StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str());
}
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing);
division.switchState(WorkerUdpTracker::creatingPacket);
Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
division.switchState(WorkerUdpTracker::pushing);
#ifdef NEW_IBYTI
// It's debatable whether we should delay for the primary here - they had one chance already...
// But then again, so did we, assuming the timeout is longer than the IBYTIdelay
Expand All @@ -2809,7 +2818,6 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
for (unsigned subChannel = 0; subChannel < mySubchannel; subChannel++)
delay += getIbytiDelay(header.subChannels[subChannel]);
}
Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
if (delay)
delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay);
else
Expand Down Expand Up @@ -2849,19 +2857,20 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::other);
for (;;)
{
mb.clear();
try
{
// NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
// DO NOT put tracing on this thread except at very high tracelevels!
#ifdef NEW_IBYTI
division.switchState(WorkerUdpTracker::getTimeout);
unsigned timeout = delayed.timeout(msTick());
if (timeout>5000)
timeout = 5000;
#else
unsigned timeout = 5000;
#endif
division.switchState(WorkerUdpTracker::allocating);
mb.clear();
void * buffer = mb.reserve(maxPacketSize);

division.switchState(WorkerUdpTracker::waiting);
Expand Down Expand Up @@ -2923,11 +2932,17 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
}
#ifdef NEW_IBYTI
division.switchState(WorkerUdpTracker::checkingExpired);
delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue);
unsigned now = msTick();
if (now != lastCheck)
{
lastCheck = now;
delayed.checkExpired(now, slaQueue, hiQueue, loQueue);
}
#endif
}
return 0;
}
unsigned lastCheck = 0;

void start()
{
Expand Down

0 comments on commit 73571f6

Please sign in to comment.