Create a Kafka source for a Vertica v8.1 scheduler
Hello,
I'm on a Linux system using Vertica Analytic Database v8.1.0-2. I successfully completed the following steps:
1- Creating a scheduler by launching the following command: /opt/vertica/packages/kafka/bin/vkconfig scheduler --create --config-schema kafkaScheduler --operator dbadmin --frame-duration '00:00:30'
I checked in Vertica, and the kafkaScheduler schema was indeed created with a set of tables.
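I verified this with a catalog query along these lines (ILIKE, since identifier case may vary):
SELECT table_name FROM v_catalog.tables WHERE table_schema ILIKE 'kafkaScheduler';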
2- Creating a cluster by launching the following command: /opt/vertica/packages/kafka/bin/vkconfig cluster --create --config-schema kafkaScheduler --cluster kafka --hosts xxx:9092,xxx:9092,xxx:9092
Now, I’m trying to create a source with the following command:
/opt/vertica/packages/kafka/bin/vkconfig source --create --config-schema CloudKafkaScheduler --cluster kafka --source myKafkaTopicName --partitions 10
But I’m getting the following Vertica error:
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException: ERROR: [[Vertica]VJDBC ERROR: Error calling processPartition() in User Function KafkaListTopics at [src/KafkaUtil.cpp:178], error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:171)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:127)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:89)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:36)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:48)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:14)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:56)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:20)
Caused by: java.sql.SQLNonTransientException: [Vertica]VJDBC ERROR: Error calling processPartition() in User Function KafkaListTopics at [src/KafkaUtil.cpp:178], error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
at com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
at com.vertica.dataengine.VResultSet.initialize(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.executeWithParams(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.executeQuery(Unknown Source)
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:150)
... 8 more
Caused by: com.vertica.support.exceptions.NonTransientException: [Vertica]VJDBC ERROR: Error calling processPartition() in User Function KafkaListTopics at [src/KafkaUtil.cpp:178], error code: 0, message: Error getting metadata: [Local: Broker transport failure]
... 17 more
I checked the topic name specified for the source property and the topic's number of partitions, and they match exactly what I have in Kafka.
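Since the error comes from the KafkaListTopics UDx, I believe the same check can be reproduced directly from vsql with the same broker string:
SELECT KafkaListTopics(USING PARAMETERS brokers='xxx:9092,xxx:9092,xxx:9092') OVER ();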
Any help would be appreciated.
Comments
Hi Green89,
Could you please check whether your ZooKeeper and Kafka servers are up and running? I have seen the same error when the Kafka server is not running.
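For example, listing topics from a broker host should succeed (Kafka install path and ZooKeeper address are illustrative):
bin/kafka-topics.sh --list --zookeeper zk-host:2181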
Thanks,
Ravi
Hi Ravi,
The Kafka and ZooKeeper clusters are up and running.
Thanks,
From Tom:
The scheduler is doing a check to see that Vertica can connect to the brokers you've provided, and that check is failing. If you got your broker string wrong, or there is a firewall between Vertica and Kafka, then this behavior is to be expected.
So I would first double-check your broker string. Try doing a test consume with the Kafka console consumer, using the same broker string, to verify connectivity.
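Something like this, run from one of the Vertica nodes with the exact broker string you gave vkconfig (the Kafka install path is illustrative):
/path/to/kafka/bin/kafka-console-consumer.sh --bootstrap-server xxx:9092,xxx:9092,xxx:9092 --topic myKafkaTopicName --from-beginning --max-messages 1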
You can bypass this check by adding "--validation-type SKIP" to your CLI command, but we do the check for a reason: if you don't have connectivity, things will just fail further along in the process.
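That is, the same source command with the check skipped:
/opt/vertica/packages/kafka/bin/vkconfig source --create --config-schema CloudKafkaScheduler --cluster kafka --source myKafkaTopicName --partitions 10 --validation-type SKIP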
Hello,
There is actually no firewall between the Kafka and Vertica clusters, and the broker string has been double-checked as well.
I did a produce/consume test on my topic in the Kafka cluster, and it works fine.
By bypassing the connectivity check, the source was created successfully.
Thanks,
Hello,
I pursued the next steps. After creating:
A flex table kafka_events by executing the following query: CREATE FLEX TABLE CloudKafkaScheduler.kafka_events();
A target:
/opt/vertica/packages/kafka/bin/vkconfig target --create --config-schema CloudKafkaScheduler --target-schema CloudKafkaScheduler --target-table kafka_events
A load-spec:
/opt/vertica/packages/kafka/bin/vkconfig load-spec --create --load-spec loadSpecKafka --config-schema CloudKafkaScheduler --load-method direct
A microbatch:
/opt/vertica/packages/kafka/bin/vkconfig microbatch --create --microbatch mbatchkafkaload --target-schema CloudKafkaScheduler --target-table kafka_events --load-spec loadspeckafka --add-source atlas_prod_vertica_global_fact_stream_v2 --add-source-cluster cloudKafka
And launched a scheduler instance:
/opt/vertica/packages/kafka/bin/vkconfig launch --instance-name atlasVerticaStaging --config-schema CloudKafkaScheduler
I tested whether the loading process is working correctly by producing new messages to my Kafka topic; however, I couldn't see any new rows added to the target flex table in Vertica.
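I looked for rows with queries along these lines (MapToString renders the flex table's __raw__ column as readable text):
SELECT COUNT(*) FROM CloudKafkaScheduler.kafka_events;
SELECT MapToString(__raw__) FROM CloudKafkaScheduler.kafka_events LIMIT 5;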
I checked the log files in /opt/vertica/log/ and didn't find any errors.
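The scheduler also keeps its own bookkeeping in the config schema; assuming the stream_microbatch_history table that vkconfig creates, a query like this should show whether microbatches are executing and why each frame ended:
SELECT microbatch, source_name, start_offset, end_offset, end_reason FROM CloudKafkaScheduler.stream_microbatch_history;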
Any help to understand the issue would be appreciated.
Thanks,
By adding the Kafka hosts to the /etc/hosts file on the Vertica cluster nodes, the connectivity issue has been resolved.
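For anyone hitting the same problem: the brokers return their advertised hostnames in the metadata response, so those names must resolve from every Vertica node. One /etc/hosts entry per broker is enough (address and hostname are illustrative):
10.1.2.3    kafka-broker-01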