KafkaAVROParser problem

We are facing an issue with KafkaAVROParser: we are not able to correctly consume Avro topics and use the data in Vertica.
For debugging purposes, we installed Kafka on the same server as Vertica (to rule out network and SSL issues).
We installed Confluent Platform 3.2.2 following the official documentation (http://docs.confluent.io/current/quickstart.html).
Following the example in §5 of the documentation, we produced Avro messages using "kafka-avro-console-producer":
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_avro --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Messages:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

Vertica Tests (Vertica Analytic Database v8.1.0-1):

  • KafkaAVROParser: external_schema, flatten_arrays FALSE, flatten_maps FALSE
    COPY "public"."test_avro_tgt"
    SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
    PARSER KafkaAVROParser(flatten_arrays=FALSE,flatten_maps=FALSE,external_schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}')
    REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;
    Rows Loaded
    -------------
    3

    select * from public.test_avro_tgt;
    identity | raw
    --------------+----------------------------------------------------------------------------------------------------------------------------------------------------
    4 | \001\000\000\000\024\000\000\000\002\000\000\000\014\000\000\000\024\000\000\000myrecord\002\000\000\000\014\000\000\000\024\000\000\000__name__f1
    5 | \001\000\000\000\024\000\000\000\002\000\000\000\014\000\000\000\024\000\000\000myrecord\002\000\000\000\014\000\000\000\024\000\000\000__name__f1
    6 | \001\000\000\000\024\000\000\000\002\000\000\000\014\000\000\000\024\000\000\000myrecord\002\000\000\000\014\000\000\000\024\000\000\000__name__f1

    select maptostring(raw) from public.test_avro_tgt;
    -[ RECORD 1 ]----------------------------------------------
    maptostring | {
    "name" : "myrecord",
    "f1" : ""
    }

    -[ RECORD 2 ]----------------------------------------------
    maptostring | {
       "__name__" : "myrecord",
       "f1" : ""
    }
    
    -[ RECORD 3 ]----------------------------------------------
    maptostring | {
       "__name__" : "myrecord",
       "f1" : ""
    }
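
    To check whether "f1" really comes back empty from the VMap (rather than just not being rendered by maptostring), a direct key lookup could be tried as well (a sketch using the standard flex maplookup function):
    select maplookup(raw, 'f1') from public.test_avro_tgt;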
    
  • KafkaAVROParser: raw external_schema as stored in the schema registry
    COPY "public"."test_avro_tgt"
    SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
    PARSER KafkaAVROParser(flatten_arrays=FALSE,flatten_maps=FALSE,external_schema='{"subject":"test_avro-value","version":1,"id":1,"schema":"{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}"}')
    REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;

    ERROR 5861:  Error calling setup() in User Function UDParser at [src/KafkaAvroParser.cpp:1105], error code: 0, message: Exception setting up parsing while processing partition: [Error: unable to construct valid schema from external_schema provided].
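
    (For reference, that JSON is what the registry's REST API returns for the subject; note that in the actual response the quotes inside the "schema" field are escaped, so the value above is not a standalone Avro schema. A sketch of the lookup, assuming the registry's default port:)
    curl http://localhost:8081/subjects/test_avro-value/versions/1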
    
  • KafkaParser: raw binary (no Avro parsing)
    COPY "public"."test_avro_tgt"
    SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
    PARSER KafkaParser()
    REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;

     Rows Loaded
    -------------
               3
    

    select * from public.test_avro_tgt;
    identity | raw
    --------------+----------------------------------------------------------------------------------------------------------------------------------------------------
    7 | \000\000\000\000\001\014value1
    8 | \000\000\000\000\001\014value2
    9 | \000\000\000\000\001\014value3

    select maptostring(raw) from public.test_avro_tgt;
    -[ RECORD 4 ]----------------------------------------------
    maptostring |
    -[ RECORD 5 ]----------------------------------------------
    maptostring |
    -[ RECORD 6 ]----------------------------------------------
    maptostring |
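
    For reference, the raw bytes above match the Confluent wire format: a zero magic byte, a 4-byte schema id (00 00 00 01, i.e. id 1 in the registry), then the Avro-encoded record (\014 is the zig-zag length 6 of "value1"). A quick way to dump one message in hex (a sketch, assuming xxd is available):
    printf '\000\000\000\000\001\014value1' | xxd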

  • KafkaAVROParser: schema_registry_url, schema_registry_subject and schema_registry_version
    COPY "public"."test_avro_tgt"
    SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
    PARSER KafkaAVROParser(flatten_arrays=TRUE,flatten_maps=TRUE,schema_registry_url='localhost:8081',schema_registry_subject='test_avro-value',schema_registry_version='1')
    REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;

    ERROR 5861:  Error calling setup() in User Function UDParser at [src/KafkaAvroParser.cpp:1105], error code: 0, message: Exception setting up parsing while processing partition: [EOF reached].
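
    One thing we have not ruled out here: the URL above has no scheme. From the shell, curl assumes http:// for such a URL; whether KafkaAVROParser expects an explicit http:// prefix is an assumption we have not verified (sketch of the manual check):
    curl -v localhost:8081/subjects/test_avro-value/versions/1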
    
  • KafkaAVROParser: schema_registry_url only
    COPY "public"."test_avro_tgt"
    SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
    PARSER KafkaAVROParser(flatten_arrays=TRUE,flatten_maps=TRUE,schema_registry_url='localhost:8081')
    REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;
    Rows Loaded
    -------------
    0

    select * from public.test_avro_rej;
    -[ RECORD 1 ]-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    node_name | v_vertica_node0003
    file_name |
    session_id | v_vertica_node0003-283043:0x21aac
    transaction_id | 45035996283408265
    statement_id | 3
    batch_number | 0
    row_number | 1
    rejected_data |
    value1
    rejected_data_orig_length | 12
    rejected_reason | Row [1] Error creating reader for avro, data/schema not supported.
    When using schema registry messages must be written using Confluent Kafka Schema Registry format
    Be sure that Schema Registry is up and running
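
    Since the rejection message suggests checking the registry, a quick reachability test from the Vertica host could be the following (a sketch, default port assumed; the response should list "test_avro-value"):
    curl http://localhost:8081/subjects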
