Create a Kafka source for a Vertica v8.1 scheduler


I'm on a Linux system, using Vertica Analytic Database v8.1.0-2. I successfully completed the following steps:

1- Created a scheduler with the following command: /opt/vertica/packages/kafka/bin/vkconfig scheduler --create --config-schema kafkaScheduler --operator dbadmin --frame-duration '00:00:30'
I checked in Vertica, and the kafkaScheduler schema was created along with a set of tables.
2- Created a cluster with the following command: /opt/vertica/packages/kafka/bin/vkconfig cluster --create --config-schema kafkaScheduler --cluster kafka --hosts xxx:9092,xxx:9092,xxx:9092

Now, I’m trying to create a source with the following command:
/opt/vertica/packages/kafka/bin/vkconfig source --create --config-schema CloudKafkaScheduler --cluster kafka --source myKafkaTopicName --partitions 10

But I’m getting the following Vertica error:

Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException: ERROR: [[Vertica]VJDBC ERROR: Error calling processPartition() in User Function KafkaListTopics at [src/KafkaUtil.cpp:178], error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:171)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:127)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:89)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:36)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:48)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:14)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:56)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:20)
Caused by: java.sql.SQLNonTransientException: [Vertica]VJDBC ERROR: Error calling processPartition() in User Function KafkaListTopics at [src/KafkaUtil.cpp:178], error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
at com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
at com.vertica.dataengine.VResultSet.initialize(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.executeWithParams(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.executeQuery(Unknown Source)
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:150)
... 8 more
Caused by: com.vertica.support.exceptions.NonTransientException: [Vertica]VJDBC ERROR: Error calling processPartition() in User Function KafkaListTopics at [src/KafkaUtil.cpp:178], error code: 0, message: Error getting metadata: [Local: Broker transport failure]
... 17 more

I checked the topic name given for the source and the number of partitions, and both match exactly what I have in Kafka.

Any help would be appreciated.


  • Hi Green89,

    Could you please check whether your ZooKeeper and Kafka servers are up and running? I see the same error when the Kafka server is not running.


  • Hi Ravi,

    The Kafka and ZooKeeper clusters are both up and running.


  • YangXu (Employee)

    From Tom:

    The scheduler checks that Vertica can connect to the brokers you’ve provided, and that check is failing. If your broker string is wrong or there is a firewall between Vertica and Kafka, this behavior is expected.

    So I would first double check your broker string. Try doing a test consume with the kafka console consumer using the same broker string to verify the connectivity.

    You can bypass this check by adding “--validation-type SKIP” to your CLI command, but we do the check for a reason. If you don’t have connectivity then things will just fail further along in the process.
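
As a rough complement to the console-consumer test suggested above, the following Python sketch checks plain TCP reachability of each broker in a broker string. It is only an illustration: the helper names `parse_brokers` and `can_connect` are made up for this example, and it assumes every entry has the form `host:port`. It would need to run on a Vertica node, since that is where the scheduler's check originates. A successful TCP connection does not prove that Kafka metadata requests will succeed, because a Kafka client also has to resolve and reach the hostnames the brokers advertise.

```python
import socket


def parse_brokers(broker_string):
    """Split 'host1:9092,host2:9092' into [(host, port), ...].

    Assumes every entry has the form host:port (hypothetical helper).
    """
    brokers = []
    for entry in broker_string.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Split on the last colon so the port is always the final field.
        host, _, port = entry.rpartition(":")
        brokers.append((host, int(port)))
    return brokers


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False
```

Calling `can_connect(host, port)` for each pair returned by `parse_brokers(...)`, with the same string that was passed to `--hosts`, shows which brokers the Vertica node cannot reach at the network level.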

  • edited May 2017


    There is actually no firewall between the Kafka and Vertica clusters, and I have double-checked the broker string.

    I ran a produce/consume test on my topic from within the Kafka cluster, and it works fine.

    After bypassing the connectivity check, the source was created successfully.


  • edited May 2017


    I carried on with the next steps. I created:

    • A flex table kafka_events by executing the following query: CREATE FLEX TABLE CloudKafkaScheduler.kafka_events();

    • A target:
      /opt/vertica/packages/kafka/bin/vkconfig target --create --config-schema CloudKafkaScheduler --target-schema CloudKafkaScheduler --target-table kafka_events

    • A load-spec:
      /opt/vertica/packages/kafka/bin/vkconfig load-spec --create --load-spec loadSpecKafka --config-schema CloudKafkaScheduler --load-method direct

    • A microbatch:
      /opt/vertica/packages/kafka/bin/vkconfig microbatch --create --microbatch mbatchkafkaload --target-schema CloudKafkaScheduler --target-table kafka_events --load-spec loadspeckafka --add-source atlas_prod_vertica_global_fact_stream_v2 --add-source-cluster cloudKafka

    • And launched a scheduler instance:
      /opt/vertica/packages/kafka/bin/vkconfig launch --instance-name atlasVerticaStaging --config-schema CloudKafkaScheduler
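
The commands in this thread refer to some objects under slightly different names (for example `kafka` and `cloudKafka` for the cluster, and `loadSpecKafka` and `loadspeckafka` for the load spec). Whether that matters here is not established in the thread, but one way to rule it out is to generate the command lines from a single set of names. The sketch below does that in Python. The `vkconfig_command` helper is hypothetical, and it only formats strings using flags that already appear in the commands above; it does not run anything or validate the options.

```python
import shlex

VKCONFIG = "/opt/vertica/packages/kafka/bin/vkconfig"


def vkconfig_command(tool, **options):
    """Build one vkconfig command line as a string (hypothetical helper).

    Keyword names use underscores in place of hyphens; a value of True
    produces a bare flag such as --create.
    """
    args = [VKCONFIG, tool]
    for name, value in options.items():
        args.append("--" + name.replace("_", "-"))
        if value is not True:
            args.append(str(value))
    # Quote each argument so the result can be pasted into a shell.
    return " ".join(shlex.quote(arg) for arg in args)


# One place to define the names shared by several commands.
schema = "CloudKafkaScheduler"
cluster = "kafka"
topic = "myKafkaTopicName"

print(vkconfig_command("source", create=True, config_schema=schema,
                       cluster=cluster, source=topic, partitions=10))
```

The same `schema`, `cluster`, and `topic` variables can then be passed when building the `microbatch` command, so that `--add-source` and `--add-source-cluster` match the names used when the source and cluster were created.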

    I tested whether loading works by producing new messages to my Kafka topic, but no new rows appeared in the target flex table in Vertica.

    I checked the log files in /opt/vertica/log/ and didn't find any errors.

    Any help understanding the issue would be appreciated.


  • Correction to my previous post: I checked the log files in /opt/vertica/log/ and didn't find any errors.

  • Adding the Kafka hosts to the /etc/hosts file on the Vertica cluster resolved the connectivity issue.
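
For anyone hitting the same error, a quick way to test for this cause is to check whether each Kafka hostname resolves from a Vertica node. The Python sketch below is a minimal illustration under a few assumptions: `resolve` and `hosts_line` are hypothetical helper names, only IPv4 is considered, and the IP address for a missing entry has to come from your own knowledge of the Kafka cluster.

```python
import socket


def resolve(hostname):
    """Return the sorted IPv4 addresses the local resolver gives for
    hostname, or an empty list if the name does not resolve."""
    try:
        infos = socket.getaddrinfo(hostname, None, socket.AF_INET,
                                   socket.SOCK_STREAM)
    except socket.gaierror:
        return []
    # Each result is (family, type, proto, canonname, (ip, port)).
    return sorted({info[4][0] for info in infos})


def hosts_line(ip, hostname):
    """Format one /etc/hosts entry: IP address, whitespace, hostname."""
    return f"{ip}\t{hostname}"
```

If `resolve(name)` returns an empty list for a broker hostname on a Vertica node, a line such as the one produced by `hosts_line(ip, name)` in that node's /etc/hosts (or a corresponding DNS record) is the kind of fix described above.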

