Kafka Connect custom transforms to convert schema-less JSON to Avro

I’m trying to build a system that reads schema-less JSON data from Kafka, converts it to Avro, and pushes it to S3.

I have been able to achieve the JSON-to-Avro conversion using Kafka Streams and KSQL. I was wondering if the same thing is possible using Kafka Connect’s custom transforms.

This is what I have tried so far:

public class JsontoAvroConverter<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC = "Transform Payload to Custom Format";
    private static final String PURPOSE = "transforming payload";
    public static final ConfigDef CONFIG_DEF = new ConfigDef();
    @Override
    public void configure(Map<String, ?> props) {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    @Override
    public R apply(R record) {

        // Note: these producer properties are assembled here but never used
        // anywhere in the transform.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");

        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

        avro_Schema updatedSchema = makeUpdatedSchema();

        return newRecord(record, updatedSchema);
    }

    private avro_Schema makeUpdatedSchema() {
        // avro_Schema is the class generated from the .avsc file; setTry$ is
        // the setter Avro generates for a field named "try" (a Java keyword).
        avro_Schema.Builder avro_record = avro_Schema.newBuilder()
                .setName("test")
                .setTry$(1);

        return avro_record.build();
    }

    // Currently unused helper.
    protected Object operatingValue(R record) {
        return record.value();
    }

    protected R newRecord(R record, avro_Schema updatedSchema) {
        // This is where it breaks: ConnectRecord.newRecord() expects an
        // org.apache.kafka.connect.data.Schema, not the generated avro_Schema type.
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
    }
}

Where avro_Schema is the class generated from the schema I specified in an .avsc file.

I am not sure if this is the right way to do it, but the problem I am facing is that when newRecord() is called, it expects updatedSchema to be of the Connect Schema type (org.apache.kafka.connect.data.Schema), while I am providing my custom avro_Schema type.

Also, the avro_record.build() that I’m saving into updatedSchema is not really the schema but the transformed record itself. And I cannot pass just the record topic, the key (= null) and the updated record to the newRecord function; it expects the schema and the values separately.

My questions are:

  1. Is it even possible to convert JSON to Avro using Kafka Connect alone, without Kafka Streams or KSQL? Both of those alternatives require an independent service to be set up.
  2. How do I pass a custom Avro schema to the newRecord function and then provide the data separately?

My apologies if this has already been answered; I did go through some other questions, but none of them seemed to address my doubts. Let me know if you need any other details. Thank you!

Answer

The Kafka Connect custom transform only needs to attach a schema to the incoming JSON. The sink property format.class=io.confluent.connect.s3.format.avro.AvroFormat will take care of the rest.
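For completeness, the relevant S3 sink configuration would look something like the following; the connector name, topic, bucket, region, and the transform’s package are placeholders I’ve assumed, not taken from the original setup:

    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    topics=json_topic
    s3.bucket.name=my-bucket
    s3.region=us-east-1
    storage.class=io.confluent.connect.s3.storage.S3Storage
    # Write the records out to S3 as Avro files
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    # Incoming records are schema-less JSON
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    # Custom transform that attaches the Connect schema
    transforms=jsontoavro
    transforms.jsontoavro.type=com.example.JsontoAvroConverter
    flush.size=1000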

Without a schema the record value is a Map, and with a schema it becomes a Struct. I had to modify my code as below:

    @Override
    public R apply(R record) {
        // requireMap comes from org.apache.kafka.connect.transforms.util.Requirements
        final Map<String, ?> value = requireMap(record.value(), PURPOSE);
        Schema updatedSchema = makeUpdatedSchema();

        // Copy each field of the schema-less Map into a Struct that carries the schema
        final Struct updatedValue = new Struct(updatedSchema);
        for (Field field : updatedSchema.fields()) {
            updatedValue.put(field.name(), value.get(field.name()));
        }

        return newRecord(record, updatedSchema, updatedValue);
    }

    private Schema makeUpdatedSchema() {
        final SchemaBuilder builder = SchemaBuilder.struct()
                .name("json_schema")
                .field("name", Schema.STRING_SCHEMA)
                .field("try", Schema.INT64_SCHEMA);

        return builder.build();
    }

Thanks @OneCricketeer for clarifying my doubts!
