
Resolve schemas in parallel #85

Merged 1 commit on Sep 16, 2024
@@ -7,7 +7,8 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform

-import cats.effect.Sync
+import cats.effect.Async
+import cats.effect.implicits._
 import cats.implicits._
 import com.snowplowanalytics.iglu.client.resolver.Resolver
 import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
@@ -46,7 +47,7 @@ object NonAtomicFields {
     failure: FailureDetails.LoaderIgluError
   )

-  def resolveTypes[F[_]: Sync: RegistryLookup](
+  def resolveTypes[F[_]: Async: RegistryLookup](
     resolver: Resolver[F],
     entities: Map[TabledEntity, Set[SchemaSubVersion]],
     filterCriteria: List[SchemaCriterion]
@@ -61,7 +62,7 @@
         // Remove whole schema family if there is no subversion left after filtering
         subVersions.nonEmpty
       }
-      .traverse { case (tabledEntity, subVersions) =>
+      .parTraverse { case (tabledEntity, subVersions) =>
Contributor:

This will generate a bunch of requests to the Iglu server at once. But I think that this is fine as each instance should be requesting only a few schemas at once, and even if we have tens of instances, it is very unlikely that all of them resolve the schemas at the exact same time.
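(Illustrative sketch, not part of this PR: if the burst of simultaneous lookups ever did need a cap at the call site, cats-effect also offers `parTraverseN`, which bounds the parallelism. The names `resolveBounded` and `fetch` here are hypothetical.)

```scala
import cats.effect.Async
import cats.effect.implicits._ // parTraverseN syntax
import cats.syntax.all._

// Like parTraverse, but with at most 8 effects in flight at once.
def resolveBounded[F[_]: Async, A, B](items: List[A])(fetch: A => F[B]): F[List[B]] =
  items.parTraverseN(8)(fetch)
```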

Contributor Author:

That is a very good point. It could be many 10s of schemas. I think we get the best separation of concerns if stuff like that is controlled by the connection pool in the HTTP client. Blaze by default allows 256 concurrent connections per server, and that is probably a bit too large for us.

I might make this change in combination with adding a default common-streams HTTP client where we override some config options like max connections per server.
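(A rough sketch of what that default client could look like, assuming http4s 0.23's Blaze client; the function name and the exact limits are illustrative, not the actual common-streams code.)

```scala
import cats.effect.{Async, Resource}
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder

// Build a Blaze client with the connection pool tightened from its defaults,
// so concurrent schema lookups are throttled in one place.
def igluHttpClient[F[_]: Async]: Resource[F, Client[F]] =
  BlazeClientBuilder[F]
    .withMaxTotalConnections(64)              // pool-wide cap
    .withMaxConnectionsPerRequestKey(_ => 16) // per-server cap, down from 256
    .resource
```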

Contributor:

> It could be many 10s of schemas

If we have only one loader requesting that, that's fine (e.g. one collector instance can constantly handle more than 1000 requests / second). If we have tens of loaders requesting it, that's a little bit scarier, but still probably fine: it's unlikely to happen, and there is your new exitOnMissingSchema feature (which I think is a great idea).

> I think we get best separation of concerns if stuff like that to be controlled by the connection pool in the HTTP client.

I very much agree! This code shouldn't worry about the under-the-hood HTTP client.

> I might make this change in combination with adding a default common-streams HTTP client where we override some config options like max connections per server

👌

Contributor Author:

Opened #87 to address the HTTP client connection pool.

SchemaProvider
.fetchSchemasWithSameModel(resolver, TabledEntity.toSchemaKey(tabledEntity, subVersions.max))
.map(TypedTabledEntity.build(tabledEntity, subVersions, _))
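The net effect of the diff can be shown in isolation (illustrative snippet, not from the repo): `traverse` sequences the effects one after another, while `parTraverse` starts them concurrently. That is also why the constraint moved from `Sync` to `Async` and `cats.effect.implicits._` was imported: `parTraverse` needs a `Parallel` instance, which cats-effect provides for any `Async` effect.

```scala
import cats.effect.IO
import cats.syntax.all._ // traverse + parTraverse syntax

val keys = List("a", "b", "c")

// Before this PR: each lookup starts only after the previous one finishes.
val sequential: IO[List[String]] = keys.traverse(k => IO.pure(s"schema-$k"))

// After this PR: all lookups are started concurrently.
val parallel: IO[List[String]] = keys.parTraverse(k => IO.pure(s"schema-$k"))
```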