Vertica Kafka COPY

I'm trying to load messages from Kafka into Vertica using the COPY command. The Kafka cluster has SSL/TLS set up:

COPY TEST_TABLE
  SOURCE KafkaSource(
    stream='test-topic|0|0',
    brokers='kafka-lab.com:32090',
    stop_on_eof=true,
    group_id='test-topic',
    kafka_conf='sasl.username=test_user;sasl.password=test_password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL'
  )
PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True) ABORT ON ERROR REJECTMAX 1;

However, no messages are being loaded. I'm trying to follow the official docs:

You can stream data from Kafka into Vertica two ways: manually using a COPY statement and the KafkaSource UD source function, or automatically using the scheduler.

To directly copy data from Kafka via an SSL connection, you set session variables containing an SSL key and certificate. When KafkaSource finds that you have set these variables, it uses the key and certificate to create a secure connection to Kafka.

However, it's not clear to me: can I pass these variables (CA, SSL certificate) through the "kafka_conf" parameter of KafkaSource, or must I set them via vsql only?

Answers

  • SergeB - Employee

    Any time you require SSL/TLS with Kafka, you need to pass the CA, certificate, and so on via UD session parameters.

    See https://www.vertica.com/docs/9.3.x/HTML/Content/Authoring/AdministratorsGuide/ConfiguringTheDB/UDSessionParameters.htm for the list of these parameters,

    and see https://www.vertica.com/docs/9.3.x/HTML/Content/Authoring/KafkaIntegrationGuide/TLS-SSL/KafkaTLS-SSLExamplePart4DirectlyLoadFromKafka.htm for an example of how to set them.

  • Thank you, but in order to do the above I must first apply the steps 1 and 2 below:

    Correct?

    Also, in "Part 4" (the link you shared), I see that COPY is used without passing the "kafka_conf" parameter, and that's my main question: do I either set up the SSL variables following parts 1, 2, and 4, or pass the "kafka_conf" parameter?

    Additionally, I want to execute this COPY command periodically using a cron job. Is that doable, or should I use the scheduler in this case?

  • SergeB - Employee
    1. Steps 1 and 2 are just examples of creating your own certificates. Chances are those were already created for your Kafka cluster, and you can just use them.
    2. The example in part 4 applies when using the SSL Kafka security.protocol (no need to add it to kafka_conf). If you want to use the SASL_SSL security.protocol, you need to set the relevant UD session parameters AND add that security.protocol to kafka_conf.
    3. There is no problem running COPY via a cron job. Usually, when customers choose that path, they use -3 as the starting offset (so that Kafka tracks the last consumed offsets).
  • Thank you.
    The correct certificates are already set up on the Kafka cluster, so the way to go for me is SASL_SSL, as shown in the COPY command below:

    COPY TEST_TABLE
      SOURCE KafkaSource(
        stream='test-topic|0|-3',
        brokers='kafka-lab.com:32090',
        stop_on_eof=true,
        group_id='test-topic',
        kafka_conf='sasl.username=test_user;sasl.password=test_password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL'
      )
    PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True) ABORT ON ERROR REJECTMAX 1;
    

    But if I understand you correctly, I need to set the UD session parameters every time I run the above COPY?
    That is, I should get the CA details from the Kafka cluster and set the following:

    => \set cert '\''`cat /home/dbadmin/SSL/server.crt`'\''
    => \set pkey '\''`cat /home/dbadmin/SSL/server.key`'\''
    => \set ca '\''`cat /home/dbadmin/SSL/root.crt`'\''
    => \set pass '\''`echo $KVERTICA_PASS`'\''
    => alter session set MaxSessionUDParameterSize=100000;
    => ALTER SESSION SET UDPARAMETER kafka_SSL_Certificate=:cert;
    => ALTER SESSION SET UDPARAMETER kafka_SSL_PrivateKey_secret=:pkey;
    => ALTER SESSION SET UDPARAMETER kafka_SSL_PrivateKeyPassword_secret=:pass;
    => ALTER SESSION SET UDPARAMETER kafka_SSL_CA=:ca;
    => ALTER SESSION SET UDPARAMETER kafka_Enable_SSL=1;

    Is this required every time, or is there a way to set it once?

    What's also confusing is that I can access the above Kafka cluster from my local machine using

    kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server kafka-lab.com:32090 --consumer.config .\client-ni.config
    

    where client-ni.config contains

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='test_user' password='test_password';

    So why is it possible from the console but not from Vertica?

  • SergeB - Employee

    Yes, you will need to set the session parameters in every session in which you call one of the Kafka UDXs (KafkaSource, ...).
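
    Because these parameters last only for the session, a cron job would have to set them in the same vsql session that runs the COPY. A minimal shell sketch of that idea follows. It inlines the file contents as SQL string literals rather than using vsql's \set, and SSL_DIR, VSQL, ud_param, build_load_sql and run_load are made-up names, not part of Vertica. The parameter names, paths and COPY statement are the ones quoted in this thread; vsql connection options are not shown.

    ```shell
    #!/bin/sh
    # Sketch of a cron wrapper. SSL_DIR, VSQL and the function names are
    # illustrative; the parameter names and the COPY come from this thread.
    SSL_DIR=${SSL_DIR:-/home/dbadmin/SSL}
    VSQL=${VSQL:-vsql}

    # Emit one UD session parameter as a quoted SQL literal. Assumes the value
    # (PEM text or password) contains no single quotes.
    ud_param() {
        printf "ALTER SESSION SET UDPARAMETER %s='%s';\n" "$1" "$2"
    }

    # Emit the whole session: TLS parameters first, then the COPY.
    build_load_sql() {
        echo "ALTER SESSION SET MaxSessionUDParameterSize=100000;"
        ud_param kafka_SSL_Certificate "$(cat "$SSL_DIR/server.crt")"
        ud_param kafka_SSL_PrivateKey_secret "$(cat "$SSL_DIR/server.key")"
        ud_param kafka_SSL_PrivateKeyPassword_secret "${KVERTICA_PASS:-}"
        ud_param kafka_SSL_CA "$(cat "$SSL_DIR/root.crt")"
        echo "ALTER SESSION SET UDPARAMETER kafka_Enable_SSL=1;"
        printf '%s\n' \
            "COPY TEST_TABLE" \
            "  SOURCE KafkaSource(" \
            "    stream='test-topic|0|-3'," \
            "    brokers='kafka-lab.com:32090'," \
            "    stop_on_eof=true," \
            "    group_id='test-topic'," \
            "    kafka_conf='sasl.username=test_user;sasl.password=test_password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL'" \
            "  )" \
            "PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True) ABORT ON ERROR REJECTMAX 1;"
    }

    # One vsql process is one session, so the settings and the COPY share it.
    run_load() {
        build_load_sql | "$VSQL"
    }
    ```

    A script that ends with a call to run_load could then be scheduled from cron. Calling build_load_sql on its own prints the statements, which may help when checking what would be sent.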

    kafka-console-consumer.sh is a Java-based Kafka client. Vertica uses the C/C++ library librdkafka and ships the /opt/vertica/packages/kafka/bin/kafkacat CLI. It's generally better to use that client to test connectivity and settings.

    Note that SSL configuration in the Vertica Kafka integration has been simplified in 11.0:
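
    As a rough illustration of such a test, the sketch below wraps a kafkacat metadata request in a shell function. The broker, topic and credentials are the ones from this thread; CA_FILE, KAFKACAT and check_kafka are names invented here, and the CA path is only assumed to be where the root certificate lives.

    ```shell
    #!/bin/sh
    # Sketch of a connectivity check with the bundled kafkacat.
    # CA_FILE and check_kafka are illustrative names; broker, topic and
    # credentials are the ones used earlier in this thread.
    KAFKACAT=${KAFKACAT:-/opt/vertica/packages/kafka/bin/kafkacat}
    CA_FILE=${CA_FILE:-/home/dbadmin/SSL/root.crt}

    # -L lists broker/topic metadata; each -X passes one librdkafka property.
    check_kafka() {
        "$KAFKACAT" -L \
            -b kafka-lab.com:32090 \
            -t test-topic \
            -X security.protocol=SASL_SSL \
            -X sasl.mechanism=PLAIN \
            -X sasl.username=test_user \
            -X sasl.password=test_password \
            -X ssl.ca.location="$CA_FILE"
    }
    ```

    If check_kafka lists the brokers and the topic, the SASL properties are at least accepted by librdkafka. Replacing -L with -C -o beginning -e would read the messages instead of the metadata.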

    https://www.vertica.com/docs/11.0.x/HTML/Content/Authoring/NewFeatures/11.0/11.0.0/IntegrationKafka.htm?

  • Thank you for your support,
    but one thing is still not clear to me. I had the impression that SASL is an abstraction layer that would spare me the manual work. So why are we passing the kafka_conf below:

    kafka_conf='sasl.username=test_user;sasl.password=test_password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL'
    

    if we also have to set the UD session parameters every time we execute this COPY from Kafka?
    What am I missing? Thanks.

  • SergeB - Employee
    edited October 2021

    It's a bit confusing, but let me try to explain it.

    If you set only the UD session parameters, Vertica will essentially set the Kafka security.protocol to SSL and pass the CA, certificate, and so on as required for that protocol.

    If you want to use SASL_SSL instead, with the same CA, certificate, and so on, you need to override that by setting the property in kafka_conf.

  • Alright, now it's clear, thank you very much. So after setting kafka_Enable_SSL to 1, I need to override the protocol in kafka_conf to use SASL_SSL. And if I'm not using Vertica 11, I also need to set the additional parameters; otherwise, only
    kafka_Enable_SSL
    kafka_SSL_CA

    should be enough, right?
    I will try the above and let you know. Thanks.

  • I am also facing the same issue now. I have a chain certificate; what should be set in the kafka_SSL_CA parameter, the intermediate CA or the CA?

  • SergeB - Employee

    Re: the question about the chain certificate: in recent versions of Vertica, it's possible to pass a bundle in the kafka_SSL_CA UDX session parameter (in your case, intermediate CA + CA). If you open a support ticket, we can look more specifically at your particular setup.
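
    For illustration only, a PEM bundle is usually just the certificates concatenated into one file, so a sketch like the following could produce one. The function name and all file names are hypothetical, and whether a given Vertica version accepts the result in kafka_SSL_CA depends on the hotfix level.

    ```shell
    #!/bin/sh
    # Sketch: build a PEM bundle from an intermediate and a root CA file.
    # make_ca_bundle and all file names are illustrative.
    make_ca_bundle() {
        intermediate=$1
        root=$2
        bundle=$3
        # A PEM bundle is the certificates concatenated one after another.
        cat "$intermediate" "$root" > "$bundle"
    }
    ```

    For example, make_ca_bundle intermediate.crt root.crt ca-bundle.crt would write ca-bundle.crt, whose contents would then be loaded into kafka_SSL_CA in place of the single root certificate.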

  • We are using 9.3.1. How should the CA certs be passed? Both the CA and the intermediate CA in the kafka_SSL_CA parameter? Since the authentication mechanism here is SASL_SSL, should I set Enable_SSL = 1? Which SSL parameters need to be configured?

  • SergeB - Employee

    9.3.1-12 is the hotfix where we enabled CA bundles in the kafka_SSL_CA parameter (see the release notes). If you want to enable SASL_SSL, you will also need to set the UDX session parameter kafka_Enable_SSL to 1. As mentioned, if you open a ticket, we can look at your particular setup.
