
add support for different backends, starting with MongoDB #164

Merged
@agavra merged 1 commit into main from mongo on Oct 12, 2023

Conversation

@agavra (Contributor) commented Oct 11, 2023

This PR is the minimal set of changes I needed to make in order to comfortably support different backends. There are still some places (and I will point them out with inline comments on this PR) where it's awkward and doesn't totally fit our existing structure. Here's the general review guide:

  1. I refactored the ResponsiveKafkaStreams chain of constructors into an internal builder to make it easier to modify
  2. Added a StorageBackend enum that signals which backend to use
  3. Moved most C* specific things into the corresponding RemoteTable classes
  4. Made the ResponsiveExtension for spinning up test containers smart about whether it should spin up a C* or Mongo container. You can see MinimalIntegrationTest as an example of the new usage.

Unfortunately this will mean that we need to package both Mongo/Scylla clients into our JAR. That's not ideal, but hopefully soon we can move all of this into a proxy.

NOTE: I haven't spent any time thinking about the MongoDB schema or optimizing it. All I did was the minimal amount of work to get some of the tests to pass (e.g. the restore integration test); I'll spend some time reading up on that later.

Filters.eq("key", key.get()),
Updates.set("value", value),
new UpdateOptions().upsert(true));
return null;
Contributor Author:

The C* client requires us to be able to batch statements of different types (insert/delete) together and then execute them as a single batch on flush. We should change this API not to return BoundStatement, which is specific to the Cassandra driver.
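To make that concrete, a minimal sketch of a driver-agnostic writer API might look like the following. RemoteWriteResult comes from the codebase; everything else here is illustrative, not the PR's actual design -- the point is that insert/delete buffer writes internally instead of returning driver-specific statements:

import java.util.concurrent.CompletionStage;

public interface RemoteWriter<K, V> {
  void insert(K key, V value);                // buffer an upsert for the current batch
  void delete(K key);                         // buffer a tombstone for the current batch
  CompletionStage<RemoteWriteResult> flush(); // execute the buffered batch against the backend
}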

Contributor:

Makes sense -- can we leave a TODO or do you plan to follow up on this immediately?

Contributor Author:

I plan to do that relatively soon -- it's bugging me quite a bit 😆 -- and I think it'll become more important when I do EOS support.


@Override
public CompletionStage<RemoteWriteResult> flush() {
return CompletableFuture.completedFuture(RemoteWriteResult.success(partition));
Contributor Author:

We may want to either have two different APIs -- one for sync, one for async -- or just use the async Mongo driver here. We'll need to do some refactoring of the API to make sure that it doesn't have anything C* specific.

Contributor:

Just making sure I understand: the C* client is currently/already async, so the choice is between introducing a synchronous API for the mongo client vs plugging in the async mongo client -- is that correct?

If so, two followup questions:

  1. why not just plug in the async mongo driver right away?
  2. it looks like we're just automatically plugging the results into a completed future/stage in both clients right now, so it's not truly async at the moment anyway. Do we want to make it truly async at some point in the near future? I guess we could do something pretty much like the recent changelog truncation optimization: we don't actually need to know whether it failed/was offset-fenced during the Kafka commit, since either way we only flush after the txn/offsets are committed to Kafka anyway. So we could just kick off a new flush after a commit in the same way that we kick off a new DeleteRecords request, and throw a fenced exception if it turns out the flush did indeed fail.

I guess I'm getting ahead of things here; obviously this doesn't need to happen as part of this change. Just trying to envision the future and why we would/would not want the async driver.

Contributor Author:

  1. Async is generally more complicated and I wanted to optimize for simplicity for the first go-around. I'm also curious about non-async performance, since if it's good enough we can just batch the entire thing in the background.
  2. I don't think that's true for the LwtWriter (it is true for the FactWriter), which does return a future that isn't complete. I do think we need to know at least before the next flush whether or not it succeeded, so making that optimization is a bit more difficult. We wouldn't want to flush the next batch if the previous batch failed to flush, or we'd corrupt the remote store (see the sketch below).
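A rough sketch of that gating -- only writing batch N+1 once batch N is known to have succeeded -- assuming a hypothetical wasApplied() on RemoteWriteResult, a driver-specific executeAsync() helper, and an illustrative WriteOp type (none confirmed by this PR):

private CompletionStage<RemoteWriteResult> lastFlush =
    CompletableFuture.completedFuture(RemoteWriteResult.success(partition));

public CompletionStage<RemoteWriteResult> flush(final List<WriteOp> batch) {
  // chain each flush onto the previous one so failures stop the pipeline
  lastFlush = lastFlush.thenCompose(prev -> prev.wasApplied()
      ? executeAsync(batch)                        // previous flush landed; safe to proceed
      : CompletableFuture.completedFuture(prev));  // previous flush failed; don't touch the store
  return lastFlush;
}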

if (result == null) {
throw new IllegalStateException("Expected to find metadata row");
}
return new MetadataRow(result.offset, -1L);
Contributor Author:

Note that I haven't done any work yet to support EOS or fencing.

}

public ResponsiveExtension(final StorageBackend backend) {
this.backend = backend;
Contributor Author:

We should consider extending this approach to allow for spinning up any combination of containers to speed up runs (e.g. run with just Kafka or just Cassandra) for certain tests.
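For reference, usage in a JUnit 5 test might look like this sketch (the MONGO_DB constant name is an assumption):

// register the extension for a Mongo-backed test run
@RegisterExtension
static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.MONGO_DB);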

@agavra marked this pull request as ready for review October 11, 2023 17:27
@ableegoldman (Contributor) left a comment:

Alright...lots of comments. Most of them are larger structural feedback about how to fit this into our existing framework and specifically the new table/spec/operations approach (to the extent it makes sense to do so)

I know this is just the starter PR, so you might have already thought of many of these suggestions and just didn't want/have time to fit them into the first iteration. Feel free to merge it if the pluggable client is working, and we can iterate from there.

clientSupplier,
time,
new DefaultCassandraClientFactory()
new Params(topology, configs)
Contributor:

Nice, I've been meaning to clean this up for some time now as it's completely out of control.


switch (backendType) {
case CASSANDRA:
final var cqlSession = cassandraFactory.createCqlSession(responsiveConfig);
Contributor:

No need to do this here/now, but it seems like we don't have any need for the CqlSession being an explicit step -- i.e. we always just pass the result immediately back into the cassandraFactory that it came from.

Then I think we can consolidate the logic and make it easier + cleaner to hook into/mock the client in tests. This seems to be the only reason we have a CassandraFactory vs just creating the client directly.

So instead of hooking Mongo in directly, could we just generalize the C* factory into something like this:

public interface StorageClientSupplier<S extends StorageClient> {
  S createClient(ResponsiveConfig config);
}

And then obviously both clients will implement the StorageClient interface (or whatever we'd call it), and we can enforce that only one or the other client is used by consolidating the separate client fields in the SharedClients struct.

I also think we should try to keep the different storage clients as aligned as possible by implementing a common interface, which this would allow/enforce.

Contributor Author:

Yes! I was imagining something pretty similar, but didn't want it to spiral at this point. All of this should definitely be encapsulated away somewhere so that we can also more easily test different implementations instead of repeating this same logic throughout.

@@ -68,6 +68,10 @@ public class ResponsiveConfig extends AbstractConfig {
public static final String CLIENT_SECRET_CONFIG = "responsive.client.secret";
private static final String CLIENT_SECRET_DOC = "The client secret for authenticated access";

public static final String STORAGE_BACKEND_TYPE_CONFIG = "responsive.storage.backend.type";
Contributor:

just a nit, but I would prefer if we defined String constants here that correspond to the various allowed storage engines we offer. For one thing it's easier for users to find and understand the options, for another my personal philosophy at least is that config values should always default to being strings unless there is a really good reason. To me it boils down to "is it literally impossible for this config to work without a String value?" In this case from the user perspective there's not much difference between defining it as an enum vs a string, but from our perspective we now need to worry about correctly parsing/converting it.

But as I said, that's mostly personal philosophy -- definitely won't die on this hill if you disagree.
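A sketch of what those constants could look like, with validation kept on the plain string (constant names and values are assumptions, not the PR's actual code):

public static final String CASSANDRA_BACKEND = "cassandra";
public static final String MONGO_DB_BACKEND = "mongodb";

// e.g. validated in the ConfigDef as a plain string:
// .define(STORAGE_BACKEND_TYPE_CONFIG, Type.STRING, CASSANDRA_BACKEND,
//     ConfigDef.ValidString.in(CASSANDRA_BACKEND, MONGO_DB_BACKEND),
//     Importance.MEDIUM, STORAGE_BACKEND_TYPE_DOC)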

Contributor Author:

enums are nice for switch statements in the code b/c IntelliJ can autofill them - otherwise I don't have a strong preference so I can change it. I guess if the config is well enough encapsulated I shouldn't be checking it in too many places anyway ;)

Contributor Author:

we can refactor this later since it's totally backwards compatible (note that the config itself is actually a String, I'm just maintaining it as an enum)
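i.e. something along these lines, keeping the String-to-enum parsing at a single boundary (a sketch, not the PR's actual code):

// the config value stays a plain String; the enum is internal only
final StorageBackend backend = StorageBackend.valueOf(
    config.getString(STORAGE_BACKEND_TYPE_CONFIG).toUpperCase(Locale.ROOT));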

@@ -28,23 +28,18 @@
  * table are only prepared once during the lifetime of the application.
  */
 @ThreadSafe
-public class TableFactory<T extends RemoteTable<?>> {
+public class TableCache<T extends RemoteTable<?>> {
Contributor:

Nice, this name definitely makes more sense for the class


import org.bson.types.ObjectId;

public class KVDoc {
Contributor:

I'm not super familiar with mongo, but I take it they refer to a key-value entry as a "doc" (document?)

Contributor Author:

MongoDB only has "documents" -- they don't even directly expose the "key", which is an ObjectId. I still need to figure out exactly how Mongo works in this regard, but essentially yes, everything that's inserted into Mongo is considered a "document".
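For illustration only -- the PR's actual KVDoc schema may differ -- the shape is roughly a document whose store key is just a regular field alongside Mongo's internal id:

public class KVDoc {
  ObjectId id;   // Mongo's internal document id
  byte[] key;    // the state store key
  byte[] value;  // the state store value
}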

-private final TableFactory<RemoteKVTable> kvFactory;
-private final TableFactory<RemoteWindowedTable> windowedFactory;
+private final TableCache<RemoteKVTable> kvFactory;
+private final TableCache<RemoteWindowedTable> windowedFactory;

public TTDCassandraClient(final TTDMockAdmin admin, final Time time) {
Contributor:

Another thing that would be nice about generalizing the storage backend at the Client/ClientFactory level (ie having the CassandraClient and MongoClient implement a common client & factory interface vs branching immediately based on enums) is that we could probably throw out a lot of the gross hacks we currently put into the TTD version of the C* client, and just have a separate TTDClient and TTDClientFactory that is completely state-store agnostic.

We could even make it fully pluggable so that users could choose whether to run with the mock client or use one of the real storage backends, eg with a container setup. I think that would be pretty slick

Obviously can/should be a followup project, just thinking out loud here
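A sketch of what that could look like, reusing the hypothetical StorageClientSupplier from the earlier comment (all names illustrative):

public class TTDClientFactory implements StorageClientSupplier<TTDClient> {
  @Override
  public TTDClient createClient(final ResponsiveConfig config) {
    return new TTDClient(); // purely in-memory; no real backend or container needed
  }
}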

Contributor Author:

that would be pretty cool indeed! I'll turn this into a ticket so we don't forget about it. #165


cache = new TableCache<>(spec -> new MongoKVTable(client, spec.tableName()));
}

public RemoteKVTable kvTable(final String name) throws InterruptedException, TimeoutException {
Contributor:

It's not a big deal right now if things are working, but presumably we will eventually want the clients to implement a common interface and integrate into the new Spec/Operations structure, right?

Contributor Author:

yup, that's definitely something I'd like to do

@@ -117,6 +115,29 @@ public static PartitionedOperations create(
);
}

private static RemoteKVTable createCassandra(
Contributor:

I won't push back on it if you just want to get something merged, but I'm not a fan of this structure or these methods being here/the way they're named.

Seems like we should have a client interface, perhaps with just these methods: createKVTable and createWindowTable (and eventually, I guess, createSessionTable). The client interface could also have a clientType method that returns CASSANDRA or MONGO_DB and lets us remove the enum entirely (assuming we also replace the config value with plain strings).
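A sketch of that interface (the three method names come from this comment; the signatures are assumptions):

public interface StorageClient {
  RemoteKVTable createKVTable(String name);
  RemoteWindowedTable createWindowTable(String name);
  String clientType(); // e.g. "CASSANDRA" or "MONGO_DB", replacing the enum
}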

Contributor Author:

Definitely agree that the current state of things isn't in tip-top shape abstraction-wise; this was what I could throw together with the fewest lines of code changed while still improving the situation.

}
operations = (taskType == TaskType.GLOBAL)
? GlobalOperations.create(context, params)
: PartitionedOperations.create(name, context, params);
Contributor:

this is a nice simplification!

@agavra merged commit 2e8032a into main Oct 12, 2023
1 check passed
@agavra deleted the mongo branch October 12, 2023 18:25