Delimited file to parse with Kafka scheduler
Hi Team,
I need to parse a delimited file with the Kafka scheduler into Vertica, but I am getting an error. I am using the command below to create the load-spec for the delimited file.
Sample data
ABC123,1234#
XYZ12,1212#
PPPPP,121#
RRRR,7676#
Syntax I am using:
/opt/vertica/packages/kafka/bin/vkconfig load-spec --create --config-schema vert123 --load-spec kafka_delim_test --parser fdelimitedpairparser (delimiter=',',recordTerminator = '#')
But it throws a syntax error...
Comments
See if the --parser-parameters option works for you:
/opt/vertica/packages/kafka/bin/vkconfig load-spec --create --config-schema vert123 --load-spec kafka_delim_test --parser fdelimitedpairparser --parser-parameters delimiter=',',recordTerminator='#';
See:
https://my.vertica.com/docs/9.0.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/UtilityOptions/LoadSpecUtilityOptions.htm
@opper20 - There is an open JIRA ticket with a request to add the standard Vertica parsers to Kafka. Currently they are not available so you need to use the --filters option.
See:
https://my.vertica.com/docs/9.0.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/UtilityOptions/LoadSpecUtilityOptions.htm
https://my.vertica.com/docs/9.0.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/ParsingCustomFormats.htm
Hi Jim,
Thanks for the reply. I tried to run the command based on the suggestions from the Vertica documentation, but I am not getting the expected result.
/opt/vertica/packages/kafka/bin/vkconfig load-spec --create --config-schema Vert123 --load-spec kafka_delim_test --filters KafkaInsertDelimiters(delimiter = ',',recordTerminator = '#') --username test1234 --password '####'
-bash: syntax error near unexpected token `('
Can you tell me if I am specifying those values incorrectly?
Thanks
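The `-bash: syntax error near unexpected token` message above comes from the shell, not from vkconfig: bash interprets an unquoted `(` as its own syntax before the command ever runs. A minimal sketch of the quoting follows, assuming a hypothetical `show_args` helper that stands in for vkconfig and only echoes the arguments it receives. It illustrates shell quoting only and says nothing about whether Vertica accepts these filter parameters.

```shell
# Hypothetical stand-in for vkconfig: prints each argument it receives
# on its own line, wrapped in brackets, so the quoting can be inspected.
show_args() {
  for arg in "$@"; do
    printf '[%s]\n' "$arg"
  done
}

# Double quotes keep the parentheses, spaces, and inner single quotes
# together as ONE argument. Without the double quotes, bash stops at "("
# and reports the syntax error shown in the post.
show_args --filters "KafkaInsertDelimiters(delimiter = ',',recordTerminator = '#')"
```

If the second bracketed line shows the whole filter expression intact, the shell is passing it through as a single argument.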
Is your data a key value pair?
hadoop@skeswani:~$ cat data.txt
ABC123,33#XYZ234,12#
And is this how you want it to look, i.e. each pair is a record? I am not sure fdelimitedpairparser is correct for the above data.
skeswani=> select * from tgt;
key | value
--------+------
ABC123 | 33
XYZ234 | 12
(2 rows)
What does your raw data look like? Can you send me a sample, using kafka-console-consumer.sh?
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic vertica_topic --from-beginning
ABC123,33#XYZ234,12#
Based on the data provided, the row delimiter is not #, it's E'#\n'.
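One way to confirm which bytes actually end each record is to dump them with `od -c`. This is a minimal sketch, assuming the rows are stored one per line as in the question; `sample_rows` is a hypothetical helper that reproduces two of those rows.

```shell
# Assumption: two sample rows typed the way the question shows them,
# one per line, each ending in '#'.
sample_rows() {
  printf 'ABC123,1234#\nXYZ12,1212#\n'
}

# od -c prints every byte, so the newline that follows each '#'
# becomes visible as \n instead of an invisible line break.
sample_rows | od -c

# For a real file, the same check would be: od -c /tmp/test.csv | head
```

Each `#` in the dump is followed by `\n`, which is why the effective record terminator is the two-byte sequence rather than `#` alone.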
[skeswani@skeswani examples] [@:/]$ head /tmp/test.csv
value1,1#
value2,2#
value3,3#
value4,4#
value5,5#
value6,6#
value7,7#
value8,8#
value9,9#
value10,10#
[skeswani@skeswani examples] [@:/]$ pwd
/opt/vertica/sdk/examples
[skeswani@skeswani examples] [@:/]$ sudo mkdir build
[skeswani@skeswani examples] [@:/]$ sudo chmod a+w build/
[skeswani@skeswani examples] [@:/]$ make
...
...
[skeswani@skeswani examples] [@:/]$ vsql
Welcome to vsql, the Vertica Analytic Database interactive terminal.
Type: \h or \? for help with vsql commands
\g or terminate with semicolon to execute query
\q to quit
skeswani=> CREATE LIBRARY ExampleDelimitedParserLib AS '/opt/vertica/sdk/examples/build/ExampleDelimitedParser.so';
CREATE LIBRARY
skeswani=> CREATE PARSER ExampleDelimitedParser AS LANGUAGE 'C++' NAME 'DelimitedParserExampleFactory' LIBRARY ExampleDelimitedParserLib;
CREATE PARSER FUNCTION
skeswani=> drop table tgt;
DROP TABLE
skeswani=> create table tgt (key varchar, values varchar);
CREATE TABLE
skeswani=> copy tgt from '/tmp/test.csv' Parser ExampleDelimitedParser(delimiter=',' , record_terminator = E'\n');
Rows Loaded
(1 row)
-- not ideal, but the \n should be replaced with #
skeswani=> select * from tgt limit 1;
key | values
--------+--------
value1 | 1#
(1 row)
skeswani=> copy tgt from '/tmp/test.csv' Parser ExampleDelimitedParser(delimiter=',' , record_terminator = '#');
Rows Loaded
(1 row)
skeswani=> select * from tgt limit 1;
key | values
----------+--------
value10 | 10
(1 row)
skeswani=> truncate table tgt;
TRUNCATE TABLE
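The effect of each terminator choice on this file can be previewed outside Vertica with standard tools. The sketch below is only an approximation of the splitting behaviour, not the parser itself; `sample` is a hypothetical helper producing rows shaped like /tmp/test.csv.

```shell
# Assumption: rows shaped like /tmp/test.csv from the post above.
sample() {
  printf 'value1,1#\nvalue2,2#\n'
}

# Splitting on '#' alone: every '#' becomes a record break, and the
# newline that followed it is left over. The leftovers show up here
# as empty lines between the records.
sample | tr '#' '\n'

# Treating '#' plus newline as the terminator: remove the newlines
# first, then split on '#'. Only the clean records remain.
sample | tr -d '\n' | tr '#' '\n'
```

The first pipeline prints an empty line after each record; the second prints just `value1,1` and `value2,2`.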
--- Now for the scheduler...
cat /tmp/test.csv | kafka-console-producer.sh --broker-list localhost:9092 --topic vertica_topic
/opt/vertica/packages/kafka/bin/vkconfig scheduler --drop --config-schema delim_test
/opt/vertica/packages/kafka/bin/vkconfig scheduler --add --config-schema delim_test --frame-duration '10 seconds'
/opt/vertica/packages/kafka/bin/vkconfig cluster --create --cluster StreamCluster1 --hosts localhost:9092 --config-schema delim_test
/opt/vertica/packages/kafka/bin/vkconfig source --create --config-schema delim_test --cluster StreamCluster1 --source vertica_topic --partitions 4
/opt/vertica/packages/kafka/bin/vkconfig target --create --config-schema delim_test --target-schema public --target-table tgt
/opt/vertica/packages/kafka/bin/vkconfig load-spec --create --config-schema delim_test --load-spec kafka --parser ExampleDelimitedParser --parser-parameters "delimiter=',',record_Terminator='#'"
/opt/vertica/packages/kafka/bin/vkconfig microbatch --create --config-schema delim_test --microbatch conversion --target-schema public --target-table tgt --rejection-schema public --rejection-table rej --load-spec kafka --add-source vertica_topic --add-source-cluster StreamCluster1
[skeswani@skeswani examples] [@:/]$ vsql -X -a -c "select * from tgt limit 1;"
select * from tgt limit 1;
key | values
-----+--------
(0 rows)
/opt/vertica/packages/kafka/bin/vkconfig launch --config-schema delim_test
[skeswani@skeswani examples] [@:/]$ vsql -X -a -c "select * from tgt limit 10;"
select * from tgt limit 10;
key | values
--------------+---------
value1 | 1
value10 | 10
value100 | 100
value1000 | 1000
value10000 | 10000
value100000 | 100000
value1000000 | 1000000
value100001 | 100001
value100002 | 100002
value100003 | 100003
(10 rows)
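The scheduler setup in the post above repeats the vkconfig path and the `--config-schema` option on every line. A minimal sketch of a wrapper script follows, using only the tools and options that appear in the post. The `run` and `vk` helpers and the `DRY_RUN` switch are hypothetical additions, and the sketch assumes vkconfig accepts its options in any order after the tool name.

```shell
# Path and schema name taken from the post above.
VKCONFIG=/opt/vertica/packages/kafka/bin/vkconfig
SCHEMA=delim_test

# Hypothetical switch: with DRY_RUN=1 (the default) commands are only
# printed, so the script can be reviewed without a Vertica install.
# Note that the printed form does not show the original quoting.
DRY_RUN=${DRY_RUN:-1}

run() {
  if [ "$DRY_RUN" = 1 ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# vk TOOL [OPTIONS...] adds the shared --config-schema option.
vk() {
  tool=$1
  shift
  run "$VKCONFIG" "$tool" --config-schema "$SCHEMA" "$@"
}

vk scheduler --add --frame-duration '10 seconds'
vk cluster --create --cluster StreamCluster1 --hosts localhost:9092
vk source --create --cluster StreamCluster1 --source vertica_topic --partitions 4
vk target --create --target-schema public --target-table tgt
vk load-spec --create --load-spec kafka --parser ExampleDelimitedParser \
  --parser-parameters "delimiter=',',record_Terminator='#'"
vk microbatch --create --microbatch conversion --target-schema public \
  --target-table tgt --rejection-schema public --rejection-table rej \
  --load-spec kafka --add-source vertica_topic --add-source-cluster StreamCluster1

# The launch step from the post would follow once the setup succeeds:
# vk launch
```

Running it with `DRY_RUN=0` executes the commands instead of printing them.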
Thanks Sumeet...This really helps
Hi Sumeet,
Can you tell me how I can skip the header from the message when using the scheduler?
FYI
If you want to use the default DELIMITED parser with the scheduler, you need to specify it as the load-spec parser, AND you need to use the filters argument to pass non-standard delimiters, escape by....
For instance:
/opt/vertica/packages/kafka/bin/vkconfig load-spec --conf kafka.props --create --load-spec spec_1 --filters "FILTER KafkaInsertDelimiters(delimiter=E'\n') DELIMITER ',' " --load-method DIRECT --parser DELIMITED