Loading AVRO data using KAFKA Parser



I am trying to test loading data using the KafkaAVROParser. I have configured the topic with the following parameters:


parser-parameters=flatten_maps=TRUE,flatten_arrays=TRUE,codec='default',external_schema='{ "type": "record", "namespace": "org.foo", "name": "test", "fields": [ { "name": "dim", "type": "string" }] }'



I am sending the data to the Kafka topic as follows:


// Serialize one record with a DatumWriter only (no Avro container header).
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
GenericRecord record = new GenericData.Record(schema);
record.put("dim", new Utf8("dim"));
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(record, encoder);
// The binary encoder buffers its output, so flush before reading the bytes.
encoder.flush();
byte[] msgBytes = out.toByteArray();

KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("test_topic_2", msgBytes);



Whenever I send data to the Kafka topic I get the following error in the Vertica rejection table:


Error creating reader for avro, data/schema not supported.
Note: Codec supported is avro null or snappy
When using external_schema messages cannot contain header(AVRO_MAGIC, AVRO_CODEC_KEY,AVRO_SCHEMA_KEY) and/or schema



I must have misconfigured the schema but cannot figure out what the problem is. 


If someone has an example of how to configure the KafkaAVROParser with an external schema that would be a great help. 






  • Hi Jim


    The current KafkaAvroParser works with messages that contain Avro metadata (as written by Avro's DataFileWriter, see https://avro.apache.org/docs/1.7.7/gettingstartedjava.html#Serializing-N101DE ).


    We are working on the enhancement that you mention (messages containing just the Avro data/datum) for a future release.
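To make the "just the Avro data/datum" case concrete, here is a minimal sketch of what a DatumWriter-only message contains for the one-field schema in the question. It hand-encodes the record following the Avro binary encoding rules (a string is a zigzag varint length followed by UTF-8 bytes; a record is its fields in order) rather than calling the Avro library. The class and method names are hypothetical and used only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class BareDatumSketch {
    // Avro encodes a long as a zigzag value written in 7-bit groups, low bits first.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // more bytes follow
            z >>>= 7;
        }
        out.write((int) z);
    }

    // An Avro string is its UTF-8 byte length (as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Hypothetical helper: encodes a record of the schema {"dim": string}.
    static byte[] encodeTestRecord(String dim) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, dim);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Prints the message bytes in hex: 06 64 69 6d
        for (byte b : encodeTestRecord("dim")) {
            System.out.printf("%02x ", b);
        }
        System.out.println();
    }
}
```

The whole message is four bytes with no magic, codec, or schema, which is why a parser that expects the container header cannot read it.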




  • Hi Cornelio,


    Like Jim, I provided an external schema and generated Avro messages without metadata, but I got a different error:

    Row [1] Exception: while parsing the Avro message. Object count exception, Avro record is not well formed or Kafka message has more than one Avro record.


    I saw in the attached document on integrating Kafka that there is a way to provide an external schema. Are you sure it is not supported, or is there just some limitation on this feature?


    Also, after reading your reply, I tried the approach you suggested. There is no error, but only rows with NULL column values are inserted into the target table. Do you know why this happens?


    P.S. I did not use a flex table; I used a regular table with exact column definitions. When I tried a flex table, I could see the message packet (in binary format) inserted into the table, but I could find no way to extract the attributes from the __raw__ column of the flex table.





  • Hi Allen


    external_schema is supported if you write your messages as in "https://avro.apache.org/docs/1.7.7/gettingstartedjava.html#Serializing+and+deserializing+without+code+generation". Note that this way of writing includes the schema, so the schema must be removed in order to use the "external_schema" parameter.


    Regarding the use of a real table: we add a "__name__" key (flex tables) or a "__name__" column (real tables) for each record in your message. Did you add this "__name__" column? That may be why you are getting NULL values (by the way, are there no rejections?).

    E.g. the real table for the Avro schema in the example https://avro.apache.org/docs/1.7.7/gettingstartedjava.html#Defining+a+schema 

    should be

    CREATE TABLE example(__name__ varchar, name varchar, favorite_number int, favorite_color varchar);



    To view flex tables in a more readable fashion, you can use the flex function maptoString:

    select maptoString(__raw__) from myFlexTable;

  • Hi Cornelio,


    I prefer to use external_schema because each message will then be short. But how do I remove the schema from an Avro-written file? It is in binary format and unreadable, so I don't know which part can be removed.


    I tried the following approaches, but all of them failed:

    1. Removing only the schema string that I can read. It seems the MAGIC and metadata are still there.

    2. Removing the first line.

    3. In code, calling stream.reset() after dataFileWriter.create(schema, stream), because according to the Avro source code the create method writes the MAGIC, schema, and metadata.


    But I still get the error:

    Row [1] Error: could not load rows from message
    When using external_schema messages cannot contain header(AVRO_MAGIC, AVRO_CODEC_KEY,AVRO_SCHEMA_KEY) and/or schema


    Can you help?




  • Hi Allen


    Normally, Avro writes

    "Obj MAGIC codec {schema}nullDATA", so your message may still contain the Avro MAGIC, codec, schema, or null.

    After the schema there is a null that needs to be removed.




    Unfortunately there is no "easy" way to do it, and solutions such as manually removing part of the file can be tricky.


    We currently use Avro's DatumWriter/DataFileWriter approach but will support the DatumWriter-only approach (as in your first attempt) in a future release.
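As a rough illustration of where the "Obj MAGIC codec {schema}null" prefix ends, the sketch below walks the header of an Avro object container file as laid out in the Avro specification: four magic bytes ('O', 'b', 'j', 1), then a metadata map of string keys to byte values that is terminated by a zero block count (the "null" mentioned above), then a 16-byte sync marker. It uses only the Java standard library. The class and method names are hypothetical, and whether a particular Vertica release accepts the stripped bytes is an assumption to verify against your own rejection table.

```java
import java.util.Arrays;

class AvroHeaderSketch {
    private final byte[] buf;
    private int pos;

    private AvroHeaderSketch(byte[] buf, int start) {
        this.buf = buf;
        this.pos = start;
    }

    // Decode one zigzag varint long at the current position.
    private long readLong() {
        long z = 0;
        int shift = 0;
        while (true) {
            int b = buf[pos++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return (z >>> 1) ^ -(z & 1);
    }

    // Skip a length-prefixed string or bytes value.
    private void skipLengthPrefixed() {
        int len = (int) readLong();
        pos += len;
    }

    /** Offset of the first byte after the magic and the metadata map terminator. */
    static int metadataEnd(byte[] file) {
        if (file.length < 4 || file[0] != 'O' || file[1] != 'b' || file[2] != 'j' || file[3] != 1) {
            throw new IllegalArgumentException("not an Avro object container");
        }
        AvroHeaderSketch r = new AvroHeaderSketch(file, 4);
        while (true) {
            long count = r.readLong();
            if (count == 0) {
                break;                 // terminating "null" after the schema
            }
            if (count < 0) {           // negative count is followed by a byte size
                count = -count;
                r.readLong();
            }
            for (long i = 0; i < count; i++) {
                r.skipLengthPrefixed(); // key, e.g. avro.schema or avro.codec
                r.skipLengthPrefixed(); // value
            }
        }
        return r.pos;
    }

    /** Everything after the metadata map: the 16-byte sync marker and the data blocks. */
    static byte[] stripMetadata(byte[] file) {
        return Arrays.copyOfRange(file, metadataEnd(file), file.length);
    }
}
```

Parsing the map this way avoids hand-editing the binary file, since the schema text and the terminator are located by their length prefixes rather than by searching for readable characters.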





  • Hi Cornelio, 


    Thanks for your reply. After removing "Obj MAGIC codec {schema}null", the message can be loaded into Vertica. But according to the Java source code of Avro, after writing the null that follows the schema, it writes 16 random bytes. I think they act as a separator between the metadata and the real data. If an external schema is provided, I think these 16 bytes are useless and shouldn't be required.


    The NULL values inserted into the table were caused by case-sensitive mapping of Avro message fields to table columns. In my case, the field in the Avro message is iipmsSerialNumber, but in the table column definition I used upper case.






  • Hi Cornelio, 


    I have two more questions on the Avro Kafka parser.

        1. There is a time column that is an integer type in the source system (epoch time as a number of milliseconds). We want to convert it to a timestamp column in our table for future use. Is this supported now? Can it be added in a future release such as V7.2.2?

        2. Our table has an update_timestamp column with the default value GETDATE(). This column is not provided in the Kafka message. In my test, I found that Vertica always inserts NULL for this column, but we want it set to the current time to record when the row was inserted. Is there a way to fulfil this requirement? Can it be added in a future release such as V7.2.2? For example, if a default value is defined, don't insert NULL for the column.





  • Hi Allen


    FlexAvroParser and KafkaAvroParser do not have a function to convert long/int or epoch values to a Vertica date/time column.

    Currently FlexAvroParser (and also KafkaAvroParser) can COPY dates (DateOID, TimeOID, TimestampOID, TimestampTzOID, TimeTzOID) if they were written as an Avro string.

    Avro Schema (writing Avro file/message using this schema, note time field as string type, e.g. "2016-02-11 11:06:09.287")

    --Copy using FlexAvroParser
    vertica=> create table test(event_name varchar(30),time timestamptz);

    vertica=> copy test from 'Events.avro' parser favroparser(flatten_arrays=true);
     Rows Loaded
    (1 row)

    vertica=> select * from test;
        event_name | time
     wsArvAfrer_ase-1 | 2016-03-14 11:06:09.287-04
    (1 row)


    So, to handle an epoch or int/long representation of a date/time, the parsers would need an intermediate step that converts such values into the desired format. It is an interesting and useful case, so it will be studied with a view to implementing it in a future version if possible.
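Until the parsers support such a conversion, one possible workaround is to do the intermediate step on the producer side and write the field as an Avro string. The following is a minimal sketch using only java.time. The class name is hypothetical, and formatting in UTC is an assumption: use whichever zone matches how your Vertica session interprets timestamp strings.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class EpochToTimestampString {
    // Same layout as the string example above: "2016-02-11 11:06:09.287".
    // UTC is an assumption made for this sketch.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    // Convert epoch milliseconds into a timestamp string for the Avro message.
    static String format(long epochMillis) {
        return FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Prints 2016-02-11 11:06:09.287
        System.out.println(format(1455188769287L));
    }
}
```

The resulting string would then be put into the record in place of the long value, with the field declared as "string" in the Avro schema.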

    One more test, using INSERT:
    --Create table with time column as TIMESTAMPTZ
    vertica=> create table test(event_name varchar(30),time timestamptz);

    --Insert the value as a varchar literal that contains the actual timestamptz
    vertica=> insert into test values('event 1','2016-03-14 11:06:09.287');
    (1 row)

    vertica=> select * from test;
     event_name | time
     event 1 | 2016-03-14 11:06:09.287-04
    (1 row)

    --Check it is actually a date
    vertica=> SELECT EXTRACT(YEAR FROM (SELECT time from test));
    (1 row)

  • Hi Cornelio,


    As I understand it, Vertica itself can implicitly convert a string to a timestamp according to some internal format, so I can put the timestamp into the Avro message as a string value in that specific format, right?


    Regarding question 2 in my last post: we want the update_timestamp column, which is not present in the message, to record the time when the message is inserted into the table. Is there a way to do this? We can't put update_timestamp in the message, because there may be a delay before the message is inserted into the database, or we may for some reason have to reconsume a message that was discarded. In those cases, the update_timestamp in the message could be very different from the actual insertion time.


    We tried giving the update_timestamp column the default value GETDATE(), but in my test I found that Vertica always inserts NULL for this column.





