Skip to content

Commit

Permalink
[trinodb#6] Add HDFS Caller Context
Browse files Browse the repository at this point in the history
  • Loading branch information
yangjinde committed Mar 28, 2023
1 parent 7d3d8fc commit e51cf96
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.security.ConnectorIdentity;

import javax.inject.Inject;
Expand All @@ -39,4 +40,10 @@ public TrinoFileSystem create(ConnectorIdentity identity)
{
return new HdfsFileSystem(environment, new HdfsContext(identity));
}

@Override
public TrinoFileSystem create(ConnectorSession session)
{
    // Build a session-scoped HDFS context so the query id travels with file system calls
    HdfsContext context = new HdfsContext(session);
    return new HdfsFileSystem(environment, context);
}
}
10 changes: 10 additions & 0 deletions lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,39 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.security.ConnectorIdentity;

import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class HdfsContext
{
private final ConnectorIdentity identity;
private final Optional<String> queryId;

/**
 * Creates a context carrying only an identity; no query id is associated.
 *
 * @param identity the connector identity, never null
 */
public HdfsContext(ConnectorIdentity identity)
{
    this.queryId = Optional.empty();
    this.identity = requireNonNull(identity, "identity is null");
}

/**
 * Creates a context from a connector session, capturing both the
 * session's identity and its query id.
 *
 * @param session the connector session, never null
 */
public HdfsContext(ConnectorSession session)
{
    requireNonNull(session, "session is null");
    this.queryId = Optional.of(session.getQueryId());
    this.identity = requireNonNull(session.getIdentity(), "session.getIdentity() is null");
}

/**
 * Returns the identity this context was created with.
 */
public ConnectorIdentity getIdentity()
{
    return this.identity;
}

/**
 * Returns the query id, present only when this context was built from a
 * {@link ConnectorSession}.
 */
public Optional<String> getQueryId()
{
    return this.queryId;
}

@Override
public String toString()
{
Expand Down
19 changes: 19 additions & 0 deletions lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import io.trino.hadoop.HadoopNative;
import io.trino.hdfs.authentication.GenericExceptionAction;
import io.trino.hdfs.authentication.HdfsAuthentication;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.security.ConnectorIdentity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemManager;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.CallerContext;

import javax.inject.Inject;

Expand Down Expand Up @@ -64,9 +66,26 @@ public Configuration getConfiguration(HdfsContext context, Path path)
/**
 * Returns a file system for the given context and path, tagging the HDFS
 * caller context with the context's query id when one is available.
 *
 * @param context HDFS context providing identity and optional query id
 * @param path path used to resolve the Hadoop configuration
 * @throws IOException if the file system cannot be obtained
 */
public FileSystem getFileSystem(HdfsContext context, Path path)
        throws IOException
{
    // Idiomatic Optional usage instead of isPresent()/get()
    context.getQueryId().ifPresent(queryId -> setCallerContext(context.getIdentity(), queryId));
    return getFileSystem(context.getIdentity(), path, getConfiguration(context, path));
}

/**
 * Returns a file system for the given session and path, first recording
 * the session's query id and user in the HDFS caller context.
 *
 * @param session connector session supplying identity and query id
 * @param path target path
 * @param configuration Hadoop configuration to use
 * @throws IOException if the file system cannot be obtained
 */
public FileSystem getFileSystem(ConnectorSession session, Path path, Configuration configuration)
        throws IOException
{
    ConnectorIdentity identity = session.getIdentity();
    setCallerContext(identity, session.getQueryId());
    return getFileSystem(identity, path, configuration);
}

/**
 * Publishes a Hadoop {@link CallerContext} of the form
 * {@code JobId:olap_trino_<queryId>$User:<user>} for the current thread,
 * so HDFS audit logs can attribute accesses to the Trino query and user.
 */
private void setCallerContext(ConnectorIdentity identity, String queryId)
{
    String content = String.format("JobId:olap_trino_%s$User:%s", queryId, identity.getUser());
    CallerContext.setCurrent(new CallerContext.Builder(content).build());
}

public FileSystem getFileSystem(ConnectorIdentity identity, Path path, Configuration configuration)
throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.Type;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -192,7 +191,7 @@ public Optional<ReaderPageSource> createPageSource(

ConnectorPageSource orcPageSource = createOrcPageSource(
hdfsEnvironment,
session.getIdentity(),
session,
configuration,
path,
start,
Expand Down Expand Up @@ -224,7 +223,7 @@ public Optional<ReaderPageSource> createPageSource(

private ConnectorPageSource createOrcPageSource(
HdfsEnvironment hdfsEnvironment,
ConnectorIdentity identity,
ConnectorSession session,
Configuration configuration,
Path path,
long start,
Expand Down Expand Up @@ -252,8 +251,8 @@ private ConnectorPageSource createOrcPageSource(

boolean originalFilesPresent = acidInfo.isPresent() && !acidInfo.get().getOriginalFiles().isEmpty();
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, path, configuration);
FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(path));
FileSystem fileSystem = hdfsEnvironment.getFileSystem(session, path, configuration);
FSDataInputStream inputStream = hdfsEnvironment.doAs(session.getIdentity(), () -> fileSystem.open(path));
orcDataSource = new HdfsOrcDataSource(
new OrcDataSourceId(path.toString()),
estimatedFileSize,
Expand Down Expand Up @@ -397,8 +396,8 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) {
Optional<OrcDeletedRows> deletedRows = acidInfo.map(info ->
new OrcDeletedRows(
path.getName(),
new OrcDeleteDeltaPageSourceFactory(options, identity, configuration, hdfsEnvironment, stats),
identity,
new OrcDeleteDeltaPageSourceFactory(options, session.getIdentity(), configuration, hdfsEnvironment, stats),
session.getIdentity(),
configuration,
hdfsEnvironment,
info,
Expand All @@ -412,7 +411,7 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) {
acidInfo.get().getOriginalFiles(),
path,
hdfsEnvironment,
identity,
session.getIdentity(),
options,
configuration,
stats));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ else if (deserializerClassName.equals(ColumnarSerDe.class.getName())) {

RcFileDataSource dataSource;
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, configuration);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(session, path, configuration);
FSDataInputStream inputStream = hdfsEnvironment.doAs(session.getIdentity(), () -> fileSystem.open(path));
if (estimatedFileSize < BUFFER_SIZE.toBytes()) {
// Handle potentially imprecise file lengths by reading the footer
Expand Down

0 comments on commit e51cf96

Please sign in to comment.