Skip to content

Commit

Permalink
Now executes and runs initial samples in the parallel for loop. The m…
Browse files Browse the repository at this point in the history
…odel is

now responsible for doling out work.
  • Loading branch information
rick.weber.iii committed Feb 21, 2012
1 parent e0e1a02 commit 86b1c49
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 17 deletions.
24 changes: 22 additions & 2 deletions Include/clUtilParallelFor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <math.h>
#include "clUtilDeviceGroup.h"
#include "clUtilDevice.h"
#include "clUtilUtility.h"

namespace clUtil
{
Expand All @@ -21,6 +22,20 @@ namespace clUtil
}
};

//Record describing a chunk of loop iterations that has finished running.
//NOTE(review): nothing in the visible code reads or writes this struct yet,
//so the per-field notes below are inferred from the names - confirm.
struct CompletedTask
{
//Presumably the measured run time of the chunk; units not shown here - confirm
double Time;
//Presumably which sample pass the chunk belonged to - confirm
size_t SampleNumber;
//Presumably the first iteration covered by the chunk - confirm
size_t StartIndex;
//Presumably the last iteration covered by the chunk - confirm
size_t EndIndex;
};

//A chunk of loop iterations waiting to be handed to a device.
//ParallelForPerformanceModel queues these per device group, and ParallelFor
//forwards the two indices to the user's loop body as (StartIndex, EndIndex).
struct PendingTask
{
//First iteration of the chunk
size_t StartIndex;
//Last iteration of the chunk. NOTE(review): the model starts the next chunk
//at EndIndex + 1, so this looks inclusive - confirm.
size_t EndIndex;
};

class ParallelForPerformanceModel
{
friend void ParallelFor(size_t start,
Expand All @@ -30,11 +45,16 @@ namespace clUtil
unsigned int numSamples);

private:
std::unique_ptr<unsigned int[]> mCurrentSample;
std::vector<Utility::UnsafeQueue<PendingTask>> mPendingSampleQueues;
size_t mNumSamples;
size_t mStart;
size_t mEnd;

ParallelForPerformanceModel(size_t numSamples);
ParallelForPerformanceModel(size_t numSamples,
size_t start,
size_t end);

PendingTask getWork(size_t deviceGroup);
public:
};

Expand Down
51 changes: 51 additions & 0 deletions Include/clUtilUtility.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,56 @@ namespace clUtil

return CL_SUCCESS;
}

//Returns the current time of day in seconds, including the fractional
//(microsecond) part, as reported by gettimeofday().
double getTime();

template <typename T> class UnsafeQueue
{
private:
volatile size_t mFront;
volatile size_t mBack;
size_t mSize;
size_t mMask;
std::vector<T> mCyclicQueue;

public:
UnsafeQueue(size_t pow2Size) :
mFront(0),
mBack(0),
mSize(1 << pow2Size),
mMask((0x1 << pow2Size) - 1),
mCyclicQueue(mSize)
{
if(pow2Size > 63)
{
throw clUtilException("Internal error: queues cannot be longer than"
"2^63 elements. Report this as a bug.");
}
}

void push(T item)
{
mCyclicQueue[mBack & mMask] = item;
mBack++;
}

bool pop(T& item)
{
if(mFront >= mBack)
{
return false;
}

item = mCyclicQueue[mFront & mMask];
mFront++;

return true;
}

size_t length()
{
return mBack - mFront;
}
};
}
}
1 change: 0 additions & 1 deletion Makefile.inc
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
CXX=g++-4.5
#CXX=/nfs/sw/gcc/4.5.2/bin/g++
USERFLAGS=-g3 -O0 -Wall
OpenCLInclude=$(AMDAPPSDKROOT)/include
175 changes: 164 additions & 11 deletions clUtilParallelFor.cc
Original file line number Diff line number Diff line change
@@ -1,28 +1,96 @@
#include "clUtilParallelFor.h"

//The # operator quotes its operand without macro-expanding it first, so
//__LINE__ must pass through one extra macro layer to be replaced by the
//actual line number before it is turned into a string.
#define STRINGIFY(arg) #arg
#define STRINGIFY_EXPANDED(arg) STRINGIFY(arg)
//"file:line" of the point of use, as a single string literal
#define __WHERE__ __FILE__ ":" STRINGIFY_EXPANDED(__LINE__)

