Load data to Vertica regular table through Kafka
/opt/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server testserver01:9092 --topic oggtopic
I|VERGET.TEST|2018-07-18 14:19:29.000000|2018-07-18T10:27:42.015000|00000000020016142178|9
/opt/vertica/packages/kafka/bin/vkconfig load-spec --create --load-spec orafeed_load --conf orafeed.conf
maverdb2=> select load_spec,parser,load_method from verget.stream_load_specs ;
  load_spec   |   parser    | load_method
--------------+-------------+-------------
 orafeed_load | KafkaParser | TRICKLE
(1 row)
maverdb2=> drop table test;
DROP TABLE
maverdb2=> create table test ( op_type VARCHAR(20), tt VARCHAR(30), op_ts DATETIME, current_ts datetime, pos VARCHAR(100), id integer);
CREATE TABLE
vkconfig microbatch --delete --microbatch orafeed --conf orafeed.conf
vkconfig microbatch --create --microbatch orafeed --target-schema public --target-table test \
--add-source oggtopic --add-source-cluster orafeed_cluster --load-spec orafeed_load \
--conf orafeed.conf
vkconfig launch --conf orafeed.conf &
**** The regular table test does not receive any data.
**** After I changed test to a flex table, it does receive data:
maverdb2=> drop table test;
DROP TABLE
maverdb2=>
maverdb2=> CREATE FLEX TABLE test();
CREATE TABLE
maverdb2=>
maverdb2=>
maverdb2=> select * from test;
identity | raw
--------------+-------------------------------------------------------------------------------------------------
1 | I|VERGET.TEST|2018-07-18 16:36:32.000000|2018-07-18T12:36:36.055000|00000000020016266898|21\012
(1 row)
**** Question: why does the regular table not receive any data from Kafka? How can I load data from Kafka into a regular Vertica table?
Answers
I notice that you are using KafkaParser, which by default treats each message as a single value. You need to specify the format of the data and use a matching parser, such as the JSON or Avro parser. Do you see any data in the table test_rejected?
https://my.vertica.com/docs/9.1.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/KafkaFunctions/KafkaParser.htm
maverdb2=> select * from test_rejected;
ERROR 4566: Relation "test_rejected" does not exist
It seems this table doesn't exist.
My data is plain text delimited by "|". Can I use KafkaJSONParser to parse this plain text format?
That means the transaction did not succeed. I sent you a private message; please send me vertica.log so I can check what happened to that transaction.
In the log I see Vertica running the following statement:
COPY "public"."test" SOURCE KafkaSource(stream='oggtopic|0|115', brokers='testserver01.production.local:9092', duration=interval '9871 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 ) PARSER KafkaParser() TRICKLE STREAM NAME 'vkstream_verget_microbatch_500001_orafeed' NO COMMIT
I ran this command in vsql and nothing was loaded. I then tried the variations shown below.
It looks like the load works if I remove PARSER KafkaParser(). How do I detach PARSER KafkaParser() from the load spec? Or what is the standard native parser for plain text?
maverdb2=> COPY "public"."test" SOURCE KafkaSource(stream='oggtopic|0|115', brokers='testserver01.production.local:9092', duration=interval '9871 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 ) STREAM NAME 'vkstream_verget_microbatch_500001_orafeed' NO COMMIT;
Rows Loaded
2
(1 row)
maverdb2=> COPY "public"."test" SOURCE KafkaSource(stream='oggtopic|0|115', brokers='testserver01.production.local:9092', duration=interval '9871 milliseconds', eof_timeout=interval '1000 milliseconds', stop_on_eof=true, message_max_bytes=1048576 ) PARSER KafkaParser() STREAM NAME 'vkstream_verget_microbatch_500001_orafeed' NO COMMIT;
Rows Loaded
0
The documentation at the following link says:
https://my.vertica.com/docs/9.1.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/UtilityOptions/LoadSpecUtilityOptions.htm
" --parser parser name
Identifies a Vertica UDParser to use with a specified target. This parser is used within the COPY statement that the scheduler runs to load data. If you are using a Vertica native parser, the values supplied to the --parser-parameters option are passed through to the COPY statement.
Default Value:
KafkaParser"
How do I use a native parser in a load spec?
Does anyone have any idea?
KafkaParser simply treats each whole Kafka message as a single string and loads it into one string column of the Vertica table.
That is fine for a flex table, which is schema-less and treats the whole string as a VMap object, but it does not match the schema of your regular table.
If you set up a rejection table for your microbatch, you should get some rejection-reason hints from it (you did not specify one when you created your microbatch).
The solution should be:
Since you are not using a standard format like JSON or Avro but your own custom format, you should parse your messages by their delimiter.
An example would be something like:
vkconfig load-spec --create --config-schema verget --load-spec kafka --parser Delimited --parser-parameters "delimiter '|' "
Sorry, we don't have a good example in our docs yet; we are working on adding one.
Thanks
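To make the schema mismatch described above concrete, here is a minimal Python sketch (not Vertica code) of what parsing by delimiter means for the sample message from the question. The column names come from the CREATE TABLE statement in the question; the function name parse_delimited and the timestamp formats are assumptions made for illustration only.

```python
from datetime import datetime

# Column names taken from the CREATE TABLE statement in the question.
COLUMNS = ["op_type", "tt", "op_ts", "current_ts", "pos", "id"]

# Sample message as printed by kafka-console-consumer in the question.
MESSAGE = ("I|VERGET.TEST|2018-07-18 14:19:29.000000|"
           "2018-07-18T10:27:42.015000|00000000020016142178|9")


def parse_delimited(message, delimiter="|"):
    """Hypothetical helper: split one message and convert fields to column types."""
    fields = message.split(delimiter)
    if len(fields) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(fields)}")
    row = dict(zip(COLUMNS, fields))
    # Timestamp formats are assumed from the sample message above.
    row["op_ts"] = datetime.strptime(row["op_ts"], "%Y-%m-%d %H:%M:%S.%f")
    row["current_ts"] = datetime.strptime(row["current_ts"], "%Y-%m-%dT%H:%M:%S.%f")
    row["id"] = int(row["id"])
    return row


# Treating the whole message as one value yields a single field,
# which cannot fill a six-column table.
whole_message_fields = [MESSAGE]

# Splitting on the delimiter yields one value per column.
row = parse_delimited(MESSAGE)

print(len(whole_message_fields), "field without splitting;", len(row), "fields after splitting")
print(row["op_type"], row["tt"], row["id"])
```

The sketch only mirrors the idea: one undivided string suits a schema-less target, while a six-column table needs the message split on "|" first.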
Here is one more thing you probably need to pay attention to: only the Kafka parsers, Kafka(JSON/Avro)Parser, are Kafka-message aware, which means they know each message's size and can parse the messages properly. Other parsers, including the delimited parser, only receive a data stream in which all the Kafka messages are concatenated. To combine KafkaSource with other parsers, we need a filter after KafkaSource that adds a temporary record terminator between Kafka messages; usually we use KafkaInsertDelimiters(E'\n') for this.
See the example in the documentation:
https://my.vertica.com/docs/9.1.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/ParsingCustomFormats.htm?TocPath=Integrating%20with%20Apache%20Kafka|_____8
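The point about message boundaries can be illustrated with a small Python sketch. This is not Vertica code: concatenate, insert_delimiters and parse_records are hypothetical stand-ins for the behaviour described above, and the sample messages from this thread are shown without a trailing newline, which is an assumption made for illustration.

```python
def concatenate(messages):
    """What a parser that is not Kafka-message aware receives: messages back to back."""
    return b"".join(messages)


def insert_delimiters(messages, terminator=b"\n"):
    """Hypothetical stand-in for a filter that puts a record terminator between messages."""
    return terminator.join(messages)


def parse_records(stream, delimiter=b"|", terminator=b"\n"):
    """Split a byte stream into records, then split each record into fields."""
    records = [r for r in stream.split(terminator) if r]
    return [r.split(delimiter) for r in records]


# Sample messages from the thread (trailing newline omitted, see the note above).
messages = [
    b"I|VERGET.TEST|2018-07-18 14:19:29.000000|2018-07-18T10:27:42.015000|00000000020016142178|9",
    b"I|VERGET.TEST|2018-07-18 16:36:32.000000|2018-07-18T12:36:36.055000|00000000020016266898|21",
]

# Without a terminator the two messages merge into one over-long record.
raw_rows = parse_records(concatenate(messages))

# With a terminator between messages, each message becomes its own record.
delimited_rows = parse_records(insert_delimiters(messages))

print("without terminator:", len(raw_rows), "record(s),", len(raw_rows[0]), "fields")
print("with terminator:", len(delimited_rows), "record(s),", [len(r) for r in delimited_rows], "fields")
```

In the first case the last field of one message runs into the first field of the next, so no record matches a six-column table; with a terminator in between, each message splits into exactly six fields.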
Thanks a lot!! I will try it out and let you know.