Skip to content

Commit

Permalink
Multithreaded warper: make sure a transformer object is used by the t…
Browse files Browse the repository at this point in the history
…hread which created it (fixes OSGeo#1989). This workarounds a PROJ bug also fixed per OSGeo/PROJ#1726
  • Loading branch information
rouault committed Nov 12, 2019
1 parent dacda90 commit 83c2506
Showing 1 changed file with 77 additions and 42 deletions.
119 changes: 77 additions & 42 deletions gdal/alg/gdalwarpkernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,28 @@ struct _GWKJobStruct
int (*pfnProgress)(GWKJobStruct* psJob);
void *pTransformerArg;

// Just used during thread initialization phase.
GDALTransformerFunc pfnTransformerInit;
void *pTransformerArgInit;
void (*pfnFunc)(void*); // used by GWKRun() to assign the proper pTransformerArg
} ;

struct GWKThreadInitData
{
GDALTransformerFunc pfnTransformerInit = nullptr;
void *pTransformerArgInit = nullptr;

void *pTransformerArg = nullptr;
GIntBig nThreadId = 0;
};

struct GWKThreadData
{
CPLWorkerThreadPool* poThreadPool = nullptr;
GWKJobStruct* pasThreadJob = nullptr;
CPLCond* hCond = nullptr;
CPLMutex* hCondMutex = nullptr;
void* pTransformerArgInput = nullptr; // owned by calling layer. Not to be destroyed
std::map<GIntBig, void*> mapThreadToTransformerArg{};
};

