
Unable to automate COPY (load) of data incrementally to Vertica from an Apache Kafka stream

I have set up an integration between Vertica (8.1) and Apache Kafka (2.12-2.5.0). I can receive data from Kafka manually using the COPY command, changing the start_offset by hand every time new data arrives in the Kafka cluster. I want to automate this with the vkconfig utility, and I have set up the cluster, source, target, load_spec, and microbatch.
When I launch this microbatch, the first run is fine and reads to the end of the Kafka log, but every subsequent run starts from offset 0 again, i.e. from the beginning of the data log in Kafka, which has already been loaded.
I would like to checkpoint the last loaded offset, so that each run starts incrementally from that checkpoint instead of doing a full load.
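
For reference, the manual load looks roughly like this (a sketch of what I run by hand; the broker host and the numeric start offset are placeholders):

COPY pauser.tal_lsb_status
SOURCE KafkaSource(stream='lsbstatus_job|0|1234',  -- '<topic>|<partition>|<start offset>'; 1234 is a placeholder I bump after each load
                   brokers='mybroker:9092',        -- placeholder broker host
                   stop_on_eof=true)
PARSER KafkaJSONParser(flatten_maps=FALSE)
DIRECT;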

I used this command to create the microbatch:
/opt/vertica/packages/kafka/bin/vkconfig microbatch --create --microbatch stca_kmb_lsbstatus --target-schema pauser --target-table TAL_LSB_STATUS --add-source lsbstatus_job --add-source-cluster stca_kcluster --config-schema stca_kschema --load-spec stca_kloadspec --username xxxx --password xxxx
The above command works well.
When I try to set start_offset = -3, it gives the following error:
/opt/vertica/packages/kafka/bin/vkconfig microbatch --update --microbatch stca_kmb_lsbstatus --target-schema pauser --target-table TAL_LSB_STATUS --source lsbstatus_job --cluster stca_kcluster --config-schema stca_kschema --load-spec stca_kloadspec --offset -3 --username xxxx --password xxxx
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException: Invalid Offset number detected:-3. Offset out of range. Offset should be positive integer or -2 which indicate head of the stream.
at com.vertica.solutions.kafka.model.StreamMicrobatch.validateConfiguration(StreamMicrobatch.java:336)
at com.vertica.solutions.kafka.model.StreamMicrobatch.setFromMapAndValidate(StreamMicrobatch.java:253)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:65)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.MicrobatchCLI.main(MicrobatchCLI.java:55)
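
For comparison, the same update is accepted when I pass -2 (which the error message describes as the head of the stream), for example:

/opt/vertica/packages/kafka/bin/vkconfig microbatch --update --microbatch stca_kmb_lsbstatus --target-schema pauser --target-table TAL_LSB_STATUS --source lsbstatus_job --cluster stca_kcluster --config-schema stca_kschema --load-spec stca_kloadspec --offset -2 --username xxxx --password xxxx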

Please help


Answers

  • SruthiA Administrator

    Why are you passing start_offset as -3? Any specific reason? The offset should be a positive integer or -2.

  • It is mentioned in the Vertica 9.2 documentation, under "Special Starting Offset Values", at this hyperlink: https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/KafkaIntegrationGuide/UtilityOptions/MicroBatchUtilityOptions.htm?tocpath=Integrating with Apache Kafka|vkconfig Script Options|_____7#Special

    Actually, if I specify -2, it always starts reading the Kafka log from the beginning, which is not what I need. I want vkconfig to read from Kafka incrementally.

  • SruthiA Administrator

    Do you use consumer groups in Kafka, then? Can you share the output of the query below?

    SELECT * FROM stream_config.stream_microbatch_history order by batch_start desc limit 15;
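
    With the configuration schema used in this thread, that query would be (assuming your scheduler schema is stca_kschema, as in your commands):

    SELECT * FROM stca_kschema.stream_microbatch_history ORDER BY batch_start DESC LIMIT 15;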

  • My Kafka cluster is a producer connected to my stream log, and Vertica is the consumer. The consumer group would have to be set up on the Vertica end, and there is no option to set a consumer group except the group-id in the load_spec; that is not working.
    There is nothing in the stream_microbatch_history table. Here is the output of all my stream tables. When I run the launch, it loads perfectly the first time, but the next run again starts from the beginning rather than incrementally:
    /opt/vertica/packages/kafka/bin/vkconfig launch --config-schema stca_kschema --microbatch stca_kmb_lsbstatus --perform-load --cluster stca_kcluster --source lsbstatus_job --username xxxx --password xxxx
    Starting load
    COPY "pauser"."tal_lsb_status" SOURCE KafkaSource(stream='lsbstatus_job|0|-2', brokers='xxxxxxxxxx:9092', message_max_bytes=1048576, stop_on_eof=true) PARSER KafkaJSONParser(flatten_maps=FALSE) DIRECT STREAM NAME 'vkstream_one_time_load_' NO COMMIT

    OUTPUT OF STREAM TABLES:
    padb=> select * from stca_kschema.stream_clusters;
       id    |    cluster    |        hosts
    ---------+---------------+----------------------
     2000001 | stca_kcluster | xxxxxxxxxxxxxxx:9092
    (1 row)

    padb=> select * from stca_kschema.stream_load_specs;
       id   |   load_spec    | filters |     parser      | parser_parameters  | load_method | message_max_bytes | uds_kv_parameters
    --------+----------------+---------+-----------------+--------------------+-------------+-------------------+-------------------
     500001 | stca_kloadspec |         | KafkaJSONParser | flatten_maps=FALSE | DIRECT      | 1048576           |
    (1 row)

    padb=> select * from stca_kschema.stream_microbatches;
       id   |     microbatch     | target | load_spec | target_columns | rejection_schema | rejection_table | enabled
    --------+--------------------+--------+-----------+----------------+------------------+-----------------+---------
     500001 | stca_kmb_lsbstatus | 500001 | 500001    |                |                  |                 | t
    (1 row)

    padb=> select * from stca_kschema.stream_microbatch_history;
    source_name | source_cluster | source_partition | start_offset | end_offset | end_reason | end_reason_message | partition_bytes | partition_messages | microbatch_id | microbatch | target_schema | target_table | timeslice | batch_start | batch_end | last_batch_duration | consecutive_error_count | transaction_id | frame_start | frame_end
    -------------+----------------+------------------+--------------+------------+------------+--------------------+-----------------+--------------------+---------------+------------+---------------+--------------+-----------+-------------+-----------+---------------------+-------------------------+----------------+-------------+-----------
    (0 rows)

    padb=> select * from stca_kschema.stream_microbatch_source_map;
     microbatch | source
    ------------+--------
         500001 | 500001
    (1 row)

    padb=> select * from stca_kschema.stream_scheduler;
     version | frame_duration |   resource_pool    | config_refresh | new_source_policy | eof_timeout_ms | pushback_policy | pushback_max_count | auto_sync
    ---------+----------------+--------------------+----------------+-------------------+----------------+-----------------+--------------------+-----------
     v8.1.1  | 00:00:10       | kafka_default_pool | 00:05          | FAIR              | 1000           | LINEAR          | 5                  | t
    (1 row)

    padb=> select * from stca_kschema.stream_sources;
       id   |    source     | cluster | partitions | enabled
    --------+---------------+---------+------------+---------
     500001 | lsbstatus_job | 2000001 |          1 | t
    (1 row)

    padb=> select * from stca_kschema.stream_targets;
       id   | target_schema |  target_table
    --------+---------------+----------------
     500001 | pauser        | tal_lsb_status
    (1 row)


  • SruthiA Administrator

    It looks like this issue requires a WebEx session. Please open a support case and share the logs for review.
