Integration with Apache Kafka error
Hi,
Could you please help me with the following issue?
I have Vertica 9.3 installed on one host and Apache Kafka (2.4.0) installed on a second host, and I want to integrate Vertica with Kafka. I configured everything following the official Vertica guide, but when I try to create a source:
vkconfig source --create --cluster kafka_weblog --source test --partitions 1 --conf /opt/vertica/packages/kafka/config/my.conf
I get this error:
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException: ERROR: [[Vertica]VJDBC ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/qb_workspaces/jenkins2/ReleaseBuilds/Grader/REL-9_3_1-x_grader/build/udx/supported/kafka/KafkaUtil.cpp:163], error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:248)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:194)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:93)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:44)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:62)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:13)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:59)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:141)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:29)
Caused by: java.sql.SQLNonTransientException: [Vertica]VJDBC ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/qb_workspaces/jenkins2/ReleaseBuilds/Grader/REL-9_3_1-x_grader/build/udx/supported/kafka/KafkaUtil.cpp:163], error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
at com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
at com.vertica.dataengine.VResultSet.initialize(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.executeWithParams(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.executeQuery(Unknown Source)
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:227)
... 8 more
Caused by: com.vertica.support.exceptions.NonTransientException: [Vertica]VJDBC ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/qb_workspaces/jenkins2/ReleaseBuilds/Grader/REL-9_3_1-x_grader/build/udx/supported/kafka/KafkaUtil.cpp:163], error code: 0, message: Error getting metadata: [Local: Broker transport failure]
When I query the table weblog_sched.stream_clusters, the hosts column shows localhost:9092 rather than the IP address of my Kafka server (192.168.0.8), even though I specified the Kafka server's address when I created the cluster:
vkconfig cluster --create --cluster kafka_weblog --hosts 192.168.0.8:9092 --conf /opt/vertica/packages/kafka/config/my.conf
Why is that? I suspect the error is caused by the incorrect entry in weblog_sched.stream_clusters.
Answers
This is most likely because of the way you set up the advertised listener on the Kafka side. Can you run the following command:
select kafkacheckbrokers(using parameters brokers='192.168.0.8:9092') over();
This should return the advertised listener(s), which in your case is localhost:9092. Clients can only consume messages through the advertised listener, so the solution is to set your advertised listener to a hostname or IP address that the client can resolve (192.168.0.8:9092, for instance).
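As a minimal sketch of that change, assuming a plaintext (unencrypted) listener on port 9092 and the broker address from the question, the relevant lines in the broker's server.properties might look like the following. The 0.0.0.0 bind address is an assumption, so adjust it to your setup.

```properties
# config/server.properties on the Kafka broker (192.168.0.8)
# Assumption: a single PLAINTEXT listener on port 9092.

# Address the broker binds to; 0.0.0.0 means all interfaces.
listeners=PLAINTEXT://0.0.0.0:9092

# Address the broker hands back to clients in metadata responses.
# It must be reachable from the Vertica host, so it cannot be localhost.
advertised.listeners=PLAINTEXT://192.168.0.8:9092
```

After editing the file, restart the broker so it picks up the change. The kafkacheckbrokers query should then report 192.168.0.8:9092 instead of localhost:9092.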
For more details on Kafka listeners, check: https://www.confluent.io/blog/kafka-listeners-explained/