/************************************************************************/
/* GWKProgressThread() */
/************************************************************************/
Expand Down Expand Up @@ -277,11 +294,11 @@ static CPLErr GWKGenericMonoThread( GDALWarpKernel *poWK,

static void GWKThreadInitTransformer(void* pData)
{
GWKJobStruct* psJob = static_cast<GWKJobStruct*>(pData);
if( psJob->pTransformerArg == nullptr )
psJob->pTransformerArg =
GDALCloneTransformer(psJob->pTransformerArgInit);
if( psJob->pTransformerArg != nullptr )
GWKThreadInitData* psInitData = static_cast<GWKThreadInitData*>(pData);
if( psInitData->pTransformerArg == nullptr )
psInitData->pTransformerArg =
GDALCloneTransformer(psInitData->pTransformerArgInit);
if( psInitData->pTransformerArg != nullptr )
{
// In case of lazy opening (for example RPCDEM), do a dummy
// transformation to be sure that the DEM is really opened with the
Expand All @@ -291,24 +308,17 @@ static void GWKThreadInitTransformer(void* pData)
double dfZ = 0.0;
int bSuccess = FALSE;
CPLPushErrorHandler(CPLQuietErrorHandler);
psJob->pfnTransformerInit(psJob->pTransformerArg, TRUE, 1,
psInitData->pfnTransformerInit(psInitData->pTransformerArg, TRUE, 1,
&dfX, &dfY, &dfZ, &bSuccess );
CPLPopErrorHandler();
}
psInitData->nThreadId = CPLGetPID();
}

/************************************************************************/
/* GWKThreadsCreate() */
/************************************************************************/

typedef struct
{
CPLWorkerThreadPool* poThreadPool;
GWKJobStruct* pasThreadJob;
CPLCond* hCond;
CPLMutex* hCondMutex;
} GWKThreadData;

void* GWKThreadsCreate( char** papszWarpOptions,
GDALTransformerFunc pfnTransformer,
void* pTransformerArg )
Expand All @@ -328,11 +338,7 @@ void* GWKThreadsCreate( char** papszWarpOptions,
if( nThreads > 128 )
nThreads = 128;

GWKThreadData* psThreadData = static_cast<GWKThreadData *>(
VSI_CALLOC_VERBOSE(1, sizeof(GWKThreadData)));
if( psThreadData == nullptr )
return nullptr;

GWKThreadData* psThreadData = new GWKThreadData();
CPLCond* hCond = nullptr;
if( nThreads )
hCond = CPLCreateCond();
Expand Down Expand Up @@ -360,18 +366,22 @@ void* GWKThreadsCreate( char** papszWarpOptions,
}
CPLReleaseMutex(psThreadData->hCondMutex);

std::vector<std::unique_ptr<GWKThreadInitData>> apoInitData;
std::vector<void*> apInitData;
for( int i = 0; i < nThreads; i++ )
{
psThreadData->pasThreadJob[i].hCond = psThreadData->hCond;
psThreadData->pasThreadJob[i].hCondMutex = psThreadData->hCondMutex;
psThreadData->pasThreadJob[i].pfnTransformerInit = pfnTransformer;
psThreadData->pasThreadJob[i].pTransformerArgInit = pTransformerArg;

std::unique_ptr<GWKThreadInitData> poInitData(new GWKThreadInitData());
poInitData->pfnTransformerInit = pfnTransformer;
poInitData->pTransformerArgInit = pTransformerArg;
if( i == 0 )
psThreadData->pasThreadJob[i].pTransformerArg = pTransformerArg;
poInitData->pTransformerArg = pTransformerArg;
else
psThreadData->pasThreadJob[i].pTransformerArg = nullptr;
apInitData.push_back(&(psThreadData->pasThreadJob[i]));
poInitData->pTransformerArg = nullptr;
apoInitData.push_back(std::move(poInitData));
apInitData.push_back(apoInitData.back().get());
}

psThreadData->poThreadPool = new (std::nothrow) CPLWorkerThreadPool();
Expand All @@ -386,21 +396,32 @@ void* GWKThreadsCreate( char** papszWarpOptions,

for( int i = 1; i < nThreads; i++ )
{
if( psThreadData->pasThreadJob[i].pTransformerArg == nullptr )
if( apoInitData[i]->pTransformerArg == nullptr )
{
CPLDebug("WARP", "Cannot deserialize transformer");
bTransformerCloningSuccess = false;
break;
}
}

if( !bTransformerCloningSuccess )
if( bTransformerCloningSuccess )
{
psThreadData->pTransformerArgInput = pTransformerArg;
for( int i = 0; i < nThreads; i++ )
{
const auto nThreadId = apoInitData[i]->nThreadId;
CPLAssert(psThreadData->mapThreadToTransformerArg.find(nThreadId) ==
psThreadData->mapThreadToTransformerArg.end());
psThreadData->mapThreadToTransformerArg[nThreadId] =
apoInitData[i]->pTransformerArg;
}
}
else
{
for( int i = 1; i < nThreads; i++ )
{
if( psThreadData->pasThreadJob[i].pTransformerArg )
GDALDestroyTransformer(psThreadData->
pasThreadJob[i].pTransformerArg);
if( apoInitData[i]->pTransformerArg )
GDALDestroyTransformer(apoInitData[i]->pTransformerArg);
}
CPLFree(psThreadData->pasThreadJob);
psThreadData->pasThreadJob = nullptr;
Expand All @@ -427,15 +448,10 @@ void GWKThreadsEnd( void* psThreadDataIn )
GWKThreadData* psThreadData = static_cast<GWKThreadData *>(psThreadDataIn);
if( psThreadData->poThreadPool )
{
const int nThreads = psThreadData->poThreadPool->GetThreadCount();
if( psThreadData->pasThreadJob )
for( auto& pair: psThreadData->mapThreadToTransformerArg )
{
for( int i = 1; i < nThreads; i++ )
{
if( psThreadData->pasThreadJob[i].pTransformerArg )
GDALDestroyTransformer(psThreadData->
pasThreadJob[i].pTransformerArg);
}
if( pair.second != psThreadData->pTransformerArgInput )
GDALDestroyTransformer(pair.second);
}
delete psThreadData->poThreadPool;
}
Expand All @@ -444,9 +460,27 @@ void GWKThreadsEnd( void* psThreadDataIn )
CPLDestroyCond(psThreadData->hCond);
if( psThreadData->hCondMutex )
CPLDestroyMutex(psThreadData->hCondMutex);
CPLFree(psThreadData);
delete psThreadData;
}

/************************************************************************/
/* ThreadFuncAdapter() */
/************************************************************************/

static void ThreadFuncAdapter(void* pData)
{
// Assign the pTransformerArg created in the current thread to this job
// This workarounds the PROJ bug fixed in https://github.com/OSGeo/PROJ/pull/1726
GWKJobStruct* psJob = static_cast<GWKJobStruct *>(pData);
const GWKThreadData* psThreadData =
static_cast<const GWKThreadData*>(psJob->poWK->psThreadData);
auto oIter = psThreadData->mapThreadToTransformerArg.find(CPLGetPID());
CPLAssert(oIter != psThreadData->mapThreadToTransformerArg.end());
psJob->pTransformerArg = oIter->second;
psJob->pfnFunc(pData);
}


/************************************************************************/
/* GWKRun() */
/************************************************************************/
Expand Down Expand Up @@ -519,7 +553,8 @@ static CPLErr GWKRun( GDALWarpKernel *poWK,
psThreadData->pasThreadJob[i].pfnProgress = GWKProgressThread;
else
psThreadData->pasThreadJob[i].pfnProgress = nullptr;
psThreadData->poThreadPool->SubmitJob( pfnFunc,
psThreadData->pasThreadJob[i].pfnFunc = pfnFunc;
psThreadData->poThreadPool->SubmitJob( ThreadFuncAdapter,
static_cast<void*>(&psThreadData->pasThreadJob[i]) );
}

Expand Down

0 comments on commit 83c2506

Please sign in to comment.