Loading data into a regular Vertica table through Kafka

prayerhuang Registered User

/opt/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server testserver01:9092 --topic oggtopic
I|VERGET.TEST|2018-07-18 14:19:29.000000|2018-07-18T10:27:42.015000|00000000020016142178|9

/opt/vertica/packages/kafka/bin/vkconfig load-spec --create --load-spec orafeed_load --conf orafeed.conf

maverdb2=> select load_spec,parser,load_method from verget.stream_load_specs ;
load_spec | parser | load_method
--------------+-------------+-------------
orafeed_load | KafkaParser | TRICKLE
(1 row)

maverdb2=> drop table test;
DROP TABLE
maverdb2=> create table test ( op_type VARCHAR(20), tt VARCHAR(30), op_ts DATETIME, current_ts datetime, pos VARCHAR(100), id integer);
CREATE TABLE

vkconfig microbatch --delete --microbatch orafeed --conf orafeed.conf

vkconfig microbatch --create --microbatch orafeed --target-schema public --target-table test \
--add-source oggtopic --add-source-cluster orafeed_cluster --load-spec orafeed_load \
--conf orafeed.conf

vkconfig launch --conf orafeed.conf &

**** Not getting any data in table test.

**** After changing to a flex table, it does get data:

maverdb2=> drop table test;
DROP TABLE
maverdb2=>
maverdb2=> CREATE FLEX TABLE test();
CREATE TABLE
maverdb2=>
maverdb2=>
maverdb2=> select * from test;
identity | raw
--------------+-------------------------------------------------------------------------------------------------
1 | I|VERGET.TEST|2018-07-18 16:36:32.000000|2018-07-18T12:36:36.055000|00000000020016266898|21\012
(1 row)

**** The question is: why is the regular table not getting data from Kafka? And how do I get data from Kafka into a regular Vertica table?

Answers

  • SruthiA Employee, Registered User, VerticaExpert
    edited July 18

    I notice that you are using KafkaParser, which by default treats each Kafka message as a single value. You need to specify the format of your data and use a matching parser, such as the JSON or Avro parser. Do you see any data in the table test_rejected?

    https://my.vertica.com/docs/9.1.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/KafkaFunctions/KafkaParser.htm

  • prayerhuang Registered User

    maverdb2=> select * from test_rejected;
    ERROR 4566: Relation "test_rejected" does not exist

    It seems like this table doesn't exist.

    My data is in text format with delimiter "|". Can I use KafkaJSONParser to parse this regular text format?

  • SruthiA Employee, Registered User, VerticaExpert

    That means the transaction did not succeed. I sent you a personal message; please send me vertica.log so I can check what happened to that transaction.

  • prayerhuang Registered User

    I saw Vertica doing the following in the log:

    COPY "public"."test" SOURCE KafkaSource(stream='oggtopic|0|115', brokers='testserver01.production.local:9092', duration=interval '9871 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 ) PARSER KafkaParser() TRICKLE STREAM NAME 'vkstream_verget_microbatch_500001_orafeed' NO COMMIT

    I ran this command in vsql, and nothing was loaded. I played with the following.

    So it looks like if I remove PARSER KafkaParser(), it works. How do I detach PARSER KafkaParser() from the load spec? Or, what is the standard native parser for plain text?

    maverdb2=> COPY "public"."test" SOURCE KafkaSource(stream='oggtopic|0|115', brokers='testserver01.production.local:9092', duration=interval '9871 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 ) STREAM NAME 'vkstream_verget_microbatch_500001_orafeed' NO COMMIT;

     Rows Loaded
    -------------
               2
    (1 row)

    maverdb2=> COPY "public"."test" SOURCE KafkaSource(stream='oggtopic|0|115', brokers='testserver01.production.local:9092', duration=interval '9871 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 ) PARSER KafkaParser() STREAM NAME 'vkstream_verget_microbatch_500001_orafeed' NO COMMIT;

     Rows Loaded
    -------------
               0
    (1 row)

  • prayerhuang Registered User

    In the following link:

    https://my.vertica.com/docs/9.1.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/UtilityOptions/LoadSpecUtilityOptions.htm

    " --parser parser name

    Identifies a Vertica UDParser to use with a specified target. This parser is used within the COPY statement that the scheduler runs to load data. If you are using a Vertica native parser, the values supplied to the --parser-parameters option are passed through to the COPY statement.
    Default Value:
    KafkaParser"

    How do I use a native parser in the load spec?

  • prayerhuang Registered User

    Does anyone have any idea?

  • YangXu Employee, Registered User

    What KafkaParser does is treat your whole Kafka message as a string and dump it into a single string column in a Vertica table.
    That is fine for a flex table, since it is schema-less and treats the whole string as a VMap object, but it doesn't match the schema of your regular table.
    If you set up a rejection table in your microbatch, you should get some reject-reason hints from there (you didn't specify one when you created your microbatch).

    The solution should be:
    since you are not using a standard format like JSON or Avro but your own customized format, you should use a delimited parser to parse your messages.
    An example would be something like
    vkconfig load-spec --create --config-schema verget --load-spec kafka --parser Delimited --parser-parameters "delimiter '|' "

    Sorry, we don't have a good example in our docs yet; we are working on adding one.

    Thanks
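
    Applied to this thread's setup, the fix might look like the sketch below. This is only an illustration, assuming Vertica 9.1's vkconfig options; the rejection table name is a placeholder, and the option values should be checked against your version's documentation.

    ```shell
    # Sketch only: update the existing load spec to use a delimited parser
    # instead of the default KafkaParser (names taken from earlier in the thread).
    vkconfig load-spec --update --load-spec orafeed_load \
        --parser Delimited --parser-parameters "delimiter '|'" \
        --conf orafeed.conf

    # Recreate the microbatch with a rejection table so reject reasons are captured
    # (--rejection-schema / --rejection-table are assumed from the microbatch utility options):
    vkconfig microbatch --create --microbatch orafeed \
        --target-schema public --target-table test \
        --rejection-schema public --rejection-table test_rejections \
        --add-source oggtopic --add-source-cluster orafeed_cluster \
        --load-spec orafeed_load --conf orafeed.conf
    ```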

  • YangXu Employee, Registered User

    Here's one more thing you probably need to pay attention to: only the Kafka JSON/Avro parsers are Kafka-message aware, which means they know each message's size and can parse the messages properly. Other parsers, such as the delimited parser, only see a data stream that concatenates all the Kafka messages. To combine KafkaSource with other parsers, we need a filter after the KafkaSource that inserts a temporary record terminator between Kafka messages; usually we use KafkaInsertDelimiters(delimiter = E'\n') as the terminator.

    See the example here in the documentation:
    https://my.vertica.com/docs/9.1.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/ParsingCustomFormats.htm?TocPath=Integrating%20with%20Apache%20Kafka|_____8
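
    Putting this together with the manual COPY from earlier in the thread, a version that loads into the regular table might look like the following sketch. The stream offset and broker come from this thread; the FILTER and DELIMITER clauses follow the linked doc page and should be verified against your Vertica version.

    ```sql
    -- Sketch only: KafkaSource plus the KafkaInsertDelimiters filter,
    -- parsed with the built-in delimited COPY format instead of KafkaParser.
    COPY "public"."test"
        SOURCE KafkaSource(stream='oggtopic|0|115',
                           brokers='testserver01.production.local:9092',
                           stop_on_eof=true)
        FILTER KafkaInsertDelimiters(delimiter = E'\n')  -- record terminator between messages
        DELIMITER '|'                                    -- field separator inside each message
        NO COMMIT;
    ```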

  • prayerhuang Registered User

    Thanks a lot!! I will try it out and let you know.
