Unable to automate incremental COPY (load) of data into Vertica from an Apache Kafka stream

I have set up an integration between Vertica (8.1) and Apache Kafka (2.12-2.5.0). I can load data from Kafka manually with the COPY command, changing the start offset each time new data arrives in the Kafka cluster. I want to automate this with the vkconfig utility, so I have set up a cluster, source, target, load_spec, and microbatch.
When I launch this microbatch, the first run works fine and reads to the end of the Kafka log, but every subsequent run starts again from offset 0, i.e. from the beginning of the data already sitting in Kafka, which has already been loaded.
I would like to checkpoint the last loaded offset so that each run resumes incrementally from that checkpoint instead of doing a full load.
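
For reference, this is roughly what one of my manual loads looks like; the third field of the stream parameter ('topic|partition|start_offset') is the offset I bump by hand, and 1500 below is just a placeholder for wherever the previous load stopped:

-- manual one-off load starting at a hand-picked offset (1500 is a placeholder)
COPY "pauser"."tal_lsb_status"
SOURCE KafkaSource(stream='lsbstatus_job|0|1500', brokers='xxxxxxxxxx:9092', stop_on_eof=true)
PARSER KafkaJSONParser(flatten_maps=FALSE)
DIRECT;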

I used this command to create the microbatch:
/opt/vertica/packages/kafka/bin/vkconfig microbatch --create --microbatch stca_kmb_lsbstatus --target-schema pauser --target-table TAL_LSB_STATUS --add-source lsbstatus_job --add-source-cluster stca_kcluster --config-schema stca_kschema --load-spec stca_kloadspec --username xxxx --password xxxx
The command above works fine.
When I try to set start_offset = -3, it gives the following error:
/opt/vertica/packages/kafka/bin/vkconfig microbatch --update --microbatch stca_kmb_lsbstatus --target-schema pauser --target-table TAL_LSB_STATUS --source lsbstatus_job --cluster stca_kcluster --config-schema stca_kschema --load-spec stca_kloadspec --offset -3 --username xxxx --password xxxx
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException: Invalid Offset number detected:-3. Offset out of range. Offset should be positive integer or -2 which indicate head of the stream.
at com.vertica.solutions.kafka.model.StreamMicrobatch.validateConfiguration(StreamMicrobatch.java:336)
at com.vertica.solutions.kafka.model.StreamMicrobatch.setFromMapAndValidate(StreamMicrobatch.java:253)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:65)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.MicrobatchCLI.main(MicrobatchCLI.java:55)

Please help.



  • SruthiA (Vertica Employee, Administrator)

    Why are you passing start_offset as -3? Is there any specific reason? The offset should be a positive number or -2.
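
    For example, to resume from a specific positive offset you can update the microbatch like this (1500 is only a placeholder; substitute the offset you actually want to start from):

    /opt/vertica/packages/kafka/bin/vkconfig microbatch --update --microbatch stca_kmb_lsbstatus --target-schema pauser --target-table TAL_LSB_STATUS --source lsbstatus_job --cluster stca_kcluster --config-schema stca_kschema --load-spec stca_kloadspec --offset 1500 --username xxxx --password xxxx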

  • (Original poster)

    It is mentioned in the Vertica 9.2 documentation, under "Special Starting Offset Values", at this link: https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/KafkaIntegrationGuide/UtilityOptions/MicroBatchUtilityOptions.htm?tocpath=Integrating with Apache Kafka|vkconfig Script Options|_____7#Special

    If I specify -2, it always starts reading the Kafka log from the beginning, which is not what I need. I want vkconfig to read from Kafka incrementally.

  • SruthiA (Vertica Employee, Administrator)

    Do you use consumer groups in Kafka, then? Can you share the output of the query below?

    SELECT * FROM stream_config.stream_microbatch_history order by batch_start desc limit 15;

  • (Original poster)

    My Kafka cluster is the producer, connected to my stream log, and Vertica is the consumer. The consumer group has to be set up on the Vertica end, and there is no option to set a consumer group except the group-id in the load_spec. That is not working.
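
    In case it helps, the committed offsets on the Kafka side can be checked with Kafka's own tool (the group name below is a placeholder, since I do not know which group id the scheduler would use):

    # list consumer groups known to the broker, then describe one to see its committed offsets
    bin/kafka-consumer-groups.sh --bootstrap-server xxxxxxxxxx:9092 --list
    bin/kafka-consumer-groups.sh --bootstrap-server xxxxxxxxxx:9092 --describe --group <scheduler-group-id>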
    There is nothing in the stream_microbatch_history table. Here is the output of all my stream tables. When I run the launch, it loads perfectly the first time, but the next time it again starts from the beginning rather than loading incrementally.
    /opt/vertica/packages/kafka/bin/vkconfig launch --config-schema stca_kschema --microbatch stca_kmb_lsbstatus --perform-load --cluster stca_kcluster --source lsbstatus_job --username xxxx --password xxxx
    Starting load
    COPY "pauser"."tal_lsb_status" SOURCE KafkaSource(stream='lsbstatus_job|0|-2', brokers='xxxxxxxxxx:9092', message_max_bytes=1048576, stop_on_eof=true) PARSER KafkaJSONParser(flatten_maps=FALSE) DIRECT STREAM NAME 'vkstream_one_time_load_' NO COMMIT

    padb=> select * from stca_kschema.stream_clusters;
    id | cluster | hosts
    2000001 | stca_kcluster | xxxxxxxxxxxxxxx:9092
    (1 row)

    padb=> select * from stca_kschema.stream_load_specs;;
    id | load_spec | filters | parser | parser_parameters | load_method | message_max_bytes | uds_kv_parameters
    500001 | stca_kloadspec | | KafkaJSONParser | flatten_maps=FALSE | DIRECT | 1048576 |
    (1 row)

    padb=> select * from stca_kschema.stream_microbatches;
    id | microbatch | target | load_spec | target_columns | rejection_schema | rejection_table | enabled
    500001 | stca_kmb_lsbstatus | 500001 | 500001 | | | | t
    (1 row)

    padb=> select * from stca_kschema.stream_microbatch_history;
    source_name | source_cluster | source_partition | start_offset | end_offset | end_reason | end_reason_message | partition_bytes | partition_messages | microbatch_id | microbatch | target_schema | target_table | timeslice | batch_start | batch_end | last_batch_duration | consecutive_error_count | transaction_id | frame_start | frame_end
    (0 rows)

    padb=> select * from stca_kschema.stream_microbatch_source_map;
    microbatch | source
    500001 | 500001
    (1 row)

    padb=> select * from stca_kschema.stream_scheduler;
    version | frame_duration | resource_pool | config_refresh | new_source_policy | eof_timeout_ms | pushback_policy | pushback_max_count | auto_sync
    v8.1.1 | 00:00:10 | kafka_default_pool | 00:05 | FAIR | 1000 | LINEAR | 5 | t
    (1 row)

    padb=> select * from stca_kschema.stream_sources;
    id | source | cluster | partitions | enabled
    500001 | lsbstatus_job | 2000001 | 1 | t
    (1 row)

    padb=> select * from stca_kschema.stream_targets;
    id | target_schema | target_table
    500001 | pauser | tal_lsb_status
    (1 row)
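
    Once rows do show up in stream_microbatch_history, I assume the last loaded offsets can be checked with a query like this (columns taken from the header above):

    -- most recent batches first, showing where each partition stopped
    select source_name, source_partition, start_offset, end_offset, end_reason, batch_start
    from stca_kschema.stream_microbatch_history
    order by batch_start desc limit 10;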


  • SruthiA (Vertica Employee, Administrator)

    It looks like this issue requires a WebEx session. Please open a support case and share the logs for review.