using namespace std;
using namespace clUtil;
using namespace clUtil::Utility;

bool DeviceGroupInfo::singletonInitialized = false;
DeviceGroupInfo DeviceGroupInfo::deviceGroupInfoSingleton;

ParallelForPerformanceModel::ParallelForPerformanceModel(size_t numSamples) :
mNumSamples(numSamples)
static const double kModelFraction = 0.1;
static const size_t kMaxQueueLengthPow2 = 8; //2^8 = 256

ParallelForPerformanceModel::ParallelForPerformanceModel(size_t numSamples,
size_t start,
size_t end) :
mPendingSampleQueues(DeviceGroupInfo::Get().numGroups(),
UnsafeQueue<PendingTask>(kMaxQueueLengthPow2)),
mNumSamples(numSamples),
mStart(start),
mEnd(end)
{
size_t numDeviceGroups = DeviceGroupInfo::Get().numGroups();

unique_ptr<unsigned int[]>
currentSample(new unsigned int[numDeviceGroups]);
//Partition the initial sample workload to different device groups
size_t sampleIterationCount = kModelFraction * (end - start) / numSamples;

//Initialize the current sample array
for(unsigned int curDeviceGroup = 0;
curDeviceGroup < numDeviceGroups;
curDeviceGroup++)
for(size_t curSample = 0; curSample < numSamples; curSample++)
{
currentSample[curDeviceGroup] = 0u;
size_t iterationOffset = 0;

for(size_t curDeviceGroup = 0;
curDeviceGroup < numDeviceGroups;
curDeviceGroup++)
{
PendingTask newTask;

if(iterationOffset == 0 && curSample < numSamples - 1)
{
iterationOffset =
mStart + curSample * (mEnd + mEnd / numSamples)
/ numSamples + iterationOffset;
}
else if(iterationOffset == 0 && curSample == numSamples - 1)
{
iterationOffset = mEnd - sampleIterationCount;
}

newTask.StartIndex = iterationOffset;

//Last device group on the last sample only goes til last iteration.
//Others take their iterations as usual
if(curSample == numSamples - 1 && curDeviceGroup == numDeviceGroups - 1)
{
newTask.EndIndex = end - 1;
}
else
{
newTask.EndIndex =
iterationOffset + sampleIterationCount / numDeviceGroups;
}

iterationOffset = newTask.EndIndex + 1;

mPendingSampleQueues[curDeviceGroup].push(newTask);

#if 0
cout << "DeviceGroup: " << curDeviceGroup
<< " Start: " << newTask.StartIndex
<< " End : " << newTask.EndIndex
<< endl;
#endif
}
}
}

mCurrentSample = move(currentSample);
//Hands out the next chunk of iterations for the given device group.
//
//Sample tasks queued for that group are returned first, in the order they
//were queued. Model-driven scheduling for the case where the queue has run
//dry is not implemented yet; in that case an empty task with both indices
//set to 0 is returned.
//
//deviceGroup: index into mPendingSampleQueues (not bounds-checked here).
PendingTask ParallelForPerformanceModel::getWork(size_t deviceGroup)
{
  //Zero-initialise so the fallback path below never returns indeterminate
  //indices to the caller.
  PendingTask work = {0, 0};

  //Pull from the sample queues first. pop() reports an empty queue through
  //its return value and leaves `work` untouched in that case.
  if(mPendingSampleQueues[deviceGroup].pop(work))
  {
    return work;
  }

  //If empty, use model to get work (not implemented yet)
  return work;
}

