Delimited file to parse with Kafka scheduler

Hi Team,

I need to parse a delimited file into Vertica with the Kafka scheduler, but I am getting an error. I am using the command below to create the load-spec for the delimited file.

Sample data

ABC123,1234#
XYZ12,1212#
PPPPP,121#
RRRR,7676#

Syntax I am using

/opt/vertica/packages/kafka/bin/vkconfig load-spec --create --config-schema vert123 --load-spec kafka_delim_test --parser fdelimitedpairparser (delimiter=',',recordTerminator = '#')

But it throws a syntax error...

Comments

  • Jim_Knicely Administrator
    edited November 2017

    See if the --parser-parameters option works for you:

    /opt/vertica/packages/kafka/bin/vkconfig load-spec --create --config-schema vert123 --load-spec kafka_delim_test --parser fdelimitedpairparser --parser-parameters delimiter=',',recordTerminator='#';

    See:
    https://my.vertica.com/docs/9.0.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/UtilityOptions/LoadSpecUtilityOptions.htm

  • Jim_Knicely Administrator
    edited November 2017

    @opper20 - There is an open JIRA ticket with a request to add the standard Vertica parsers to Kafka. Currently they are not available, so you need to use the --filters option.

    See:

    https://my.vertica.com/docs/9.0.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/UtilityOptions/LoadSpecUtilityOptions.htm

    https://my.vertica.com/docs/9.0.x/HTML/index.htm#Authoring/KafkaIntegrationGuide/ParsingCustomFormats.htm

  • Hi Jim,

    Thanks for the reply. I tried running the command based on the suggestions in the Vertica documentation, but I am not getting the expected result.

    /opt/vertica/packages/kafka/bin/vkconfig load-spec --create --config-schema Vert123 --load-spec kafka_delim_test --filters KafkaInsertDelimiters(delimiter = ',',recordTerminator = '#') --username test1234 --password '####'
    -bash: syntax error near unexpected token `('

    Can you tell me if I am passing those values incorrectly?

    Thanks
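
The bash error above comes from the shell rather than from vkconfig: unquoted parentheses are shell syntax, so the command is rejected before vkconfig ever runs. As a minimal sketch of the quoting only, the snippet below uses a hypothetical helper, print_args (not part of Vertica), to show that double-quoting the whole filter expression delivers it as a single argument with the inner single quotes intact. Whether KafkaInsertDelimiters accepts these particular parameters is a separate question that this sketch does not address.

```shell
# print_args is a hypothetical helper (not a Vertica tool): it prints each
# argument on its own line, in brackets, exactly as a program would receive it.
print_args() {
  for arg in "$@"; do
    printf '[%s]\n' "$arg"
  done
}

# Double quotes keep the shell from interpreting the parentheses and preserve
# the inner single quotes, so the whole filter expression arrives as one argument.
print_args --filters "KafkaInsertDelimiters(delimiter = ',',recordTerminator = '#')"
```

The same double-quoting should apply to the value passed to --filters or --parser-parameters on the real vkconfig command line.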

  • Is your data a key-value pair?

    [email protected]:~$ cat data.txt
    ABC123,33#XYZ234,12#

    And is this how you want it to look, i.e. each pair is a record? I am not sure fdelimitedpairparser is correct for the above data.

    skeswani=> select * from tgt;
    key | value
    --------+------
    ABC123 | 33
    XYZ234 | 12
    (2 rows)

    What does your raw data look like? Can you send me a sample, using kafka-console-consumer.sh?

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic vertica_topic --from-beginning
    ABC123,33#XYZ234,12#

  • Based on the data provided, the row delimiter is not '#'; it's E'#\n'.
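
To see why the terminator matters, here is a rough sketch using only standard tools (printf and tr) on two of the sample rows from the question; the function names are made up for illustration. Splitting on '#' alone leaves the newline behind, which shows up here as an empty line after each record; in a load, that newline would presumably end up at the start of the next record's first field. Treating '#' followed by a newline as the terminator gives clean records.

```shell
# Two of the sample rows from the question, each ending in '#' plus a newline.
sample() { printf 'ABC123,1234#\nXYZ12,1212#\n'; }

# Terminator '#': turn each '#' into a record break and keep the original
# newlines. The leftover newline appears as an empty line after each record.
split_on_hash() { tr '#' '\n'; }

# Terminator '#' plus newline: drop the original newlines first, then break
# records on '#', so nothing is left over between records.
split_on_hash_newline() { tr -d '\n' | tr '#' '\n'; }

echo "terminator '#':"
sample | split_on_hash
echo "terminator '#' plus newline:"
sample | split_on_hash_newline
```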

  • [[email protected] examples] [@:/]$ head /tmp/test.csv
    value1,1#
    value2,2#
    value3,3#
    value4,4#
    value5,5#
    value6,6#
    value7,7#
    value8,8#
    value9,9#
    value10,10#

    [[email protected] examples] [@:/]$ pwd
    /opt/vertica/sdk/examples
    [[email protected] examples] [@:/]$ sudo mkdir build
    [[email protected] examples] [@:/]$ sudo chmod a+w build/
    [[email protected] examples] [@:/]$ make
    ...
    ...

    [[email protected] examples] [@:/]$ vsql

    Welcome to vsql, the Vertica Analytic Database interactive terminal.

    Type: \h or \? for help with vsql commands
    \g or terminate with semicolon to execute query
    \q to quit

    skeswani=> CREATE LIBRARY ExampleDelimitedParserLib AS '/opt/vertica/sdk/examples/build/ExampleDelimitedParser.so';
    CREATE LIBRARY
    skeswani=> CREATE PARSER ExampleDelimitedParser AS LANGUAGE 'C++' NAME 'DelimitedParserExampleFactory' LIBRARY ExampleDelimitedParserLib;
    CREATE PARSER FUNCTION
    skeswani=> drop table tgt;
    DROP TABLE
    skeswani=> create table tgt (key varchar, values varchar);
    CREATE TABLE
    skeswani=> copy tgt from '/tmp/test.csv' Parser ExampleDelimitedParser(delimiter=',' , record_terminator = E'\n');

     Rows Loaded
    -------------
         1000000
    (1 row)
    -- not ideal; the \n should be replaced with #
    skeswani=> select * from tgt limit 1;
    key | values
    --------+--------
    value1 | 1#
    (1 row)

    skeswani=> copy tgt from '/tmp/test.csv' Parser ExampleDelimitedParser(delimiter=',' , record_terminator = '#');

     Rows Loaded
    -------------
         1000000
    (1 row)

    skeswani=> select * from tgt limit 1;
    key | values
    ----------+--------

    value10 | 10
    (1 row)

    skeswani=> truncate table tgt;
    TRUNCATE TABLE

    --- Now for the scheduler...

    cat /tmp/test.csv | kafka-console-producer.sh --broker-list localhost:9092 --topic vertica_topic

    /opt/vertica/packages/kafka/bin/vkconfig scheduler --drop --config-schema delim_test
    /opt/vertica/packages/kafka/bin/vkconfig scheduler --add --config-schema delim_test --frame-duration '10 seconds'
    /opt/vertica/packages/kafka/bin/vkconfig cluster --create --cluster StreamCluster1 --hosts localhost:9092 --config-schema delim_test
    /opt/vertica/packages/kafka/bin/vkconfig source --create --config-schema delim_test --cluster StreamCluster1 --source vertica_topic --partitions 4
    /opt/vertica/packages/kafka/bin/vkconfig target --create --config-schema delim_test --target-schema public --target-table tgt

    /opt/vertica/packages/kafka/bin/vkconfig load-spec --create --config-schema delim_test --load-spec kafka --parser ExampleDelimitedParser --parser-parameters "delimiter=',',record_Terminator='#'"

    /opt/vertica/packages/kafka/bin/vkconfig microbatch --create --config-schema delim_test --microbatch conversion --target-schema public --target-table tgt --rejection-schema public --rejection-table rej --load-spec kafka --add-source vertica_topic --add-source-cluster StreamCluster1

    [[email protected] examples] [@:/]$ vsql -X -a -c "select * from tgt limit 1;"
    select * from tgt limit 1;
    key | values
    -----+--------
    (0 rows)

    /opt/vertica/packages/kafka/bin/vkconfig launch --config-schema delim_test

    [[email protected] examples] [@:/]$ vsql -X -a -c "select * from tgt limit 10;"
    select * from tgt limit 10;
    key | values
    --------------+---------
    value1 | 1
    value10 | 10
    value100 | 100
    value1000 | 1000
    value10000 | 10000
    value100000 | 100000
    value1000000 | 1000000
    value100001 | 100001
    value100002 | 100002
    value100003 | 100003
    (10 rows)

  • Thanks Sumeet...This really helps

  • Hi Sumeet,

    If I want to skip the header in the message, how can I do that using the scheduler?

  • SergeB Employee

    FYI
    If you want to use the default DELIMITED parser with the scheduler, you need to specify it as the load-spec parser, and you need to use the --filters argument to pass non-standard delimiters, escape by....

    For instance:
    /opt/vertica/packages/kafka/bin/vkconfig load-spec --conf kafka.props --create --load-spec spec_1 --filters "FILTER KafkaInsertDelimiters(delimiter=E'\n') DELIMITER ',' " --load-method DIRECT --parser DELIMITED
