-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Connector]Add plugin for Apache Pulsar. #8020
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few initial commits. Please check why the build is failing. It looks like the license header you are using is different than expected. But there might be other formatting issues as well.
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarColumnHandle.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarConnector.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarConnector.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarConnectorCache.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarConnectorCache.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarConnectorCache.java
Outdated
Show resolved
Hide resolved
@MarvinCai If you any of the comment is not clear please reach out to me on slack. https://trino.io/slack.html |
@kokosing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another round of initial comments.
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarConnector.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarConnectorCacheImpl.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarConnectorCacheImpl.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarConnectorCacheImpl.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/test/java/io/trino/plugin/pulsar/TestPulsarRecordCursor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bunch of doc structure and content changes needed. If desired I can help in a chat on slack or even a call for a quick sync and work together.
which enables concurrent reads and high read throughput. You can configure the number of BookKeeper nodes, | ||
and the default number is 3. In Trino Pulsar connector, data is read directly from BookKeeper, | ||
so Trino workers can read concurrently from horizontally scalable number BookKeeper nodes. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably here insert a Requirements section modelled in wording after e.g. the sqlserver.rst currently in master
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we've added the Requirements section in the draft and need your review later, thanks
``pulsar.nar-extraction-directory`` Directory to use for extraction Nar file | ||
================================================ ======================================================================= | ||
|
||
``pulsar.broker-service-url`` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets not use the following setup. I would suggest to work all the content below into the table above. Convert it to use the list-table setup though and stay with the 80 char column hard wrap
``pulsar.nar-extraction-directory`` Directory to use for extraction Nar file | ||
================================================ ======================================================================= | ||
|
||
``pulsar.broker-service-url`` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even better would be to break the table and config section up into multiple sections for example about required config, bookkeeper config, security config and others and have separate tables. Each section could then have more info to explain details for this topic.
Hi @kokosing @bitsondatadev @mosabua thank you very much for your advice on the docs! |
@mosabua Thanks for reviewing the doc, we actually want to separate the doc and code so we'll have a separate pr for the doc and technical writer from our community @Anonymitaet will work on the doc, all the comments will be taken care of. |
Sounds great @MarvinCai .. it would be good to chat with @Anonymitaet to make it more efficient for her. Please feel free to reach out on slack and I can arrange a meeting in which we could hack on it together. |
@Anonymitaet can you send me an email with your timezone and a few available time slots this week or next week .. maybe 1h to start.. then I can set up a meeting with video call to work on this for a bit |
@mosabua sure, I've sent an email to you and invited @MarvinCai to attend this meeting as well (as we might need tech inputs and discussions). Looking forward to meeting you guys. @mosabua before the meeting, if you have any suggestions or questions, could you please comment them in the docs? Then we can prepare in advance and improve the communication efficiency, many thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading and learning. I will try to add more comments tomorrow.
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarAdminClientProvider.java
Outdated
Show resolved
Hide resolved
|
||
private final PulsarConnectorCache pulsarConnectorManagedLedgerFactory; | ||
|
||
private static final Logger log = Logger.get(PulsarSplitManager.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static fields before normal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated fields order
plugin/trino-pulsar/src/main/java/io/trino/plugin/pulsar/PulsarSchemaInfoProvider.java
Outdated
Show resolved
Hide resolved
protected QueryRunner createQueryRunner() | ||
throws Exception | ||
{ | ||
pulsarServer = new PulsarServer(PulsarServer.DEFAULT_IMAGE_NAME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use closeAfterTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you mean closeAfterClass(), updated to wrap the test server with closeAfterClass().
plugin/trino-pulsar/src/test/java/io/trino/plugin/pulsar/TestBasePulsarConnectorTest.java
Outdated
Show resolved
Hide resolved
{ | ||
Logging.initialize(); | ||
|
||
DistributedQueryRunner queryRunner = createPulsarQueryRunner( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please start Trino on 8080
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still reading...
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.plugin.pulsar.decoder.avro; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
.../trino-pulsar/src/main/java/io/trino/plugin/pulsar/decoder/avro/PulsarAvroColumnDecoder.java
Outdated
Show resolved
Hide resolved
handleKeyValueType, new PulsarColumnMetadata.DecoderExtraInfo(field.name(), | ||
null, null)) | ||
|
||
).collect(toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move ) to previous line, collect to immutable list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
...no-pulsar/src/main/java/io/trino/plugin/pulsar/decoder/avro/PulsarAvroRowDecoderFactory.java
Outdated
Show resolved
Hide resolved
...no-pulsar/src/main/java/io/trino/plugin/pulsar/decoder/avro/PulsarAvroRowDecoderFactory.java
Outdated
Show resolved
Hide resolved
...no-pulsar/src/main/java/io/trino/plugin/pulsar/decoder/avro/PulsarAvroRowDecoderFactory.java
Outdated
Show resolved
Hide resolved
...no-pulsar/src/main/java/io/trino/plugin/pulsar/decoder/avro/PulsarAvroRowDecoderFactory.java
Show resolved
Hide resolved
import static io.trino.plugin.pulsar.PulsarServer.SELECT_FROM_ORDERS; | ||
import static io.trino.plugin.pulsar.PulsarServer.SELECT_FROM_REGION; | ||
|
||
public class TestBasePulsarConnectorTest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TestBasePulsarConnectorTest -> TestPulsarConnectorTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
plugin/trino-pulsar/src/test/java/io/trino/plugin/pulsar/TestPulsarSplitManager.java
Outdated
Show resolved
Hide resolved
plugin/trino-pulsar/src/test/java/io/trino/plugin/pulsar/TestPulsarMetadata.java
Outdated
Show resolved
Hide resolved
Hi @MarvinCai I've submitted the doc PR #8469, you can revert doc changes in this PR. Any progress on this code PR? thanks |
addressed 1/3 of the comments, will address the rest soon |
@MarvinCai OK, look forward to your changes. |
Hi @MarvinCai I see you've made code changes, do we need to update docs (#8469) accordingly? thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left few random comments... I need to get back to it. It looks much better.
.github/workflows/ci.yml
Outdated
@@ -232,7 +232,7 @@ jobs: | |||
!:trino-cassandra, | |||
!:trino-hive,!:trino-orc,!:trino-parquet, | |||
!:trino-mongodb,!:trino-kafka,!:trino-elasticsearch, | |||
!:trino-redis, | |||
!:trino-redis,!:trino-pulsar, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add separate line for pulsar, I don't see a reason why it is merged here. The list here should match the list below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, previously there's only unit test so we thought it can be merged with other test, now that we added query test and it takes 10+ mins, probably better to run as a separate step. updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, please make exclusion for trino-pulsar also in separate line.
@@ -25,13 +25,16 @@ | |||
import io.trino.spi.type.ArrayType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any changes to existing code should go as separate commit. See https://github.com/trinodb/trino/blob/master/DEVELOPMENT.md#git-merge-strategy
Also is it possible to extract a separate PR for them? How does it affect existing connectors that are using these? Like Kafka.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually as we still have to duplicate some code in PulsarAvroColumnDecoder, I'll just keep these additional types in PulsarAvroColumnDecoder so existing connector won't be affected. Will only make method protected so they can be override.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will create separate pr once change finalized.
} | ||
|
||
if (type instanceof TimestampType) { | ||
type.writeLong(blockBuilder, (Long) value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not every timestamp can be a long. That depends on precision. Please make sure that precision allows to use long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the change will be removed from AvroColumnDecoder, while just for Pulsar connector it should be safe
@@ -46,6 +46,13 @@ | |||
private final long minValue; | |||
private final long maxValue; | |||
|
|||
protected DefaultJsonFieldDecoder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please undo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's a bit tricky, I'm trying to extend DefaultJsonFieldDecoder to reduce code duplication, while I do want to utilize the current constructor, but it can only invoke isSupportedType
in DefaultJsonFieldDecoder, which will prevent us from supporting new types, I'm adding this default constructor as workaround so we can skip isSupportedType
when calling super()
, any suggestion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any suggestion?
Yes, please undo ;)
Please make PulsarJsonFieldDecoder
to implement JsonFieldDecoder
without using DefaultJsonFieldDecoder
.
} | ||
|
||
@Override | ||
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Table layouts are going to be removed. Can you please return true
for io.trino.spi.connector.ConnectorMetadata#usesLegacyTableLayouts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you try to not to use Layouts at all. They are going to be removed soon so it is not wise to introduce a feature based on deprecated framework.
} | ||
else if (e.getStatusCode() == 401) { | ||
throw new TrinoException(QUERY_REJECTED, | ||
String.format("Failed to get pulsar topic schema information for topic %s: Unauthorized", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static imports for format
topicName)); | ||
} | ||
else { | ||
throw new RuntimeException("Failed to get pulsar topic schema information for topic " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use format
protected PulsarServer pulsarServer; | ||
|
||
@AfterClass(alwaysRun = true) | ||
public void destroy() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, already wrapped with closeAfterClass, removed.
} | ||
|
||
@Override | ||
protected QueryRunner createQueryRunner() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please move it to the top
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
It's mostly a migration of original Pulsar PrestoSQL connector with update for all the package to Trino and fix dependency and code style issues: https://github.com/apache/pulsar/tree/v2.8.0/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto This plugin only support read from Pulsar for now, write capability will be added later.
023feb1
to
06d9b0c
Compare
@kokosing my bad, thought you might have more comments later |
My bad. I thought will have more comments, but I didn't have time to take a look. Sorry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few comments. Please focus now on decoders.
@@ -91,7 +91,7 @@ public AvroColumnDecoder(DecoderColumnHandle columnHandle) | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not to add support for short timestamp, date, time and real support to AvroColumnDecoder
? Then we should be able to drop PulsarAvroColumnDecoder
. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The blocker is conversion from shaded version of avro record to original avro record so pulsar connector can utilize AvroColumnDecoder
, as the whole connector use shaded version of deps.
I tried again and was able to make the conversion this time.
I also move the support of additional type to AvroColumnDecoder
, but I'm not sure how will this affect existing connectors? My understanding is it shouldn't be problem as it's "additional" suuport.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not to add support for short timestamp, date, time and real support to
AvroColumnDecoder
? Then we should be able to dropPulsarAvroColumnDecoder
. WDYT?
I like this idea. @kokosing is there any existing issue? Or I can try to drive one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI #13069.
BTW pulsar-sql supports Decimal type in:
It's possible to add them to AvroColumnDecoder later.
Also I think that this is not a blocker to add pulsar plugin as we can duplicate code a bit for Pulsar decoders - they have their own logics.
@@ -46,6 +46,13 @@ | |||
private final long minValue; | |||
private final long maxValue; | |||
|
|||
protected DefaultJsonFieldDecoder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any suggestion?
Yes, please undo ;)
Please make PulsarJsonFieldDecoder
to implement JsonFieldDecoder
without using DefaultJsonFieldDecoder
.
plugin/trino-pulsar/pom.xml
Outdated
<artifactId>pulsar-client-mledger-shaded</artifactId> | ||
<version>${pulsar.version}</version> | ||
<exclusions> | ||
<exclusion> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate and equal? Maybe you can relocate them (shade)?
private PulsarAdminClientProvider() | ||
{ } | ||
|
||
@NotNull |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Trino everything by default is not null. We annotate only @Nullable
variables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private final Type type; | ||
|
||
/** | ||
* True if the column should be hidden. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Such comments are obvious, please drop them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@Override | ||
public int hashCode() | ||
{ | ||
int result = catalogName != null ? catalogName.hashCode() : 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Objects.hashCode, here and everywhere else
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regenerated with IntelliJ, now using Objects.hash or Objects.hashCode
return false; | ||
} | ||
|
||
return Objects.equals(handleKeyValueType, that.handleKeyValueType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like equals is hand-written, can you regenerate it using Intellij?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@Override | ||
public String toString() | ||
{ | ||
return "PulsarColumnHandle{" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use ToStringHelper
, here and everywhere else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, updated.
import java.util.Objects; | ||
|
||
public class PulsarColumnMetadata | ||
extends ColumnMetadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use compostion instead of inheritance
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after closer look PulsarColumnMetadata is just carrying some additional properties, so I use properties of ColumnMetadata to carry such info when necessary and make PulsarColumnMetadata just a util class.
private final ConnectorMetadata metadata; | ||
private final ConnectorSplitManager splitManager; | ||
private final ConnectorRecordSetProvider recordSetProvider; | ||
private final PulsarConnectorConfig pulsarConnectorConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this variable is not used, please remove.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@kokosing address your latest comments, PTAL |
here's the pom for the shaded pulsar dep: https://github.com/MarvinCai/pulsar/blob/zxc/trino-module/pulsar-client-mledger-shaded/pom.xml |
… of using the oldest schema.
@MarvinCai Seems there were some CI tests that failed, Would you please help take a look at it? |
@jiazhai That's because we made a new maven module to provide a shaded jar for this connector, and we plan to add it under trinodb organization, and currently it's only available on my local dev environment so the ci task just fail to build this connector. Once we finalized the dependency we'll create that new module which should fix all the build failure. |
@kokosing @MarvinCai Just curious to learn what's the progress for this PR? |
@@ -25,13 +25,16 @@ | |||
import io.trino.spi.type.ArrayType; | |||
import io.trino.spi.type.BigintType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it be possilbe to undo changes in avro column decoder? And instead of inheritance to use composition so whenever you want different behaviour you can override it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw that Kafka plugin writes its own AvroRowEncoder
and actually Pulsar writes its own AvroColumnDecoder
in pulsar-sql
now. The major concern can be since PulsarAvroColumnDecoder
is derive from AvroColumnDecoder
, we may want to keep their logic synced. All significant fields and methods in AvroColumnDecoder
seems private
though.
|
||
import javax.ws.rs.client.ClientBuilder; | ||
|
||
public class PulsarAdminClientProvider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final. Apply this to all utility classes
|
||
public class PulsarAdminClientProvider | ||
{ | ||
private PulsarAdminClientProvider() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private PulsarAdminClientProvider() {}
Please apply this to all utility classes
private PulsarAdminClientProvider() | ||
{ } | ||
|
||
public static PulsarAdmin getPulsarAdmin(PulsarConnectorConfig config) throws PulsarClientException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static PulsarAdmin getPulsarAdmin(PulsarConnectorConfig config)
throws PulsarClientException
Please apply this to all places like this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have a style checker for all these requirements, or even some tasks like spotless to apply these rules automatically?
implements DecoderColumnHandle | ||
{ | ||
private final String catalogName; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove such empty lines.
private final String schemaName; | ||
|
||
/** | ||
* The table name used by Trino. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove such obvious comments
else if (type == BIGINT) { | ||
return new AbstractMap.SimpleEntry((long) Long.MIN_VALUE, (long) Long.MAX_VALUE); | ||
} | ||
else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove else
words in this method
this.value = value; | ||
this.columnHandle = columnHandle; | ||
this.minValue = minValue; | ||
this.maxValue = maxValue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rnn for all
} | ||
} | ||
catch (NumberFormatException ignore) { | ||
// ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please do not swallow exceptions
{ | ||
PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig(); | ||
|
||
final String managedLedgerOffloadDriver = "s3"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add you plugin to ./core/trino-server/src/main/provisio/presto.xml
It would be nice to have product test for this to make sure it actually works end to end.
@MarvinCai Any ETA for getting the PR review comments addressed? |
@MarvinCai Are you still actively working on this PR? Do you need any help in getting this to completion? |
I'll take a look into this patch in this month if @MarvinCai doesn't make new progresses. However, it seems almost a rewrite of pulsar-presto-plugin released from within the pulsar community. Even I cannot run the Pulsar SQL example from the nightly version apache/pulsar#16354. Another possible approach is we bump the dependency from "Presto" to "Trino" and change the brands everywhere. After we make this change, we push the pulsar-trino-plugin to this repository and perhaps it can provide a more smooth experience - this patch is made before Pulsar released 2.8.0 but now it's 2.11.0. We may collaborate on the pulsar repo to make the existing plugin a good fit to trino while keep it synced with pulsar release so that we don't diverge. What do you think? cc @MarvinCai @kokosing @hashhar @lhotari |
/** | ||
* This class helps to resolve classes for the Trino Pulsar connector. | ||
*/ | ||
public class PulsarHandleResolver |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
API changed in 9c2f86d.
UPDATED - You can comment on apache/pulsar#16494. I'm starting upgrade presto to trino in pulsar codebase at branch https://github.com/tisonkun/pulsar/tree/presto-to-trino. It seems we meet four major breaking changes:
cc @kokosing @MarvinCai @lhotari perhaps we can bump presto to trino in pulsar codebase first so that we can deliver the change to users quickly and later after we switch to trino dependency it will be smoother to contribute to trino. I hope that we can collaborate on the branch posted here and looking forward to your ideas. |
👋 @MarvinCai - this PR is inactive and doesn't seem to be under development. If you'd like to continue work on this at any point in the future, feel free to re-open. @tisonkun - if you're working on running with this, please reach out if you need help with anything. |
@colebow Thanks for your follow-up! I have a pending PR #13070 that needs more feedback. If you guys think it may not be a common use case, I can integrate it into the trino-pulsar-plugin level. For continuing this contribution, there're still other efforts that should be done to catch up with the latest Trino version. As listed above, a few breaking changes are made and the trino version we're using now is 363 (compared to the latest one 400). |
Fix #7852
Add a plugin for Apache Pulsar.
It's mostly a migration of original Pulsar PrestoSQL connector with update for all the package to Trino and fix dependency and code style issues.
This plugin only support read from Pulsar for now, write capability will be added later.