The Vertica Forum recently got a makeover! Let us know what you think by filling out this short, anonymous survey.
Please take this survey to help us learn more about how you use third party tools. Your input is greatly appreciated!

Unable to automate copy(load) data incrementally to vertica from Apache Kafka stream

I have setup a vertica(8.1) and apache kafka( 2.12-2.5.0) integration. I am able to receive data from kafka manually using the copy command and change the start_offset every time a new data arrives in kafka cluster. I want to automate it using vkconfig utility where i have setup cluster, source, target, load_spec, microbatch.
When i launch this microbatch, first run is ok till end of kafka log and then next run it always start from 0 offset, that means from begin of data log that exist in kafka which has already been loaded.
I would like to checkpoint the last loaded offset, so that every time it start from that checkpoint incremently instead of full load.

I used this command for microbatch :
/opt/vertica/packages/kafka/bin/vkconfig microbatch --create --microbatch stca_kmb_lsbstatus --target-schema pauser --target-table TAL_LSB_STATUS --add-source lsbstatus_job --add-source-cluster stca_kcluster --config-schema stca_kschema --load-spec stca_kloadspec --username xxxx --password xxxx
This above commands works well.
When i try to set start_offset = -3, it gives following error :
/opt/vertica/packages/kafka/bin/vkconfig microbatch --update --microbatch stca_kmb_lsbstatus --target-schema pauser --target-table TAL_LSB_STATUS --source lsbstatus_job --cluster stca_kcluster --config-schema stca_kschema --load-spec stca_kloadspec --offset -3 --username xxxx --password xxxx
Exception in thread "main" Invalid Offset number detected:-3. Offset out of range. Offset should be positive integer or -2 which indicate head of the stream.

Please help



  • SruthiASruthiA Vertica Employee Employee

    Why are you passing start_offset as -3? Any specific reason? offset should be a positive number of -2.

  • It is mentioned in the vertica documentation of 9.2 version at following location "Special Starting Offset Values" at this hyperlink : with Apache Kafka|vkconfig Script Options|_____7#Special

    Actually if i specify -2, it always start to read the kafka log from begining which is not what i need. I want that vkconfig should read the kafka incrementally.

  • SruthiASruthiA Vertica Employee Employee

    Do you use consumer groups in kafka then? Can you share me the output of below query

    SELECT * FROM stream_config.stream_microbatch_history order by batch_start desc limit 15;

  • Since my kafka cluster is a producer that is connected to my stream log and vertica is a consumer. The consumer group has to be setup at vertica end and there is no option to set consumer group except the group-id in the load_spec. That is not working.
    There is nothing in the stream_microbatch_history table. Here is output of all my stream tables. When i run the launch, it loads perfectly for the first time but next time again it starts from begin rather than incremental
    /opt/vertica/packages/kafka/bin/vkconfig launch --config-schema stca_kschema --microbatch stca_kmb_lsbstatus --perform-load --cluster stca_kcluster --source lsbstatus_job --username xxxx --password xxxx
    Starting load
    COPY "pauser"."tal_lsb_status" SOURCE KafkaSource(stream='lsbstatus_job|0|-2', brokers='xxxxxxxxxx:9092', message_max_bytes=1048576, stop_on_eof=true) PARSER KafkaJSONParser(flatten_maps=FALSE) DIRECT STREAM NAME 'vkstream_one_time_load_' NO COMMIT

    padb=> select * from stca_kschema.stream_clusters;
    id | cluster | hosts
    2000001 | stca_kcluster | xxxxxxxxxxxxxxx:9092
    (1 row)

    padb=> select * from stca_kschema.stream_load_specs;;
    id | load_spec | filters | parser | parser_parameters | load_method | message_max_bytes | uds_kv_parameters
    500001 | stca_kloadspec | | KafkaJSONParser | flatten_maps=FALSE | DIRECT | 1048576 |
    (1 row)

    padb=> select * from stca_kschema.stream_microbatches;
    id | microbatch | target | load_spec | target_columns | rejection_schema | rejection_table | enabled
    500001 | stca_kmb_lsbstatus | 500001 | 500001 | | | | t
    (1 row)

    padb=> select * from stca_kschema.stream_microbatch_history;
    source_name | source_cluster | source_partition | start_offset | end_offset | end_reason | end_reason_message | partition_bytes | partition_messages | microbatch_id | microbatch | target_schema | target_table | timeslice | batch_start | batch_end | last_batch_duration | consecutive_error_count | transaction_id | frame_start | frame_end
    (0 rows)

    padb=> select * from stca_kschema.stream_microbatch_source_map;
    microbatch | source
    500001 | 500001
    (1 row)

    padb=> select * from stca_kschema.stream_scheduler;
    version | frame_duration | resource_pool | config_refresh | new_source_policy | eof_timeout_ms | pushback_policy | pushback_max_count | auto_sync
    v8.1.1 | 00:00:10 | kafka_default_pool | 00:05 | FAIR | 1000 | LINEAR | 5 | t
    (1 row)

    padb=> select * from stca_kschema.stream_sources;
    id | source | cluster | partitions | enabled
    500001 | lsbstatus_job | 2000001 | 1 | t
    (1 row)

    padb=> select * from stca_kschema.stream_targets;
    id | target_schema | target_table
    500001 | pauser | tal_lsb_status
    (1 row)


  • SruthiASruthiA Vertica Employee Employee

    it looks like this issue requires webEx. please open a support case and share logs for review.

Leave a Comment

BoldItalicStrikethroughOrdered listUnordered list
Align leftAlign centerAlign rightToggle HTML viewToggle full pageToggle lights
Drop image/file