Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Loader cannot start with InitialPosition = AT_TIMESTAMP #267

Closed
istreeter opened this issue Aug 3, 2022 · 0 comments
Closed

Fix: Loader cannot start with InitialPosition = AT_TIMESTAMP #267

istreeter opened this issue Aug 3, 2022 · 0 comments

Comments

@istreeter
Copy link
Contributor

The S3 loader seems to allow loading from a given point in time in the kinesis stream. The configuration to get this working is:

  "position": {
    "at_timestamp": {
      "timestamp": "2022-06-06T06:06:06Z"
    }
  }

But we get an exception like:

	2022-08-03T15:22:36.009+01:00	Exception in thread "main" java.lang.IllegalArgumentException: Invalid InitialPosition: AT_TIMESTAMP
	2022-08-03T15:22:36.009+01:00	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStreamExtended.java:68)
	2022-08-03T15:22:36.009+01:00	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration.<init>(KinesisClientLibConfiguration.java:708)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.connector.KinesisSourceExecutor.getKCLConfig(KinesisSourceExecutor.scala:93)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.connector.KinesisSourceExecutor.initialize(KinesisSourceExecutor.scala:121)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.connector.KinesisSourceExecutor.<init>(KinesisSourceExecutor.scala:57)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.S3Loader.run(S3Loader.scala:59)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.S3Loader.run$(S3Loader.scala:41)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.S3Loader$.run(S3Loader.scala:127)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.Main$.$anonfun$main$1(Main.scala:51)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.Main$.$anonfun$main$1$adapted(Main.scala:51)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.MainPlatform.withConfig(Main.scala:38)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.MainPlatform.withConfig$(Main.scala:33)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.Main$.withConfig(Main.scala:49)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.Main$.main(Main.scala:51)
	2022-08-03T15:22:36.009+01:00	at com.snowplowanalytics.s3.loader.Main.main(Main.scala)

The exception happens because of how we pass the initial position into the constructor... but the constructor (not controlled by us) only allows latest or trim_horizon because of this line here.

The solution is to always pass InitialPositionInStream.LATEST into the constructor, which is OK because we always amend the initial position immediately after calling the constructor.

@spenes spenes closed this as completed in ef957d9 Sep 29, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant