How to set KafkaSource end condition 'Load data until no new data arrives within a timeout period'?
joergschaber
Vertica Customer ✭
Hi,
I want to manually COPY data into Vertica using a KafkaSource. The documentation says I can define the end condition so that I can 'Load data until no new data arrives within a timeout period'.
How do I specify this, and how do I set the timeout period?
A related question: when I COPY data into Vertica with a KafkaSource using a defined duration of time, can I retrieve the Kafka message offset at which the COPY statement stopped, so that I can start exactly there in the next round?
Best,
Jörg
Answers
If you use the scheduler, you can query the stream_microbatch_history table to see which offsets were loaded recently for each partition.
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/KafkaIntegrationGuide/KafkaTables/stream_microbatch_history.htm
If you run the COPY manually, you can run the query below once it completes to see the offsets it loaded:
select KafkaOffsets() over();
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/KafkaIntegrationGuide/KafkaFunctions/KafkaOffsets.htm
You can use the duration parameter of KafkaSource to set the maximum ingestion time; see https://www.vertica.com/docs/12.0.x/HTML/Content/Authoring/KafkaIntegrationGuide/KafkaFunctions/KafkaSource.htm?#4
As SruthiA mentioned, KafkaOffsets() gives you the last offsets consumed by the most recent COPY KafkaSource command.
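To make the two answers above concrete, here is a minimal sketch of a manual load with a fixed duration followed by the offsets query. The table, topic, and broker names are hypothetical placeholders, and the parser choice assumes JSON messages; it is not taken from the posters' setup. It also assumes the offsets query is run in the same session as the COPY.

```sql
-- Hypothetical table, topic, and broker names; adjust for your cluster.
COPY public.from_kafka
    SOURCE KafkaSource(stream='iot_data|0|-2',            -- topic|partition|start offset (-2 = start of stream)
                       brokers='kafka01.example.com:9092',
                       duration=INTERVAL '10 seconds')    -- load for at most 10 seconds
    PARSER KafkaJSONParser();                             -- assumes JSON-encoded messages

-- Run right after the COPY finishes (assumed: same session) to see
-- the start and end offsets per topic and partition.
SELECT KafkaOffsets() OVER();
```

For the next round, the start offset in the stream parameter (the third field of 'topic|partition|offset') can be set from the end offset reported here. Whether the reported end offset is the last message loaded or the next one to load should be checked against the KafkaOffsets documentation for your version before relying on it.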
I should have asked these as two separate questions.
ad 1: Thanks for the hint about KafkaOffsets(). How do I set the permission to use KafkaOffsets?
ad 2: Setting the max ingestion time is not 'Load data until no new data arrives within a timeout period', but rather 'Copy as much data as possible for a set duration of time',
see https://www.vertica.com/docs/11.1.x/HTML/Content/Authoring/KafkaIntegrationGuide/UsingCOPYwithKafka.htm?tocpath=Apache Kafka Integration|Consuming Data From Kafka|_____1#6
The dbadmin user should be able to run KafkaOffsets(). Is it failing for you?
For 2, I think setting eof-timeout-ms may help:
https://www.vertica.com/docs/11.1.x/HTML/Content/Authoring/KafkaIntegrationGuide/UtilityOptions/SchedulerUtilityOptions.htm?tocpath=Apache Kafka Integration|vkconfig Script Options|_____2
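Note that eof-timeout-ms is a scheduler (vkconfig) option. For a manual COPY, a possible equivalent is sketched below. It assumes that your Vertica version's KafkaSource accepts the stop_on_eof and eof_timeout parameters; please confirm both on the KafkaSource documentation page before use. The table, topic, and broker names are hypothetical.

```sql
-- Sketch only: stop_on_eof and eof_timeout are assumed to be supported
-- by KafkaSource in your Vertica version.
COPY public.from_kafka
    SOURCE KafkaSource(stream='iot_data|0|-2',             -- topic|partition|start offset
                       brokers='kafka01.example.com:9092',
                       stop_on_eof=TRUE,                   -- end the load at the end of the stream
                       eof_timeout=INTERVAL '5 seconds')   -- end if no message arrives within 5 seconds
    PARSER KafkaJSONParser();                              -- assumes JSON-encoded messages
```

Unlike the duration parameter, this end condition depends on how long the stream stays quiet rather than on the total run time of the load.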