Skip to content

Commit

Permalink
THDFSFile fixes (#94)
Browse files Browse the repository at this point in the history
* Replaces positional read with seek+read to make THDFSFile
compatible with libhdfs3 (native HDFS client library) which only
implements the latter.

* Fixes the GetUserInfo call so that it actually returns the current username
instead of root

* use hdfsBuilder to create HDFS connection

* makes HDFS URLs absolute instead of relative to user directory to match Hadoop conventions.
fixes HDFS path handling in various methods to make them actually work.

* enables CMake build for THDFSFile

* makes sure SysRead() returns as many bytes as requested if possible
by calling hdfsRead several times.
  • Loading branch information
evgeny-boger authored and vgvassilev committed Jun 21, 2017
1 parent e0b1b4e commit 45f61ce
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 39 deletions.
2 changes: 2 additions & 0 deletions io/hdfs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

include_directories(${HDFS_INCLUDE_DIRS})

add_definitions(-D_FILE_OFFSET_BITS=64)

ROOT_GENERATE_DICTIONARY(G__HDFS *.h MODULE HDFS LINKDEF LinkDef.h)

ROOT_LINKER_LIBRARY(HDFS *.cxx G__HDFS.cxx LIBRARIES Core ${HDFS_LIBRARIES} DEPENDENCIES Net RIO)
Expand Down
3 changes: 2 additions & 1 deletion io/hdfs/inc/THDFSFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class THDFSFile : public TFile {
void *fFS; ///< HDFS user handle
Long64_t fSize; ///< File size
Long64_t fSysOffset; ///< Seek offset in file
char *fPath; ///< HDFS path name
TUrl fUrl; ///< HDFS url
TString fPath; ///< HDFS path

Int_t SysOpen(const char *pathname, Int_t flags, UInt_t mode);
Int_t SysClose(Int_t fd);
Expand Down
114 changes: 76 additions & 38 deletions io/hdfs/src/THDFSFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ This is usually found in either:
or
$JAVA_HOME/jre/lib/amd64/server
This file can only be used if hdfs support is compiled into ROOT.
The HDFS URLs should be of the form:
hdfs:///path/to/file/in/HDFS.root
The HDFS URLs follow the Hadoop notation and should be of the form:
hdfs://[host:port]/absolute/path/to/file/in/HDFS.root
Any host or port information will be ignored; this is taken from the
node's HDFS configuration files.
Example HDFS URLs:
hdfs:///user/username/dir1/file2.root
hdfs://localhost/user/username/dir1/file2.root
*/

#include "syslog.h"
Expand All @@ -53,6 +58,9 @@ node's HDFS configuration files.
// For now, we don't allow any write/fs modification operations.
static const Bool_t R__HDFS_ALLOW_CHANGES = kFALSE;

static const char hdfs_default_host[] = "default";
static const int hdfs_default_port = 0;

// The following snippet is used for developer-level debugging
// Contributed by Pete Wyckoff of the HDFS project
#define THDFSFile_TRACE
Expand All @@ -75,7 +83,6 @@ THDFSFile::THDFSFile(const char *path, Option_t *option,
fHdfsFH = 0;
fFS = 0;
fSize = -1;
fPath = 0;
fSysOffset = 0;

fOption = option;
Expand All @@ -91,15 +98,23 @@ THDFSFile::THDFSFile(const char *path, Option_t *option,

Bool_t has_authn = kTRUE;

struct hdfsBuilder *bld = hdfsNewBuilder();
if (!bld) {
SysError("THDFSFile", "Error creating hdfs builder");
goto zombie;
}

hdfsBuilderSetNameNode(bld, hdfs_default_host);
hdfsBuilderSetNameNodePort(bld, hdfs_default_port);
if (has_authn) {
UserGroup_t *ugi = gSystem->GetUserInfo(0);
UserGroup_t *ugi = gSystem->GetUserInfo((char *)0);
const char *user = (ugi->fUser).Data();
fFS = hdfsConnectAsUser("default", 0, user);
hdfsBuilderSetUserName(bld, user);
delete ugi;
} else {
fFS = hdfsConnect("default", 0);
}

fFS = hdfsBuilderConnect(bld);

if (fFS == 0) {
SysError("THDFSFile", "HDFS client for %s cannot open the filesystem",
path);
Expand Down Expand Up @@ -150,9 +165,6 @@ THDFSFile::~THDFSFile()
{
TRACE("destroy")

if (fPath)
delete [] fPath;

// We assume that the file is closed in SysClose
// Explicitly release reference to HDFS filesystem object.
// Turned off now due to compilation issues.
Expand All @@ -167,12 +179,21 @@ THDFSFile::~THDFSFile()
Int_t THDFSFile::SysRead(Int_t, void *buf, Int_t len)
{
   // Read up to `len` bytes from the current seek offset into `buf`.
   // hdfsRead() may return fewer bytes than requested (short read), so loop
   // until we have `len` bytes, hit EOF, or an error occurs; this mirrors
   // POSIX read() semantics expected by TFile. Advances fSysOffset by the
   // number of bytes actually read and returns that count, or -1 on an
   // immediate error.
   TRACE("READ")
   tSize num_read_total = 0;

   do {
      tSize num_read = hdfsRead((hdfsFS)fFS, (hdfsFile)fHdfsFH, (char *)buf + num_read_total, len - num_read_total);
      if (num_read < 0) {
         // Check BEFORE accumulating: folding -1 into the total would corrupt
         // both the returned byte count and fSysOffset.
         gSystem->SetErrorStr(strerror(errno));
         if (num_read_total == 0) {
            // Nothing was read at all: report the error to the caller.
            return -1;
         }
         break; // return the bytes successfully read before the error
      } else if (num_read == 0) {
         break; // EOF reached
      }
      num_read_total += num_read;
   } while (num_read_total < len);

   fSysOffset += num_read_total;
   return num_read_total;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -206,6 +227,12 @@ Long64_t THDFSFile::SysSeek(Int_t, Long64_t offset, Int_t whence)
SysError("THDFSFile", "Unknown whence!");
return -1;
}

if (hdfsSeek((hdfsFS)fFS, (hdfsFile)fHdfsFH, fSysOffset) != 0) {
SysError("THDFSFile", "Unable to seek to the given position");
return -1;
}

return fSysOffset;
}

Expand All @@ -214,16 +241,15 @@ Long64_t THDFSFile::SysSeek(Int_t, Long64_t offset, Int_t whence)

Int_t THDFSFile::SysOpen(const char * pathname, Int_t flags, UInt_t)
{
// This is given to us as a URL (hdfs://hadoop-name:9000//foo or
// hdfs:///foo); convert this to a file name.
TUrl url(pathname);
const char * file = url.GetFile();
size_t path_size = strlen(file);
fPath = new char[path_size+1];
if (fPath == 0) {
SysError("THDFSFile", "Unable to allocate memory for path.");
// This is given to us as a URL in Hadoop notation (hdfs://hadoop-name:9000/user/foo/bar or
// hdfs:///user/foo/bar); convert this to a file name.
fUrl = TUrl(pathname);

fPath = fUrl.GetFileAndOptions();
if (!fPath.BeginsWith("/")) {
fPath.Insert(0, '/');
}
strlcpy(fPath, file,path_size+1);

if ((fHdfsFH = hdfsOpenFile((hdfsFS)fFS, fPath, flags, 0, 0, 0)) == 0) {
SysError("THDFSFile", "Unable to open file %s in HDFS", pathname);
return -1;
Expand Down Expand Up @@ -311,15 +337,23 @@ THDFSSystem::THDFSSystem() : TSystem("-hdfs", "HDFS Helper System")

Bool_t has_authn = kTRUE;

struct hdfsBuilder *bld = hdfsNewBuilder();
if (!bld) {
SysError("THDFSSystem", "Error creating hdfs builder");
goto zombie;
}

hdfsBuilderSetNameNode(bld, hdfs_default_host);
hdfsBuilderSetNameNodePort(bld, hdfs_default_port);
if (has_authn) {
UserGroup_t *ugi = gSystem->GetUserInfo(0);
UserGroup_t *ugi = gSystem->GetUserInfo((char *)0);
const char *user = (ugi->fUser).Data();
fFH = hdfsConnectAsUser("default", 0, user);
hdfsBuilderSetUserName(bld, user);
delete ugi;
} else {
fFH = hdfsConnect("default", 0);
}

fFH = hdfsBuilderConnect(bld);

if (fFH == 0) {
SysError("THDFSSystem", "HDFS client cannot open the filesystem");
goto zombie;
Expand All @@ -345,9 +379,10 @@ Int_t THDFSSystem::MakeDirectory(const char * path)
Error("MakeDirectory", "No filesystem handle (should never happen)");
return -1;
}
TUrl url(path);

if (R__HDFS_ALLOW_CHANGES == kTRUE) {
return hdfsCreateDirectory((hdfsFS)fFH, path);
return hdfsCreateDirectory((hdfsFS)fFH, url.GetFileAndOptions());
} else {
return -1;
}
Expand All @@ -364,7 +399,7 @@ void *THDFSSystem::OpenDirectory(const char * path)
Error("OpenDirectory", "No filesystem handle (should never happen)");
return 0;
}

TUrl url(path);
fDirp = 0;
/*
if (fDirp) {
Expand All @@ -374,14 +409,14 @@ void *THDFSSystem::OpenDirectory(const char * path)
*/

hdfsFileInfo * dir = 0;
if ((dir = hdfsGetPathInfo((hdfsFS)fFH, path)) == 0) {
if ((dir = hdfsGetPathInfo((hdfsFS)fFH, url.GetFileAndOptions())) == 0) {
return 0;
}
if (dir->mKind != kObjectKindDirectory) {
return 0;
}

fDirp = (void *)hdfsListDirectory((hdfsFS)fFH, path, &fDirEntries);
fDirp = (void *)hdfsListDirectory((hdfsFS)fFH, url.GetFileAndOptions(), &fDirEntries);
fDirCtr = 0;

fUrlp = new TUrl[fDirEntries];
Expand All @@ -390,7 +425,6 @@ void *THDFSSystem::OpenDirectory(const char * path)
}

////////////////////////////////////////////////////////////////////////////////
/// Free directory via httpd.

void THDFSSystem::FreeDirectory(void *dirp)
{
Expand All @@ -403,15 +437,14 @@ void THDFSSystem::FreeDirectory(void *dirp)
return;
}
if (fUrlp != 0) {
delete fUrlp;
delete[] fUrlp;
}

hdfsFreeFileInfo((hdfsFileInfo *)fDirp, fDirEntries);
fDirp=0;
}

////////////////////////////////////////////////////////////////////////////////
/// Get directory entry via httpd. Returns 0 in case no more entries.

const char *THDFSSystem::GetDirEntry(void *dirp)
{
Expand Down Expand Up @@ -457,7 +490,10 @@ Int_t THDFSSystem::GetPathInfo(const char *path, FileStat_t &buf)
Error("GetPathInfo", "No filesystem handle (should never happen)");
return 1;
}
hdfsFileInfo *fileInfo = hdfsGetPathInfo((hdfsFS)fFH, path);

TUrl url(path);

hdfsFileInfo *fileInfo = hdfsGetPathInfo((hdfsFS)fFH, url.GetFileAndOptions());

if (fileInfo == 0)
return 1;
Expand Down Expand Up @@ -489,7 +525,9 @@ Bool_t THDFSSystem::AccessPathName(const char *path, EAccessMode mode)
return kTRUE;
}

if (hdfsExists((hdfsFS)fFH, path) == 0)
TUrl url(path);

if (hdfsExists((hdfsFS)fFH, url.GetFileAndOptions()) == 0)
return kFALSE;
else
return kTRUE;
Expand Down

0 comments on commit 45f61ce

Please sign in to comment.