Skip to content

Commit

Permalink
Delay Hive metastore creation until identity is available
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Feb 7, 2022
1 parent 172cd28 commit 00ff03b
Show file tree
Hide file tree
Showing 16 changed files with 185 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.airlift.bootstrap.LifeCycleManager;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorMetadata;
Expand Down Expand Up @@ -48,7 +47,6 @@ public class HiveConnector
implements Connector
{
private final LifeCycleManager lifeCycleManager;
private final TransactionalMetadataFactory metadataFactory;
private final ConnectorSplitManager splitManager;
private final ConnectorPageSourceProvider pageSourceProvider;
private final ConnectorPageSinkProvider pageSinkProvider;
Expand All @@ -70,7 +68,6 @@ public class HiveConnector

public HiveConnector(
LifeCycleManager lifeCycleManager,
TransactionalMetadataFactory metadataFactory,
HiveTransactionManager transactionManager,
ConnectorSplitManager splitManager,
ConnectorPageSourceProvider pageSourceProvider,
Expand All @@ -89,7 +86,6 @@ public HiveConnector(
ClassLoader classLoader)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
Expand Down Expand Up @@ -200,31 +196,21 @@ public boolean isSingleStatementWritesOnly()
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
{
checkConnectorSupports(READ_UNCOMMITTED, isolationLevel);
ConnectorTransactionHandle transaction = new HiveTransactionHandle();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
transactionManager.put(transaction, metadataFactory.create(autoCommit));
}
ConnectorTransactionHandle transaction = new HiveTransactionHandle(autoCommit);
transactionManager.begin(transaction);
return transaction;
}

@Override
public void commit(ConnectorTransactionHandle transaction)
{
TransactionalMetadata metadata = transactionManager.remove(transaction);
checkArgument(metadata != null, "no such transaction: %s", transaction);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
metadata.commit();
}
transactionManager.commit(transaction);
}

@Override
public void rollback(ConnectorTransactionHandle transaction)
{
TransactionalMetadata metadata = transactionManager.remove(transaction);
checkArgument(metadata != null, "no such transaction: %s", transaction);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
metadata.rollback();
}
transactionManager.rollback(transaction);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.airlift.event.client.EventClient;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.metastore.MetastoreConfig;
import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.trino.plugin.hive.orc.OrcFileWriterFactory;
import io.trino.plugin.hive.orc.OrcPageSourceFactory;
import io.trino.plugin.hive.orc.OrcReaderConfig;
Expand All @@ -43,7 +42,6 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
Expand Down Expand Up @@ -143,11 +141,4 @@ public ScheduledExecutorService createHiveTransactionHeartbeatExecutor(CatalogNa
hiveConfig.getHiveTransactionHeartbeatThreads(),
daemonThreadsNamed("hive-heartbeat-" + catalogName + "-%s"));
}

