Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make G__FastAllocString cache thread safe #34

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 95 additions & 1 deletion cint/cint/src/FastAllocString.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "stdio.h"
#include "string.h"
#include <map>
#if __cplusplus >= 201103L
#include <atomic>
#endif

namespace Cint {
namespace Internal {
Expand Down Expand Up @@ -52,6 +55,97 @@ namespace Cint {
// 8: 0
class G__BufferReservoir {
public:
#if __cplusplus >= 201103L
//Thread safe circular buffer
// elements within the buffer (fBuffers) are either a nullptr or a valid Buffer_t
// fNumAvailableBuffers is an approximate count of the number of valid Buffer_t.
// It is only an approximate count since during a push or a pop the value can
// temporarily be off. However, given sufficient time after all push or pops have stopped,
// the value of fNumAvailableBuffers will be correct. fNumAvailableBuffers can be negative
// if push happens at the same time as a pop. If the push adds a valid Buffer_t to the buffer
// but then the pop pulls that value out and decrements fNumAvailableBuffers before the push
// has a chance to increment it. Similarly fNumAvailableBuffers can temporarily exceed fNumBuffers.
// Similarly, fWatermark is only a good place to start searching for either an invalid or valid Buffer_t
// since a push or pop operation may not have had a chance to move the value before the next
// call is made. Given the imperfect nature of fWatermark, the use of a circular buffer allows
// for a better chance to find an availble element during a concurrenct call.
class Bucket {
public:
Bucket():
fBuffers(0), fWatermark(0), fNumBuffers(0), fNumAvailableBuffers(0)
{}
~Bucket() {
// delete all buffers
for(auto it = fBuffers; it != fBuffers+fNumBuffers; ++it) {
delete [] *it;
}
delete [] fBuffers;
}

void init(size_t numBuffers) {
fNumBuffers = numBuffers;
fBuffers = new std::atomic<Buffer_t>[numBuffers];
bzero(fBuffers,sizeof(Buffer_t)*numBuffers);
fWatermark.store( fBuffers + numBuffers);
}

bool push(char* buf) {
if(fNumAvailableBuffers >= static_cast<long>(fNumBuffers)) {
return false;
}
std::atomic<Buffer_t>* temp = fWatermark.load();
std::atomic<Buffer_t>* startValue = temp;
std::atomic<Buffer_t>* loopEndCondition = fBuffers;
for(int mode =0; mode != 2; ++mode) {
while(temp != loopEndCondition) {
--temp;
Buffer_t previous = nullptr;
if(temp->compare_exchange_strong(previous,buf)) {
++fNumAvailableBuffers;
fWatermark.compare_exchange_strong(startValue,temp);
return true;
}
}
//Didn't find it so now go back around
temp = fBuffers+fNumBuffers;
loopEndCondition = startValue;
}
return false;
}

char* pop() {
if(fNumAvailableBuffers <= 0) {
return nullptr;
}
std::atomic<Buffer_t>* temp = fWatermark.load();
std::atomic<Buffer_t>* startValue = temp;
std::atomic<Buffer_t>* loopEndCondition = fBuffers+fNumBuffers;
for(int mode = 0; mode !=2; ++mode) {
while(temp < loopEndCondition) {
Buffer_t retValue = temp->exchange(nullptr);
if(nullptr != retValue) {
--fNumAvailableBuffers;
fWatermark.compare_exchange_strong(startValue,temp+1);
return retValue;
}
++temp;
}
//Didn't find it so now go back around
temp = fBuffers;
loopEndCondition = startValue;
}
return 0;
}
private:
typedef char* Buffer_t;

std::atomic<Buffer_t>* fBuffers; // array of buffers,
std::atomic<std::atomic<Buffer_t>*> fWatermark; // most recently filled slot
size_t fNumBuffers; // size of fBuffers
std::atomic<long> fNumAvailableBuffers; //number of non null buffers
};

#else
class Bucket {
public:
Bucket():
Expand Down Expand Up @@ -92,7 +186,7 @@ namespace Cint {
Buffer_t* fWatermark; // most recently filled slot
size_t fNumBuffers; // size of fBuffers
};

#endif
private:
G__BufferReservoir() {
static size_t numBuffers[fgNumBuckets] = {256, 64, 16, 8, 4, 4, 2};
Expand Down