[Question] How do you use Confluent Schema Registry to deserialize Avro from Kafka? #592
-
I'm working on a proof of concept using Kafka, Confluent Schema Registry, and .NET for Apache Spark. I'm able to read Avro-serialized bytes from Kafka and print them to the console. Now I'm trying to integrate the Schema Registry to make sense of the bytes. I'm including the full program below. What I can't figure out is exactly how to invoke the `from_avro` function.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.SchemaRegistry;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

namespace ClickStreamConsumer
{
    class Program
    {
        private static ISchemaRegistryClient _schemaRegistry;

        static async Task Main(string[] args)
        {
            if (args.Length != 3)
            {
                Console.Error.WriteLine(
                    "Usage: ClickEventConsumer " +
                    "<bootstrap-servers> <schema-registry-url> <topic>");
                Environment.Exit(1);
            }

            string bootstrapServers = args[0];
            string schemaRegistryUrl = args[1];
            string topic = args[2];

            var schemaRegistryParams = new Dictionary<string, string>
            {
                { "schema.registry.url", schemaRegistryUrl }
            };
            _schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryParams);
            var subject = _schemaRegistry.ConstructValueSubjectName(topic);
            var schema = await _schemaRegistry.GetLatestSchemaAsync(subject);

            var spark = SparkSession
                .Builder()
                .AppName("ClickEventConsumer")
                .GetOrCreate();

            var avro = spark
                .ReadStream()
                .Format("kafka")
                .Option("kafka.bootstrap.servers", bootstrapServers)
                .Option("subscribe", topic)
                .Option("startingOffsets", "earliest")
                .Load();

            // How do I pass in the Avro schema?
            // Is there a better way to do this?
            var something = avro
                .Select(CallUDF("from_avro", avro["value"]));

            var console = something
                .WriteStream()
                .Format("console")
                .Start();

            console.AwaitTermination();
            spark.Stop();
        }
    }
}
```
Replies: 14 comments
-
`from_avro` is a Scala-side function, so you don't need to create a UDF. Unfortunately, that function is not currently exposed in .NET for Apache Spark. Meanwhile, you can try the following:

```csharp
private static Column FromAvro(Column data, string jsonFormatSchema)
{
    return new Column(
        (JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
            "org.apache.spark.sql.avro",
            "from_avro",
            data,
            jsonFormatSchema));
}
```

And make sure to add https://github.com/aelij/IgnoresAccessChecksToGenerator to your project so you can access the internal classes.
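With that helper in place, usage would look something like this (`schemaJson` here is just a stand-in for the Avro schema as a JSON string):

```csharp
// Decode the Kafka value column with the FromAvro interop helper above.
var decoded = df.Select(FromAvro(df["value"], schemaJson).As("value"));
```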
-
Thanks, this seems to work, but now I've uncovered other errors.

Not sure where this exception is coming from. My hunch is that it might have something to do with the fact that the first bytes of each message are a magic byte followed by the four-byte ID of the schema in the Schema Registry, which can throw off deserialization when using a vanilla Avro deserializer. Is there a way to do the deserialization in .NET rather than in Scala? If I could get the bytes in .NET, then I could instantiate a Confluent Avro deserializer myself.
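For reference, the Confluent wire format is one magic byte (0x00) followed by a big-endian four-byte schema ID, and only then the Avro payload. A minimal sketch of stripping that framing before handing the bytes to a plain Avro decoder could look like this (`StripConfluentHeader` is an illustrative helper name, not part of any library):

```csharp
// Sketch: remove the Confluent framing (magic byte + 4-byte schema ID),
// leaving only the raw Avro-encoded bytes.
private static byte[] StripConfluentHeader(byte[] payload)
{
    const int HeaderLength = 5; // 1 magic byte + 4-byte schema ID
    if (payload == null || payload.Length < HeaderLength || payload[0] != 0x00)
    {
        throw new InvalidOperationException("Not a Confluent-framed Avro payload.");
    }

    var avroBytes = new byte[payload.Length - HeaderLength];
    Array.Copy(payload, HeaderLength, avroBytes, 0, avroBytes.Length);
    return avroBytes;
}
```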
-
Which line of the code above is causing this exception?
-
I introduced the code you recommended, so presumably it's coming from there.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.SchemaRegistry;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

namespace ClickStreamConsumer
{
    class Program
    {
        private static ISchemaRegistryClient _schemaRegistry;

        static async Task Main(string[] args)
        {
            if (args.Length != 3)
            {
                Console.Error.WriteLine(
                    "Usage: ClickEventConsumer " +
                    "<bootstrap-servers> <schema-registry-url> <topic>");
                Environment.Exit(1);
            }

            string bootstrapServers = args[0];
            string schemaRegistryUrl = args[1];
            string topic = args[2];

            var schemaRegistryParams = new Dictionary<string, string>
            {
                { "schema.registry.url", schemaRegistryUrl }
            };
            _schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryParams);
            var subject = _schemaRegistry.ConstructValueSubjectName(topic);
            var schema = await _schemaRegistry.GetLatestSchemaAsync(subject);
            Console.WriteLine(schema.SchemaString);

            var spark = SparkSession
                .Builder()
                .AppName("ClickEventConsumer")
                .GetOrCreate();

            var avro = spark
                .ReadStream()
                .Format("kafka")
                .Option("kafka.bootstrap.servers", bootstrapServers)
                .Option("subscribe", topic)
                .Option("startingOffsets", "earliest")
                .Load();

            var something = avro
                .Select(FromAvro(avro["key"], "\"string\"").As("key"),
                        FromAvro(avro["value"], schema.SchemaString).As("value"));

            var console = something
                .WriteStream()
                .Format("console")
                .Start();

            console.AwaitTermination();
            spark.Stop();
        }

        private static Column FromAvro(Column avro, string schema)
        {
            if (avro is null)
            {
                throw new ArgumentNullException(nameof(avro));
            }

            if (string.IsNullOrWhiteSpace(schema))
            {
                throw new ArgumentNullException(nameof(schema));
            }

            return new Column(
                (JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
                    "org.apache.spark.sql.avro",
                    "from_avro",
                    avro,
                    schema));
        }
    }
}
```
-
This documentation seems to indicate that I can just pass the schema registry URL into the `from_avro` function:

```csharp
using System;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql;

namespace ClickStreamConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length != 3)
            {
                Console.Error.WriteLine(
                    "Usage: ClickEventConsumer " +
                    "<bootstrap-servers> <schema-registry-url> <topic>");
                Environment.Exit(1);
            }

            string bootstrapServers = args[0];
            string schemaRegistryUrl = args[1];
            string topic = args[2];

            var spark = SparkSession
                .Builder()
                .AppName("ClickEventConsumer")
                .GetOrCreate();

            var avro = spark
                .ReadStream()
                .Format("kafka")
                .Option("kafka.bootstrap.servers", bootstrapServers)
                .Option("subscribe", topic)
                .Option("startingOffsets", "earliest")
                .Load();

            var something = avro
                .Select(FromAvro(avro["value"], topic + "-value", schemaRegistryUrl).As("value"));

            var console = something
                .WriteStream()
                .Format("console")
                .Start();

            console.AwaitTermination();
            spark.Stop();
        }

        private static Column FromAvro(Column avro, string subject, string schemaRegistryUrl)
        {
            return new Column(
                (JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
                    "org.apache.spark.sql.avro",
                    "from_avro",
                    avro,
                    subject,
                    schemaRegistryUrl));
        }
    }
}
```

However, when I do that I just get more cryptic exceptions.
-
I see. Looks like this other version of `from_avro` is only available in Databricks Runtime, not in open-source Spark.
-
I want to be able to do exactly what this is doing, but ideally in .NET instead of Scala.
-
For the repro, how are you running Kafka and the Schema Registry?
-
It's all running locally using Docker Compose. Here's the repo I'm working on. There are a bunch of things in here that are a work in progress. To see the error you're interested in, clone the repo and do the following.
-
Hmm, is it possible to simplify the repro steps so that they don't involve Kafka, etc.? For example, read a file and call `from_avro` on its contents. Also, can you share the full log for the run?
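Something along these lines might work as a Kafka-free repro (a sketch, assuming Spark 3.0+'s `binaryFile` data source, the `FromAvro` helper from earlier, and a hypothetical directory of files that each hold one raw Avro payload):

```csharp
// Sketch: read raw payload files (one record per file) and decode the
// "content" column with the FromAvro interop helper defined earlier.
var df = spark
    .Read()
    .Format("binaryFile")
    .Load("/path/to/avro-payloads");

df.Select(FromAvro(df["content"], schemaJson).As("value")).Show();
```

Here `schemaJson` stands in for the Avro schema as a JSON string.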
-
The Confluent Platform is part of the repro steps. Deserializing Confluent Avro data is slightly different from deserializing vanilla Avro data: regular Avro is just the raw bytes, but Confluent Avro prepends a magic byte and the four-byte ID of the schema in the Schema Registry. So I suspect that even if we figure out how to use `from_avro`, it will still trip over that framing; one possible workaround is sketched below.
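A sketch of one possible workaround (assuming Microsoft.Spark's UDFs accept `byte[]` columns, and reusing the hypothetical `StripConfluentHeader` helper from earlier): strip the five-byte header in a .NET UDF first, then hand the remaining bytes to `from_avro`.

```csharp
// Sketch: strip the Confluent framing in .NET, then decode the remaining
// bytes with the Scala-side from_avro via the FromAvro interop helper.
Func<Column, Column> stripHeader = Udf<byte[], byte[]>(StripConfluentHeader);

var decoded = avro.Select(
    FromAvro(stripHeader(avro["value"]), schema.SchemaString).As("value"));
```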
Full Logs
-
Do you need to fix this?
-
Ah, sort of. The Schema Registry is still spinning up when my app tries to look up the schema. Docker Compose isn't the best at managing dependencies, and my Spark app isn't super robust yet. Just let the Spark app fall down and start it again.
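To make that lookup more robust, a minimal retry sketch (a hypothetical `GetLatestSchemaWithRetryAsync` wrapper around the Confluent client used above, where `GetLatestSchemaAsync` returns a `Schema`; not a library API) could look like this:

```csharp
// Sketch: retry the Schema Registry lookup with a fixed delay while the
// registry container is still starting up, instead of crashing immediately.
private static async Task<Schema> GetLatestSchemaWithRetryAsync(
    ISchemaRegistryClient client, string subject, int maxAttempts = 10)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return await client.GetLatestSchemaAsync(subject);
        }
        catch (Exception) when (attempt < maxAttempts)
        {
            // Assume a transient startup failure; wait and try again.
            await Task.Delay(TimeSpan.FromSeconds(3));
        }
    }
}
```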