@Singleton
@Provides
public Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> createMetastoreGetter(HiveTransactionManager transactionManager)
{
return transactionHandle -> transactionManager.get(transactionHandle).getMetastore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -95,7 +94,7 @@ public class HiveSplitManager
public static final String PRESTO_OFFLINE = "presto_offline";
public static final String OBJECT_NOT_READABLE = "object_not_readable";

private final Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider;
private final HiveTransactionManager transactionManager;
private final HivePartitionManager partitionManager;
private final NamenodeStats namenodeStats;
private final HdfsEnvironment hdfsEnvironment;
Expand All @@ -115,7 +114,7 @@ public class HiveSplitManager
@Inject
public HiveSplitManager(
HiveConfig hiveConfig,
Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider,
HiveTransactionManager transactionManager,
HivePartitionManager partitionManager,
NamenodeStats namenodeStats,
HdfsEnvironment hdfsEnvironment,
Expand All @@ -125,7 +124,7 @@ public HiveSplitManager(
TypeManager typeManager)
{
this(
metastoreProvider,
transactionManager,
partitionManager,
namenodeStats,
hdfsEnvironment,
Expand All @@ -144,7 +143,7 @@ public HiveSplitManager(
}

public HiveSplitManager(
Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider,
HiveTransactionManager transactionManager,
HivePartitionManager partitionManager,
NamenodeStats namenodeStats,
HdfsEnvironment hdfsEnvironment,
Expand All @@ -161,7 +160,7 @@ public HiveSplitManager(
boolean recursiveDfsWalkerEnabled,
TypeManager typeManager)
{
this.metastoreProvider = requireNonNull(metastoreProvider, "metastoreProvider is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
Expand Down Expand Up @@ -192,7 +191,7 @@ public ConnectorSplitSource getSplits(
SchemaTableName tableName = hiveTable.getSchemaTableName();

// get table metadata
SemiTransactionalHiveMetastore metastore = metastoreProvider.apply((HiveTransactionHandle) transaction);
SemiTransactionalHiveMetastore metastore = transactionManager.get(transaction).getMetastore();
Table table = metastore.getTable(new HiveIdentity(session), tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,42 +25,50 @@
public class HiveTransactionHandle
implements ConnectorTransactionHandle
{
private final boolean autoCommit;
private final UUID uuid;

public HiveTransactionHandle()
public HiveTransactionHandle(boolean autoCommit)
{
this(UUID.randomUUID());
this(autoCommit, UUID.randomUUID());
}

@JsonCreator
public HiveTransactionHandle(@JsonProperty("uuid") UUID uuid)
public HiveTransactionHandle(@JsonProperty("autoCommit") boolean autoCommit, @JsonProperty("uuid") UUID uuid)
{
this.autoCommit = autoCommit;
this.uuid = requireNonNull(uuid, "uuid is null");
}

/**
 * Whether this transaction was started in auto-commit mode.
 * Exposed as a JSON property so the flag survives serialization of the
 * transaction handle.
 */
@JsonProperty
public boolean isAutoCommit()
{
    return autoCommit;
}

/**
 * Unique identifier of this transaction; together with the auto-commit
 * flag it forms the basis of {@code equals}/{@code hashCode}.
 */
@JsonProperty
public UUID getUuid()
{
    return uuid;
}

@Override
public boolean equals(Object obj)
public boolean equals(Object o)
{
if (this == obj) {
if (this == o) {
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
if (o == null || getClass() != o.getClass()) {
return false;
}
HiveTransactionHandle other = (HiveTransactionHandle) obj;
return Objects.equals(uuid, other.uuid);
HiveTransactionHandle that = (HiveTransactionHandle) o;
return autoCommit == that.autoCommit && Objects.equals(uuid, that.uuid);
}

@Override
public int hashCode()
{
return Objects.hash(uuid);
return Objects.hash(autoCommit, uuid);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,82 @@
*/
package io.trino.plugin.hive;

import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.ConnectorTransactionHandle;

import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class HiveTransactionManager
{
private final ConcurrentMap<ConnectorTransactionHandle, TransactionalMetadata> transactions = new ConcurrentHashMap<>();
private final TransactionalMetadataFactory metadataFactory;
private final Map<ConnectorTransactionHandle, MemoizedMetadata> transactions = new ConcurrentHashMap<>();

// Guice-injected constructor. The factory is retained so that per-transaction
// metadata can be created lazily on first use (see MemoizedMetadata) instead
// of eagerly when the transaction begins.
@Inject
public HiveTransactionManager(TransactionalMetadataFactory metadataFactory)
{
    this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
}

// Registers a new transaction under the given handle. Throws
// IllegalStateException (via checkState) if the handle is already
// registered. Note: only an empty holder is stored here — the actual
// TransactionalMetadata is materialized lazily on the first get().
public void begin(ConnectorTransactionHandle transactionHandle)
{
    MemoizedMetadata previousValue = transactions.putIfAbsent(transactionHandle, new MemoizedMetadata());
    checkState(previousValue == null);
}

public TransactionalMetadata get(ConnectorTransactionHandle transactionHandle)
{
return transactions.get(transactionHandle);
return transactions.get(transactionHandle).get(((HiveTransactionHandle) transactionHandle).isAutoCommit());
}

public TransactionalMetadata remove(ConnectorTransactionHandle transactionHandle)
public void commit(ConnectorTransactionHandle transaction)
{
return transactions.remove(transactionHandle);
MemoizedMetadata transactionalMetadata = transactions.remove(transaction);
checkArgument(transactionalMetadata != null, "no such transaction: %s", transaction);
transactionalMetadata.optionalGet().ifPresent(metadata -> {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
metadata.commit();
}
});
}

public void put(ConnectorTransactionHandle transactionHandle, TransactionalMetadata metadata)
public void rollback(ConnectorTransactionHandle transaction)
{
ConnectorMetadata previousValue = transactions.putIfAbsent(transactionHandle, metadata);
checkState(previousValue == null);
MemoizedMetadata transactionalMetadata = transactions.remove(transaction);
checkArgument(transactionalMetadata != null, "no such transaction: %s", transaction);
transactionalMetadata.optionalGet().ifPresent(metadata -> {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
metadata.rollback();
}
});
}

/**
 * Per-transaction holder that memoizes the {@link TransactionalMetadata},
 * creating it lazily (and thread-safely, via synchronized access) on the
 * first call to {@link #get}.
 */
private class MemoizedMetadata
{
    @GuardedBy("this")
    private TransactionalMetadata metadata;

    // Returns the metadata only if it was already created. Used on
    // commit/rollback so that transactions which never touched metadata
    // do not trigger its creation just to finish.
    public synchronized Optional<TransactionalMetadata> optionalGet()
    {
        return Optional.ofNullable(metadata);
    }

    // Lazily creates the metadata on first access. The thread context
    // class loader is switched to this class's loader for the duration of
    // the factory call — presumably so class loading triggered by
    // metastore creation resolves against the plugin class loader rather
    // than the engine's; confirm against ThreadContextClassLoader docs.
    public synchronized TransactionalMetadata get(boolean autoCommit)
    {
        if (metadata == null) {
            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
                metadata = metadataFactory.create(autoCommit);
            }
        }
        return metadata;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ public static Connector createConnector(String catalogName, Map<String, String>
.initialize();

LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
TransactionalMetadataFactory metadataFactory = injector.getInstance(TransactionalMetadataFactory.class);
HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
Expand All @@ -146,7 +145,6 @@ public static Connector createConnector(String catalogName, Map<String, String>

return new HiveConnector(
lifeCycleManager,
metadataFactory,
transactionManager,
new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package io.trino.plugin.hive.security;

import io.trino.plugin.hive.HiveTransactionHandle;
import io.trino.plugin.hive.HiveTransactionManager;
import io.trino.plugin.hive.authentication.HiveIdentity;
import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.trino.plugin.hive.metastore.Table;
Expand All @@ -22,24 +22,24 @@
import javax.inject.Inject;

import java.util.Optional;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

public class SemiTransactionalLegacyAccessControlMetastore
implements LegacyAccessControlMetastore
{
private final Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider;
private final HiveTransactionManager transactionManager;

@Inject
public SemiTransactionalLegacyAccessControlMetastore(Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider)
public SemiTransactionalLegacyAccessControlMetastore(HiveTransactionManager transactionManager)
{
this.metastoreProvider = requireNonNull(metastoreProvider, "metastoreProvider is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
}

@Override
public Optional<Table> getTable(ConnectorSecurityContext context, HiveIdentity identity, String databaseName, String tableName)
{
return metastoreProvider.apply(((HiveTransactionHandle) context.getTransactionHandle())).getTable(new HiveIdentity(context.getIdentity()), databaseName, tableName);
SemiTransactionalHiveMetastore metastore = transactionManager.get(context.getTransactionHandle()).getMetastore();
return metastore.getTable(new HiveIdentity(context.getIdentity()), databaseName, tableName);
}
}
Loading

0 comments on commit 00ff03b

Please sign in to comment.