-
Notifications
You must be signed in to change notification settings - Fork 342
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement cdap-tms-spanner extension - Part 1 #15730
base: develop
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the format for extension module is usually as cdap-[spi]-ext-[concrete]
In this case, it should be cdap-messaging-ext-spanner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend to break this PR into two. It'll make the debugging much simpler.
- SPI changes and TMS changes.
- spanner extension for messaging spi
|
||
@Override | ||
public Map<String, String> getProperties() { | ||
String spannerPropertiesPrefix = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TMS is one implementation of the messaging SPI. The other implementation will be spanner. So TMS should not have any reference to spanner.
* | ||
* @param messagingContext messaging service context | ||
*/ | ||
void destroy(MessagingContext messagingContext); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this? When do you want to close spanner? We probably should keep the client open in spanner messaging service in appfabric, otherwise it'll hurt the performance substantially.
+ " (OLDER_THAN(publish_ts, INTERVAL 7 DAY))", getTableName(topicId), SEQUENCE_ID_FIELD, | ||
PAYLOAD_SEQUENCE_ID, PUBLISH_TS_FIELD, PAYLOAD_FIELD); | ||
OperationFuture<Void, UpdateDatabaseDdlMetadata> future = adminClient.updateDatabaseDdl( | ||
SpannerUtil.getInstanceID(cConf), SpannerUtil.getDatabaseID(cConf), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initialize instanceId, databaseId, etc at the beginning and reuse those fields. It'll be much simpler than calling SpannerUtil.getInstanceID(cConf), SpannerUtil.getDatabaseID(cConf), etc
try { | ||
future.get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
LOG.error("Error when executing {}", topicSQL, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not log and forget the exception. Check the handleError in ClientMessagingService
} | ||
} | ||
|
||
public static String getTableName(TopicId topicId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why public?
try { | ||
Thread.sleep(5); | ||
} catch (InterruptedException e) { | ||
LOG.error("error during sleep", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not log and forget exceptions
Quality Gate failedFailed conditions See analysis details on SonarCloud Catch issues before they fail your Quality Gate with our IDE extension SonarLint |
This PR contains initial implementation of spanner extension of messaging service.
For now conf property : messaging.service.name will decide if spanner messaging service will be used. This part resides in the DelegatingMessagingService implemented earlier.
Follow up PR to contain implementation of other methods in the SpannerMessagingService : especially fetch() and appropriately call destroy() to close spanner clients.
Reference Masoud's POC : develop...features/spanner-tms