-
Notifications
You must be signed in to change notification settings - Fork 998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingestion is not ignoring unknown feature in streaming source #99
Comments
To clarify, you mean it discards the whole row into the errors sink right? or are you saying it crashes the job? |
Copy from my PR comment. It is not expected behaviour to ignore unknown features. We need errors so that we know what's happening, we don't want to silently discard them. So what you seem to want is that instead of throwing the whole row into the errors pile, you want to accept as many features as possible and just throw errors for these features. The only place that needs to do that is the ConvertTypesDoFn. I suggest we should instead emit the row without these suspect features, while also emit an error row (to the errors tag) with just the suspect feature. Also, if these features proceed past ConvertTypesDoFn, it's a bug, not something we should ignore. So we definitely shouldn't ignore them later in the pipeline. We should be treating everything is valid from there and throwing exceptions if it's not. |
And from mine:
An example is if we had a feature creation pipeline creating rows of feature A,B and C, but we don't want to ingest B and C yet, so the ingestion job is run with only feature A declared in the schema. I don't think it's useful to log an error for every row as a result of this. |
Do we need an option to turn off throwing erros for this then? Because I expect it will be very common to forget to add it to the import spec and not realise why data isn't showing up |
I understand why you want the streaming and batch experiences to be the same though. At the moment the source decided what it selects. So it would be a setting for the Pubsub or Kafka sources |
So I'd suggest, rather than handling this at conversion we have a setting for ignoring unspecified fields or not in the sources. Nothing else should change, we should still throw errors in the convert DoFn and beyond |
Do you mean we add some thing like
And the behaviour is as follow: This will also be applicable to PubSub |
It currently discard the whole row into error sink |
Yep I think this is good Maybe discardUnknownFeatures reads better? I agree that:
Alternatively if it's false, you could implement throwing errors for the unknown features but still accepting the others. But start with what's easy I guess. I personally would prefer it to default to false, but happy to be out voted. |
It is probably worth emitting a special metric for this using
This however trusts that we wont receive random feature ids that pollute the metrics space too much. |
Regarding the default value. I think if you get new features that aren’t specced in the import they should be errors by default Basically default should make it easy to not know why your data isn’t showing up The issue is if you add a new feature and forget to add it to the import spec or forget to restart it. If we default to discarding it makes it hard to tell what you’ve forgotten to do. So I suggest we default to not discarding features, and new unknown features cause errors instead. It's very easy to add a flag when you know you want it. It's harder for new people to know. |
Expected Behavior
Ingestion should ignore feature ID in FeatureRow that was not specified in Import spec
Current Behavior
Ingestion tried to ingest the unknown feature and throw following exception:
"transform":"Convert feature types","message":"Unknown feature myentity.none.unknown_feature, spec was not initialized","stackTrace":"java.lang.IllegalArgumentException: Unknown feature myentity.none.unknown_feature, spec was not initialized\n\tat com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)\n\tat feast.ingestion.model.Specs.getFeatureSpec(Specs.java:148)\n\tat feast.ingestion.transform.fn.ConvertTypesDoFn.processElementImpl(ConvertTypesDoFn.java:44)\n\tat feast.ingestion.transform.fn.BaseFeatureDoFn.baseProcessElement(BaseFeatureDoFn.java:41)\n\tat feast.ingestion.transform.fn.ConvertTypesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)\n\tat org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)\n\tat org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)\n\tat org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)\n\tat org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:272)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)\n\tat org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)\n\tat org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:71)\n\tat
Steps to reproduce
Run ingestion with streaming source (PubSub / Kafka) and publish FeatureRow with unknown feature inside the stream.
The text was updated successfully, but these errors were encountered: