C# Confluent Kafka problem with Avro serialization

I’m using Docker to run Kafka and the other services from https://github.com/confluentinc/cp-all-in-one, together with the Confluent NuGet packages for Kafka, Avro and Schema Registry in my test project.

Sending JSON messages has worked without problems so far, but I’m struggling with sending Avro-serialized messages.

I looked at the https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroSpecific example and tried to do it the same way, but I end up with an exception like the one below:

Local: Value serialization error
   at Confluent.Kafka.Producer`2.<ProduceAsync>d__52.MoveNext()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Kafka_producer.KafkaService.d__10.MoveNext() in C:\Users\lu95eb\source\repos\Kafka_playground\Kafka producer\KafkaService.cs:line 126

with this inner exception:

Object reference not set to an instance of an object.
   at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl`1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize)
   at Confluent.SchemaRegistry.Serdes.AvroSerializer`1.d__6.MoveNext()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at Confluent.Kafka.Producer`2.d__52.MoveNext()

Here’s my ISpecificRecord class:

public class UserInfo : ISpecificRecord
{
    public string Name { get; set; }
    public int[] Numbers { get; set; }

    public Schema Schema => Schema.Parse(@"
        {
          ""name"": ""UserInfo"",
          ""type"": ""record"",
          ""namespace"": ""kafka"",
          ""fields"": [
            {
              ""name"": ""Name"",
              ""type"": ""string""
            },
            {
              ""name"": ""Numbers"",
              ""type"": {
                ""type"": ""array"",
                ""items"": ""int""
              }
            }
          ]
        }
        ");

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return Name;
            case 1: return Numbers;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: Name = (string)fieldValue; break;
            case 1: Numbers = (int[])fieldValue; break;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
        }
    }
}

And the method used to send the message:

private async Task SendSpecificRecord(UserInfo userInfo)
{
    using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
    using (var producer =
        new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
            .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
            .SetValueSerializer(new AvroSerializer<UserInfo>(schemaRegistry))
            .Build())
    {
        var message = new Message<string, UserInfo>
        {
            Key = userInfo.Name,
            Value = userInfo
        };

        await producer.ProduceAsync(SpecificTopic, message);
    }
}

Line 126 of KafkaService.cs is the await producer.ProduceAsync(SpecificTopic, message); call.

As I wrote at the start, I have no problems with Schema Registry: my schemas are registered and work properly for JSON, and I have no problems with topics, the broker, the consumer, or anything else.

I would be grateful if anyone could point out what I’m doing wrong. Thank you in advance.

Answer

If anybody is curious about the solution (I can’t imagine how someone could be ;)), I wrote a ‘custom’ Avro serializer and deserializer, and they work like a charm.

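using System.IO;
using System.Threading.Tasks;
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;

// Writes the record as plain Avro binary, without contacting Schema Registry.
public class CustomAvroSerializer<T> : IAsyncSerializer<T>
    where T : class, ISpecificRecord
{
    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        // A null value is passed through as a null payload instead of failing on data.Schema.
        if (data == null) return Task.FromResult<byte[]>(null);

        using (var ms = new MemoryStream())
        {
            var enc = new BinaryEncoder(ms);
            var writer = new SpecificDefaultWriter(data.Schema);
            writer.Write(data, enc);
            // Encoding is in-memory and synchronous, so Task.Run is not needed.
            return Task.FromResult(ms.ToArray());
        }
    }
}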
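
A minimal sketch of how the serializer above might be plugged into a producer. The helper class, method name and parameters are hypothetical and not taken from the original project; UserInfo and CustomAvroSerializer<T> are the classes shown in this thread. Leaving the key to the library’s default UTF-8 string serializer is an assumption on my part. Since this serializer never talks to Schema Registry, no CachedSchemaRegistryClient is created here.

```csharp
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper, for illustration only.
public static class CustomSerializerUsage
{
    public static async Task SendAsync(string brokerUrl, string topic, UserInfo userInfo)
    {
        var config = new ProducerConfig { BootstrapServers = brokerUrl };

        // String keys fall back to Confluent.Kafka's default UTF-8 serializer,
        // so only the value serializer is set explicitly.
        using (var producer = new ProducerBuilder<string, UserInfo>(config)
            .SetValueSerializer(new CustomAvroSerializer<UserInfo>())
            .Build())
        {
            var message = new Message<string, UserInfo>
            {
                Key = userInfo.Name,
                Value = userInfo
            };

            await producer.ProduceAsync(topic, message);
        }
    }
}
```

One trade-off worth keeping in mind: the payload is plain Avro binary without the magic byte and schema ID that the Confluent serializers prepend, so a consumer using the stock AvroDeserializer<T> would most likely not be able to read these messages. The consuming side needs the matching custom deserializer.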

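// Reads the plain Avro binary produced by CustomAvroSerializer<T>.
// Namespaces used: System, System.IO, Avro.IO, Avro.Specific, Confluent.Kafka.
public class CustomAvroDeserializer<T> : IDeserializer<T>
    where T : class, ISpecificRecord
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // A null payload (for example a tombstone) has nothing to decode.
        if (isNull) return null;

        using (var ms = new MemoryStream(data.ToArray()))
        {
            var dec = new BinaryDecoder(ms);
            // T needs a public parameterless constructor; its Schema is used
            // as both the writer and the reader schema.
            var regenObj = (T)Activator.CreateInstance(typeof(T));

            var reader = new SpecificDefaultReader(regenObj.Schema, regenObj.Schema);
            reader.Read(regenObj, dec);
            return regenObj;
        }
    }
}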
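
For what it’s worth, the inner exception points at the constructor of SpecificSerializerImpl. As far as I can tell from the library source, the built-in AvroSerializer<T> does not read the instance Schema property at that point; it looks up a public static field named _SCHEMA on T via reflection. Classes generated by avrogen have that field, while the hand-written UserInfo in the question does not, which would explain the null reference. If that reading is right, a sketch like the one below should let the stock serializer work without a custom one. It has not been tested against the setup in the question. Declaring Numbers as IList<int> is my assumption, following what avrogen emits for Avro arrays.

```csharp
using System.Collections.Generic;
using Avro;
using Avro.Specific;

public class UserInfo : ISpecificRecord
{
    // Public static field with this exact name, mirroring avrogen output.
    // Assumption: AvroSerializer<T> locates the schema through this field.
    public static Schema _SCHEMA = Avro.Schema.Parse(@"
        {
          ""name"": ""UserInfo"",
          ""type"": ""record"",
          ""namespace"": ""kafka"",
          ""fields"": [
            { ""name"": ""Name"", ""type"": ""string"" },
            { ""name"": ""Numbers"", ""type"": { ""type"": ""array"", ""items"": ""int"" } }
          ]
        }");

    // The instance property returns the same, already parsed schema.
    public Schema Schema => _SCHEMA;

    public string Name { get; set; }
    public IList<int> Numbers { get; set; }

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return Name;
            case 1: return Numbers;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: Name = (string)fieldValue; break;
            case 1: Numbers = (IList<int>)fieldValue; break;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
        }
    }
}
```

A side benefit of the static field is that the schema is parsed once per type rather than on every access to the Schema property.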
