Vertica stuck importing messages from Kafka

I'm trying out the new Kafka integration in Vertica v7.2.0-0. I have a running Kafka broker and a topic 'events' with JSON data. I configured the Kafka scheduler in Vertica and set up the broker, target table, Kafka JSON parser, etc. Everything works, except that when Vertica receives a broken message it rolls back the COPY transaction and does not advance the offset, instead of fetching new messages and simply skipping the errors or writing them to the rejected table. Say I have a NOT NULL column in the table: when Vertica gets JSON without that column, the following error occurs continuously (from vertica.log):

COPY "public"."events" SOURCE KafkaSource(stream='events|0|410015', brokers='1.2.3.4:9092', duration=interval '9884 milliseconds', stop_on_eof=true, executionparallelism=1 ) PARSER KafkaJSONParser( ) REJECTED DATA AS TABLE public.kafka_rej2 TRICKLE NO COMMIT
Cannot set a NOT NULL column (device_type) to a NULL value in COPY statement

And here is how it looks in kafka_config.kafka_events:

Error occurred, rolling back changes from Microbatch. | java.sql.SQLDataException: [Vertica][VJDBC](2501) ERROR: Cannot set a NOT NULL column (device_type) to a NULL value in COPY statement
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
at com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
at com.vertica.dataengine.VResultSet.initialize(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
at com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
at com.vertica.jdbc.common.SStatement.executeNoParams(Unknown Source)
at com.vertica.jdbc.common.SStatement.executeUpdate(Unknown Source)
at com.vertica.solutions.kafka.scheduler.MicroBatch.execute(MicroBatch.java:158)
at com.vertica.solutions.kafka.scheduler.LaneWorker.run(LaneWorker.java:67)
at java.lang.Thread.run(Unknown Source)
Caused by: com.vertica.support.exceptions.DataException: [Vertica][VJDBC](2501) ERROR: Cannot set a NOT NULL column (device_type) to a NULL value in COPY statement
... 11 more

Obviously I have an error, but how can I tell Vertica to skip COPY errors and move on to the next offset in the Kafka topic? I have tried all the KafkaJSONParser options, but without any effect.


Comments

  • Hi,

     

    Sounds like you hit an issue loading NULL values. We will look into adding a knob for the parser in the future, but for now here are three workarounds that should get you moving again:

     

    1. Use a COPY expression to transform the NULL data so it isn't blocked by the NOT NULL constraint (see the sketch after this list).

     

    link to copy expression docs

     

    2. Remove the NOT NULL constraint from the table you are trying to load if it isn't required for your use case.

    3. Remove NULL values from the data prior to loading.
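
    For instance, here's a rough sketch of workarounds 1 and 2 against your public.events table. The raw_device_type FILLER name, the varchar(50) type, and the 'unknown' placeholder are just illustrative, and I haven't verified the expression form with KafkaJSONParser or the scheduler-generated COPY; it follows the general pattern from the COPY expression docs:

    -- Workaround 1 (sketch): read the raw value into a FILLER column and write a
    -- non-NULL replacement into the real column with a COPY expression.
    COPY public.events (
        raw_device_type FILLER VARCHAR(50),
        device_type AS NVL(raw_device_type, 'unknown')
    ) FROM STDIN DELIMITER '|';

    -- Workaround 2 (sketch): drop the constraint so a missing device_type loads as
    -- NULL instead of rolling back the whole microbatch.
    ALTER TABLE public.events ALTER COLUMN device_type DROP NOT NULL;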

     

    Thanks,

    Mark

     

    Product team HPE Vertica

  • Hi Mark,

    +1 to extending parser functionality for "null" values.

    The COPY behavior for semi-structured data imports becomes more of an issue when you consider use cases such as the Kafka Job Scheduler / PARSER KafkaJSONParser.

    We shouldn't expect an ETL step (i.e., pre-processing the data before it is written to Kafka) to insert missing values, the way a pipe-delimited COPY lets you supply the missing fields a priori (e.g., col1|||\n).

    A table column default should be sufficient when using a non-column list COPY command.

     

    That is, the user expects to be able to have missing keys and provide default behavior for target table columns.

    Even if you mandated explicit column lists in a COPY, the COPY "expression extensions" do not work with `COALESCE`, at least from my testing.

    And a default column value on a table does not work for COPY commands as it does for `INSERT` commands.

    Importing semi-structured data with missing keys, or wanting to add a default column value on import for columns such as an "inserted_at" column, needs support.

    Here is a canonical example for your tests.

    CREATE TABLE public.kafkaJSONUser (
        "user" varchar(30) NOT NULL DEFAULT 'noName',
        "events" INT NOT NULL DEFAULT 0,
        "month" INT NULL,
        "year" INT NULL,
        created_at timestamp NULL DEFAULT now()
    );

     

    I expect the COPY command to insert a default value for a missing input value/key for `created_at`:

    COPY public.kafkaJSONUser ("user", "events", "month", "year", "created_at") FROM STDIN parser fjsonparser() DIRECT ABORT ON ERROR;
    >> {"user": "B", "events": 100, "month": 1, "year": 2015, "created_at": "2001-01-01"}
    >> {"user": "B", "events": 100, "month": 1, "year": 2015} -- no default value inserted
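
    For comparison, a plain INSERT that omits the column does pick up the table default, which is the behavior I'd like COPY to match (a quick sketch against the same table; the literal values are placeholders):

    -- INSERT without created_at: DEFAULT now() is applied,
    -- unlike the COPY above where the missing key gets no default.
    INSERT INTO public.kafkaJSONUser ("user", "events", "month", "year")
    VALUES ('C', 100, 1, 2015);

    SELECT "user", created_at FROM public.kafkaJSONUser WHERE "user" = 'C';
    -- created_at comes back populated by the default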

     

    Mahalo,

    - Jack Gudenkauf
