Skip to content

Commit

Permalink
Merge branch 'master' into iceberg-1.6.0
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoujinsong authored Sep 2, 2024
2 parents cf801af + 8207efc commit f0e2774
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import org.apache.amoro.shade.thrift.org.apache.thrift.TProcessor;
import org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolFactory;
import org.apache.amoro.shade.thrift.org.apache.thrift.server.THsHaServer;
import org.apache.amoro.shade.thrift.org.apache.thrift.server.TServer;
import org.apache.amoro.shade.thrift.org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException;
import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory;
Expand Down Expand Up @@ -374,15 +374,13 @@ private TServer createThriftServer(
TTransportFactory transportFactory = new TFramedTransport.Factory();
TMultiplexedProcessor multiplexedProcessor = new TMultiplexedProcessor();
multiplexedProcessor.registerProcessor(processorName, processor);
TThreadedSelectorServer.Args args =
new TThreadedSelectorServer.Args(serverTransport)
THsHaServer.Args args =
new THsHaServer.Args(serverTransport)
.processor(multiplexedProcessor)
.transportFactory(transportFactory)
.protocolFactory(protocolFactory)
.inputProtocolFactory(inputProtoFactory)
.executorService(executorService)
.selectorThreads(selectorThreads)
.acceptQueueSizePerThread(queueSizePerSelector);
.executorService(executorService);
LOG.info(
"The number of selector threads for the {} thrift server is: {}",
processorName,
Expand All @@ -391,7 +389,7 @@ private TServer createThriftServer(
"The size of per-selector queue for the {} thrift server is: {}",
processorName,
queueSizePerSelector);
return new TThreadedSelectorServer(args);
return new THsHaServer(args);
}

private ThreadFactory getThriftThreadFactory(String processorName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.amoro.server.catalog;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CatalogMeta;
Expand All @@ -40,6 +43,7 @@
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.requests.CreateTableRequest;

Expand All @@ -50,10 +54,13 @@ public class InternalIcebergCatalogImpl extends InternalCatalog {
final int httpPort;
final String exposedHost;

final Cache<AmoroTable<?>, FileIO> fileIOCloser;

protected InternalIcebergCatalogImpl(CatalogMeta metadata, Configurations serverConfiguration) {
super(metadata);
this.httpPort = serverConfiguration.getInteger(AmoroManagementConf.HTTP_SERVER_PORT);
this.exposedHost = serverConfiguration.getString(AmoroManagementConf.SERVER_EXPOSE_HOST);
this.fileIOCloser = newFileIOCloser();
}

@Override
Expand Down Expand Up @@ -96,12 +103,14 @@ public AmoroTable<?> loadTable(String database, String tableName) {
.toString());
org.apache.amoro.table.TableIdentifier tableIdentifier =
org.apache.amoro.table.TableIdentifier.of(name(), database, tableName);

return IcebergTable.newIcebergTable(
tableIdentifier,
table,
CatalogUtil.buildMetaStore(getMetadata()),
getMetadata().getCatalogProperties());
AmoroTable<?> amoroTable =
IcebergTable.newIcebergTable(
tableIdentifier,
table,
CatalogUtil.buildMetaStore(getMetadata()),
getMetadata().getCatalogProperties());
fileIOCloser.put(amoroTable, ops.io());
return amoroTable;
}

protected AuthenticatedFileIO fileIO(CatalogMeta catalogMeta) {
Expand Down Expand Up @@ -144,4 +153,17 @@ public <O> InternalTableHandler<O> newTableHandler(String database, String table
//noinspection unchecked
return (InternalTableHandler<O>) new InternalIcebergHandler(getMetadata(), metadata);
}

private Cache<AmoroTable<?>, FileIO> newFileIOCloser() {
return Caffeine.newBuilder()
.weakKeys()
.removalListener(
(RemovalListener<AmoroTable<?>, FileIO>)
(tbl, fileIO, cause) -> {
if (null != fileIO) {
fileIO.close();
}
})
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()),
new BasicUnkeyedTable(
tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties());
}

return new org.apache.amoro.formats.mixed.MixedTable(
mixedIcebergTable, TableFormat.MIXED_ICEBERG);
AmoroTable<?> amoroTable =
new org.apache.amoro.formats.mixed.MixedTable(mixedIcebergTable, TableFormat.MIXED_ICEBERG);
fileIOCloser.put(amoroTable, fileIO);
return amoroTable;
}

protected TableFormat format() {
Expand Down

0 comments on commit f0e2774

Please sign in to comment.