-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change getStreams to use multiple clients #19
base: flight-jdbc-driver
Are you sure you want to change the base?
Conversation
private final AtomicInteger clientCounter = new AtomicInteger(); | ||
|
||
public KeyedFlightSqlClientObjectPoolFactory(final BufferAllocator parentAllocator) { | ||
super(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: let's avoid default super calls.
|
||
public void closeAllocator() { | ||
parentAllocator.getChildAllocators().forEach(BufferAllocator::close); | ||
parentAllocator.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure about closing the parent allocator here... I don't see a call to close() being removed which means prior to this patch we were either leaking an allocator or we are double-closing this.
|
||
public class KeyedFlightSqlClientObjectPool extends GenericKeyedObjectPool<Location, FlightSqlClient> { | ||
|
||
public KeyedFlightSqlClientObjectPool(KeyedPooledObjectFactory<Location, FlightSqlClient> factory) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The casting of the factory to call a close method on it is weird.
Instead, you can make this class own the allocator:
- Have the allocator be the parameter to the constructor.
- call super(new KeyedFlightSql...()) and have that take in the allocator
- override close() in this class, and move all the allocator cleanup there.
@Override | ||
public synchronized void close() { | ||
((KeyedFlightSqlClientObjectPoolFactory) getFactory()).closeAllocator(); | ||
super.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should actually call super.close() first to clean up the clients which are depending on the allocator, then clean up the allocator. (Normally the right thing to do is do child class clean-up, then base class clean-up but I think the logic needs to be different in this scenario).
} | ||
final Location location = locations.get(0); // purposefully discard other locations | ||
|
||
logger.info(String.format("Getting a client for location %s.", location)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use String.format() with loggers. This executes the work to generate a string, even if logging is off.
Use the logger directly. Note that placeholders are curly braces in SLF4J eg:
logger.info("Getting client for location {}.", location);
please fix this throughout the PR.
flightSqlClient = flightSqlClientPool.borrowObject(location); | ||
} catch (final NoSuchElementException e) { | ||
try { | ||
flightSqlClientPool.addObject(location); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would think this happens automatically rather than needing try/catch.
} | ||
|
||
allStreams.add(flightSqlClient.getStream(endpoint.getTicket(), getOptions())); | ||
flightSqlClientPool.returnObject(location, flightSqlClient); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot return the client to the pool until we are done getting the stream. Otherwise it allows another query being executed to re-use the client while it's still being used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can write an auto-closeable wrapper on top of FlightStream that returns to the pool on closure.
532d91f
to
59148df
Compare
* Remove scope from 'hamcrest' dependency on java/pom.xml * Use flight top-level module on parent pom.xml instead of declaring each one * Avoid using getStatement inside StatementContext methods * Make StatementContext.getQuery() return String * Minor fixes on pom.xml * Move 'os-maven-plugin' to parent pom.xml * Update protobuf generation on pom.xml files * Use ClassLoader#getResource to get network.properties on TestFlightSql * Bind to any ephemeral port on TestFlightSql * Move JDBC-Arrow type default conversion from JdbcToArrowConfig to JdbcToArrowUtils * Micro-optimization: initialize ArrayList with the right size * Fix null-check on PreparedStatement#setParameters * Avoid wrapping vector into a ImmutableList and then into an ArrayList on FlightSqlExample#getTablesRoot * Remove null-check on VectorSchemaRoot on FlightSqlClient#setParameters() * Remove the need of separate cache for ResultSets * Add missing 'final' modifiers
…les and query execution methods. (apache#226) This add an auxiliary class FlightSqlColumnMetadata meant to read and write known metadata for Arrow schema fields, such as CATALOG_NAME, SCHEMA_NAME, TABLE_NAME, PRECISION, SCALE, IS_AUTO_INCREMENT, IS_CASE_SENSITIVE, IS_READ_ONLY and IS_SEARCHABLE.
8db5a48
to
a8d1d53
Compare
This is currently just a draft but it tries to address the issue of only using a single client to grab multiple streams.
It uses Apache Pools to pool the clients keyed to their location correspondingly.