void clUtil::ParallelFor(size_t start,
Expand All @@ -31,5 +99,90 @@ void clUtil::ParallelFor(size_t start,
void (*loopBody)(size_t start, size_t end),
unsigned int numSamples)
{
ParallelForPerformanceModel model(numSamples);
struct DeviceStatus
{
double Time1;
double Time2;
size_t StartIndex;
size_t EndIndex;
cl_event WaitEvent;
bool IsBusy;

DeviceStatus() :
Time1(0.0),
Time2(0.0),
StartIndex(0),
EndIndex(0),
WaitEvent(NULL),
IsBusy(false)
{
}
};

size_t oldDeviceNum = Device::GetCurrentDeviceNum();
ParallelForPerformanceModel model(numSamples, start, end);
size_t iterationsRemaining = end - start;
vector<DeviceStatus> deviceStatuses(Device::GetDevices().size());

//Parallel for scheduling loop
while(iterationsRemaining > 0)
{
for(size_t curDevice = 0;
curDevice < Device::GetDevices().size();
curDevice++)
{
DeviceStatus& curDeviceStatus = deviceStatuses[curDevice];

//If this device isn't busy, get some work from the model and run it
if(curDeviceStatus.IsBusy == false)
{
size_t deviceGroup = DeviceGroupInfo::Get()[curDevice];

PendingTask work;

work = model.getWork(deviceGroup);

Device::SetCurrentDevice(curDevice);

curDeviceStatus.StartIndex = work.StartIndex;
curDeviceStatus.EndIndex = work.EndIndex;
curDeviceStatus.Time1 = getTime();
curDeviceStatus.IsBusy = true;

loopBody(work.StartIndex, work.EndIndex);

clEnqueueMarker(Device::GetCurrentDevice().getCommandQueue(),
&curDeviceStatus.WaitEvent);

clFlush(Device::GetCurrentDevice().getCommandQueue());
}

if(curDeviceStatus.WaitEvent != NULL)
{
cl_int eventStatus;

cl_int err = clGetEventInfo(curDeviceStatus.WaitEvent,
CL_EVENT_COMMAND_EXECUTION_STATUS,
sizeof(eventStatus),
&eventStatus,
NULL);
if(err != CL_SUCCESS)
{
throw clUtilException("ParallelFor internal error: could not get "
"event info " __WHERE__ "\n");
}

if(eventStatus == CL_COMPLETE)
{
curDeviceStatus.Time2 = getTime();
curDeviceStatus.IsBusy = false;

clReleaseEvent(curDeviceStatus.WaitEvent);
curDeviceStatus.WaitEvent = NULL;
}
}
}
}

Device::SetCurrentDevice(oldDeviceNum);
}
8 changes: 8 additions & 0 deletions clUtilUtility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,13 @@ namespace clUtil

return CL_SUCCESS;
}

//Current time of day in seconds, with the microsecond part carried in the
//fraction.
double getTime()
{
  timeval now;
  gettimeofday(&now, NULL);

  const double wholeSeconds = static_cast<double>(now.tv_sec);
  const double fraction = static_cast<double>(now.tv_usec) / 1e6;

  return wholeSeconds + fraction;
}
}
}
2 changes: 1 addition & 1 deletion examples/DeviceInfo/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ BIN=DeviceInfo
#Directory containing OpenCL header files
INCLUDE=-I../../Include -I$(OpenCLInclude)

LIB=-lOpenCL -L../.. -lclUtil
LIB=-L../.. -lclUtil -lOpenCL
CPPFLAGS=-std=c++0x $(INCLUDE) $(USERFLAGS)

$(BIN): $(OBJ)
Expand Down
9 changes: 7 additions & 2 deletions examples/ParallelFor/ParallelFor.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
#include <clUtil.h>
#include <iostream>

using namespace clUtil;
using namespace std;

int main(int argc, char** argv)
{
const char* filename[] = {"kernel.cl"};

Device::FetchDevices();
Device::InitializeDevices(filename, 1);

ParallelFor(0, 1, 10000, [=](size_t startIdx, size_t endIdx)
{

cout << "device: " << Device::GetCurrentDeviceNum()
<< " start: " << startIdx
<< " end: " << endIdx << endl;
}, 4);


Expand Down
3 changes: 3 additions & 0 deletions examples/ParallelFor/kernel.cl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Placeholder kernel with an empty body: accepts one global-memory pointer
// and performs no work.
__kernel void doNothing(__global void* ptr)
{
}

0 comments on commit 86b1c49

Please sign in to comment.