Vertica Kafka COPY
I'm trying to load messages from Kafka into Vertica using the COPY command. The Kafka cluster has SSL/TLS set up:
COPY TEST_TABLE
SOURCE KafkaSource(
    stream='test-topic|0|0',
    brokers='kafka-lab.com:32090',
    stop_on_eof=true,
    group_id='test-topic',
    kafka_conf='sasl.username=test_user;sasl.password=test_password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL'
)
PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True)
ABORT ON ERROR
REJECTMAX 1;
However, no messages are being loaded. I'm trying to follow the official Vertica docs:
You can stream data from Kafka into Vertica two ways: manually using a COPY statement and the KafkaSource UD source function, or automatically using the scheduler.
To directly copy data from Kafka via an SSL connection, you set session variables containing an SSL key and certificate. When KafkaSource finds that you have set these variables, it uses the key and certificate to create a secure connection to Kafka
However, it's not clear to me: can I pass these variables (CA, SSL certificate) using the "kafka_conf" parameter of KafkaSource, or do I have to set them as session variables via vsql?
Answers
Any time you require SSL/TLS with Kafka, you need to pass the CA, certificate, and so on via UD session parameters.
See https://www.vertica.com/docs/9.3.x/HTML/Content/Authoring/AdministratorsGuide/ConfiguringTheDB/UDSessionParameters.htm for the list of these parameters,
and see https://www.vertica.com/docs/9.3.x/HTML/Content/Authoring/KafkaIntegrationGuide/TLS-SSL/KafkaTLS-SSLExamplePart4DirectlyLoadFromKafka.htm for an example of how to set them.
Thank you, but in order to do the above I must first apply steps 1 and 2 below:
Correct?
Also, in "Part 4" (the link you shared), I see that COPY is used without passing the "kafka_conf" parameter, and that's my main question. Is it a choice between setting up the SSL variables following "Part 1, Part 2, Part 4" and passing the "kafka_conf" parameter?
Additionally, I want to execute this COPY command periodically using a cron job. Is that doable, or should I use the scheduler in this case?
Thank you.
The correct certificates are already set on the Kafka cluster, so the way to go for me is SASL_SSL, as shown in the COPY command below:
But if I understand you correctly, I need to set the UD session parameters every time I run the above COPY?
Meaning I should get the CA details from the Kafka cluster, and the below should be set:
But is that required every time, or is there a way to set it once?
What's also confusing is that I can access the above Kafka cluster from my local machine using
where client-ni.config contains
So why is it possible from the console but not from Vertica?
Yes, you will need to set the session parameters in every session where you call one of the Kafka UDXs (KafkaSource, ...).
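As a rough sketch of what that looks like in a vsql session: the parameter names below are the ones listed on the UD session parameters page linked earlier, and every PEM body and the password are placeholders, not real values. Whether you need the client certificate and key parameters depends on your Vertica version and on whether your brokers require client certificates, so treat those three as an assumption.

```sql
-- UD session parameters last only for the current session, so these
-- statements must run in the same session as the COPY, before it.

-- CA that signed the brokers' certificates (placeholder contents).
ALTER SESSION SET UDPARAMETER kafka_SSL_CA='-----BEGIN CERTIFICATE-----
<CA certificate contents>
-----END CERTIFICATE-----';

-- Client certificate and private key (assumption: only needed on some
-- versions or when the brokers require client authentication).
ALTER SESSION SET UDPARAMETER kafka_SSL_Certificate='-----BEGIN CERTIFICATE-----
<client certificate contents>
-----END CERTIFICATE-----';
ALTER SESSION SET UDPARAMETER kafka_SSL_PrivateKey_secret='-----BEGIN ENCRYPTED PRIVATE KEY-----
<client private key contents>
-----END ENCRYPTED PRIVATE KEY-----';
ALTER SESSION SET UDPARAMETER kafka_SSL_PrivateKeyPassword_secret='<key password>';

-- Turn SSL on for the Kafka UDXs in this session.
ALTER SESSION SET UDPARAMETER kafka_Enable_SSL=1;

-- ... then run the COPY ... SOURCE KafkaSource(...) in this same session.
```

Because the parameters are per session, a script that runs the COPY unattended would need to include these statements ahead of the COPY each time it runs.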
kafka-console-consumer.sh is a Java-based Kafka client. Vertica uses the C/C++ library librdkafka and ships the /opt/vertica/packages/kafka/bin/kafkacat CLI. It's generally better to use that client to test connectivity, settings, and so on.
Note that SSL configuration in the Vertica Kafka integration has been simplified in 11.0:
https://www.vertica.com/docs/11.0.x/HTML/Content/Authoring/NewFeatures/11.0/11.0.0/IntegrationKafka.htm
Thank you for your support,
but one thing is still not clear to me. I had the impression that SASL is an abstraction layer, so I wouldn't have to do the manual work. Why are we passing the kafka_conf below:
if we also have to set the UD session parameters every time we execute this COPY from Kafka?
What am I missing? Thanks.
It's a bit confusing, but let me try to explain it.
If you set only the UD session parameters, Vertica will essentially set the Kafka security.protocol to SSL and pass the CA, certificate, and so on as required for that protocol.
If you want to use SASL_SSL instead, with the same CA, certificate, and so on, you need to override that by setting the property in kafka_conf.
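To make the two cases concrete, here is a sketch that reuses the table, topic, broker, and credentials from the question. It assumes the UD session parameters (kafka_Enable_SSL, kafka_SSL_CA, and so on) have already been set in the current session, and it has not been run against a real cluster.

```sql
-- Case 1: UD session parameters only, no kafka_conf.
-- Vertica essentially sets security.protocol to SSL for you.
COPY TEST_TABLE
SOURCE KafkaSource(
    stream='test-topic|0|0',
    brokers='kafka-lab.com:32090',
    stop_on_eof=true
)
PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True);

-- Case 2: same session parameters, but kafka_conf overrides the protocol
-- with SASL_SSL and supplies the SASL mechanism and credentials.
COPY TEST_TABLE
SOURCE KafkaSource(
    stream='test-topic|0|0',
    brokers='kafka-lab.com:32090',
    stop_on_eof=true,
    kafka_conf='security.protocol=SASL_SSL;sasl.mechanism=PLAIN;sasl.username=test_user;sasl.password=test_password'
)
PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True);
```

In both cases the CA and certificates come from the session parameters; kafka_conf only changes the protocol and adds the SASL settings on top.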
Alright, now it's clear, thank you very much. So after we set kafka_Enable_SSL to 1, I need to override the protocol in kafka_conf to use SASL_SSL. And if I'm not using Vertica 11, I also need to set the additional parameters; otherwise, only
kafka_Enable_SSL
kafka_SSL_CA
should be enough, right?
I will try the above and let you know. Thanks.
I am also facing the same issue now. I have a chain certificate; what should be set in the kafka_SSL_CA parameter, the intermediate CA or the CA?
Re: the question about the chain certificate. In recent versions of Vertica, it's possible to pass a bundle in the kafka_SSL_CA UDX session parameter (in your case, intermediate CA + CA). If you open a support ticket, we can look more specifically at your particular setup.
We are using 9.3.1. How should the CA certs be passed? Both the CA and the intermediate CA in the kafka_SSL_CA parameter? Since the authentication mechanism here is SASL_SSL, should I set Enable_SSL = 1? Which SSL parameters need to be configured?
9.3.1-12 is the hotfix where we enabled CA bundles in the kafka_SSL_CA parameter (see the release notes). If you want to enable SASL_SSL, you will also need to set the UDX session parameter kafka_Enable_SSL to 1. As mentioned, if you open a ticket, we can look at your particular setup.
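For illustration only, a sketch of what passing a bundle might look like. It assumes the bundle is simply the PEM blocks concatenated in one value, which is the usual form of a CA bundle; the thread does not spell out the expected format or order, so check the release notes or confirm with support. The certificate bodies are placeholders.

```sql
-- Assumption: intermediate CA and root CA concatenated as PEM blocks
-- in a single kafka_SSL_CA value (requires a version with bundle support,
-- 9.3.1-12 or later according to the answer above).
ALTER SESSION SET UDPARAMETER kafka_SSL_CA='-----BEGIN CERTIFICATE-----
<intermediate CA certificate contents>
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
<root CA certificate contents>
-----END CERTIFICATE-----';

-- Also needed when using SASL_SSL, per the answer above.
ALTER SESSION SET UDPARAMETER kafka_Enable_SSL=1;
```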