KafkaAVROParser problem
We are having trouble with KafkaAVROParser: we cannot correctly consume Avro topics and use the data in Vertica.
For debugging purposes, we installed Kafka on the same server as Vertica (to rule out network and SSL issues).
We installed Confluent Platform 3.2.2 following the official documentation (http://docs.confluent.io/current/quickstart.html).
Following the example in §5 of the documentation, we produced Avro messages using "kafka-avro-console-producer":
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_avro --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Messages :
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
Vertica Tests (Vertica Analytic Database v8.1.0-1) :
KafkaAVROParser : external_schema, flatten_arrays FALSE, flatten_maps FALSE
COPY "public"."test_avro_tgt"
SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
PARSER KafkaAVROParser(flatten_arrays=FALSE,flatten_maps=FALSE,external_schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}')
REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;
Rows Loaded
-------------
3
select * from public.test_avro_tgt;
identity | raw
--------------+----------------------------------------------------------------------------------------------------------------------------------------------------
4 | \001\000\000\000\024\000\000\000\002\000\000\000\014\000\000\000\024\000\000\000myrecord\002\000\000\000\014\000\000\000\024\000\000\000__name__f1
5 | \001\000\000\000\024\000\000\000\002\000\000\000\014\000\000\000\024\000\000\000myrecord\002\000\000\000\014\000\000\000\024\000\000\000__name__f1
6 | \001\000\000\000\024\000\000\000\002\000\000\000\014\000\000\000\024\000\000\000myrecord\002\000\000\000\014\000\000\000\024\000\000\000__name__f1
select maptostring(raw) from public.test_avro_tgt;
-[ RECORD 1 ]----------------------------------------------
maptostring | {
"__name__" : "myrecord",
"f1" : ""
}
-[ RECORD 2 ]----------------------------------------------
maptostring | {
"__name__" : "myrecord",
"f1" : ""
}
-[ RECORD 3 ]----------------------------------------------
maptostring | {
"__name__" : "myrecord",
"f1" : ""
}
KafkaAVROParser : raw external_schema as stored in schema registry
COPY "public"."test_avro_tgt"
SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
PARSER KafkaAVROParser(flatten_arrays=FALSE,flatten_maps=FALSE,external_schema='{"subject":"test_avro-value","version":1,"id":1,"schema":"{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}"}')
REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;
ERROR 5861: Error calling setup() in User Function UDParser at [src/KafkaAvroParser.cpp:1105], error code: 0, message: Exception setting up parsing while processing partition: [Error: unable to construct valid schema from external_schema provided].
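One possible reading of this error is that external_schema expects only the bare Avro schema (as in the first test, where setup did not fail) rather than the whole registry envelope with "subject", "version", "id" and "schema". The Python sketch below shows how the inner schema could be pulled out of such an envelope. The registry_response string is a hypothetical example shaped like the envelope above, with the inner quotes escaped so that it is valid JSON, and extract_avro_schema is an illustrative helper name, not part of any Vertica or Confluent tool.

```python
import json

# Hypothetical registry response: same fields as the envelope passed to
# external_schema above, with the inner schema stored as an escaped JSON string.
registry_response = (
    '{"subject":"test_avro-value","version":1,"id":1,'
    '"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"myrecord\\",'
    '\\"fields\\":[{\\"name\\":\\"f1\\",\\"type\\":\\"string\\"}]}"}'
)


def extract_avro_schema(response_text: str) -> str:
    """Return the bare Avro schema from a registry-style envelope (illustrative helper)."""
    envelope = json.loads(response_text)
    # The "schema" field is itself a JSON document stored as a string;
    # parsing it checks that it is well formed before it is reused.
    schema = json.loads(envelope["schema"])
    return json.dumps(schema, separators=(",", ":"))


bare_schema = extract_avro_schema(registry_response)
print(bare_schema)
```

The printed string has the same form as the external_schema value used in the first test.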
Binary
COPY "public"."test_avro_tgt"
SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
PARSER KafkaParser()
REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;
Rows Loaded
-------------
3
select * from public.test_avro_tgt;
identity | raw
--------------+----------------------------------------------------------------------------------------------------------------------------------------------------
7 | \000\000\000\000\001\014value1
8 | \000\000\000\000\001\014value2
9 | \000\000\000\000\001\014value3
select maptostring(raw) from public.test_avro_tgt;
-[ RECORD 4 ]----------------------------------------------
maptostring |
-[ RECORD 5 ]----------------------------------------------
maptostring |
-[ RECORD 6 ]----------------------------------------------
maptostring |
KafkaAVROParser : schema_registry_url, schema_registry_subject and schema_registry_version
COPY "public"."test_avro_tgt"
SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
PARSER KafkaAVROParser(flatten_arrays=TRUE,flatten_maps=TRUE,schema_registry_url='localhost:8081',schema_registry_subject='test_avro-value',schema_registry_version='1')
REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;
ERROR 5861: Error calling setup() in User Function UDParser at [src/KafkaAvroParser.cpp:1105], error code: 0, message: Exception setting up parsing while processing partition: [EOF reached].
KafkaAVROParser : schema_registry_url
COPY "public"."test_avro_tgt"
SOURCE KafkaSource(stream='test_avro|0|-2', brokers='CTLVS011:9092', duration=interval '9614 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 )
PARSER KafkaAVROParser(flatten_arrays=TRUE,flatten_maps=TRUE,schema_registry_url='localhost:8081')
REJECTED DATA AS TABLE "public"."test_avro_rej" TRICKLE;
Rows Loaded
-------------
0
select * from public.test_avro_rej;
-[ RECORD 1 ]-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
node_name | v_vertica_node0003
file_name |
session_id | v_vertica_node0003-283043:0x21aac
transaction_id | 45035996283408265
statement_id | 3
batch_number | 0
row_number | 1
rejected_data |
value1
rejected_data_orig_length | 12
rejected_reason | Row [1] Error creating reader for avro, data/schema not supported.
When using schema registry messages must be written using Confluent Kafka Schema Registry format
Be sure that Schema Registry is up and running
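For reference, the bytes returned by the plain KafkaParser test (\000\000\000\000\001\014value1) match the Confluent Schema Registry wire format: one magic byte (0), a 4-byte big-endian schema id (1), then the Avro binary payload, where \014 is the zigzag-encoded string length 6. The Python sketch below decodes one such message by hand. It is only an illustration for this single-string schema, the function names are ours, and the last check reflects an assumption about Vertica rather than a confirmed behavior: if a parser given external_schema read the message as plain Avro without skipping the 5-byte header, the leading \000 would decode as a zero-length string, which would be consistent with the empty "f1" values seen in the first test.

```python
import struct


def read_zigzag_long(buf: bytes, pos: int):
    """Decode one Avro long (zigzag varint) at pos; return (value, next_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def read_avro_string(buf: bytes, pos: int = 0):
    """Decode one Avro string (length prefix + UTF-8 bytes); return (text, next_pos)."""
    length, pos = read_zigzag_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_confluent_message(msg: bytes):
    """Split the 5-byte Confluent header from the payload and decode field f1."""
    magic, schema_id = struct.unpack(">bI", msg[:5])
    if magic != 0:
        raise ValueError("unexpected magic byte, not Confluent wire format")
    value, _ = read_avro_string(msg, 5)
    return schema_id, {"f1": value}


# Bytes as shown in the KafkaParser output, octal escapes rewritten in hex.
message = b"\x00\x00\x00\x00\x01\x0cvalue1"

print(decode_confluent_message(message))  # (1, {'f1': 'value1'})

# Reading the same bytes as plain Avro, without skipping the header,
# yields an empty string because the first byte encodes length 0.
print(repr(read_avro_string(message)[0]))  # ''
```

The message is 12 bytes long (5 header bytes, 1 length byte, 6 characters), which agrees with the rejected_data_orig_length of 12 reported above.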