Vertica 7.2 Kafka not fetching data from some partitions

For example, from the log:

 

 

COPY "public"."events" SOURCE 
KafkaSource(stream='events|0|1590085,events|1|1586923,events|2|-2',
brokers='1.2.3.4:9092', duration=interval '9872 milliseconds',
stop_on_eof=true, executionparallelism=1 ) PARSER KafkaJSONParser( )
REJECTED DATA AS TABLE public.kafka_rej6 DIRECT NO COMMIT

But:

 

 

/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic events --time -1
events:0:1590085
events:1:1586923
events:2:1597176

As you can see, Vertica does not consume partition #2 at all. I tried setting --start-offset, with no result. By the way, many other strange things have happened with the Kafka package, such as sudden node downs, but here I want to know why my partitions are not being consumed.
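
For completeness, the earliest available offsets can be checked with the same tool; I did not capture that output here, so this is just the command (same broker and topic as above, using the standard --time -2 flag for "earliest"):

/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic events --time -2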

 

 

Yes, the kafka_rej6 rejection table is empty, Kafka's server.log says nothing, and neither does Vertica's dbLog.

 

Now look:

 

COPY "public"."events" SOURCE KafkaSource(stream='events|2|-2', 
brokers='1.2.3.4:9092', duration=interval '9872 milliseconds',
stop_on_eof=true, executionparallelism=1 ) PARSER KafkaJSONParser( )
REJECTED DATA AS TABLE public.kafka_rej6 DIRECT NO COMMIT;

Rows Loaded
-------------
204505

 

If I keep only one partition in the stream, Vertica successfully consumes the messages. WHY?
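
A per-partition split looks like the obvious stopgap here. A sketch, simply reusing the stream values from the statements above (whether this scales to all partitions is exactly what I am unsure about):

COPY "public"."events" SOURCE KafkaSource(stream='events|0|1590085',
brokers='1.2.3.4:9092', duration=interval '9872 milliseconds',
stop_on_eof=true, executionparallelism=1 ) PARSER KafkaJSONParser( )
REJECTED DATA AS TABLE public.kafka_rej6 DIRECT NO COMMIT;

COPY "public"."events" SOURCE KafkaSource(stream='events|1|1586923',
brokers='1.2.3.4:9092', duration=interval '9872 milliseconds',
stop_on_eof=true, executionparallelism=1 ) PARSER KafkaJSONParser( )
REJECTED DATA AS TABLE public.kafka_rej6 DIRECT NO COMMIT;

-- plus the single-partition statement for 'events|2|-2' already shown above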

 

I'm very disappointed with this integration; I've spent almost two weeks on it without any result. The absence of logs and the silent failures make it impossible to debug.

Comments

  • Also, I sometimes get these errors:

     

    2015-11-19 12:39:07.605 Init Session:0x7fc5a4013590-a000000063c174 
    <LOG> @v_events_node0001: 00000/3316: Executing statement: 'INSERT
    INTO "kafka_config".kafka_events VALUES ('2015-11-19 12:39:06.486
    +0000','ERROR','com.vertica.solutions.kafka.scheduler.LaneWorker','2015-11-
    19 12:39:01.265 ','public','flex_events','Error occurred, rolling back changes
    from Microbatch.','java.sql.SQLNonTransientException: [Vertica][VJDBC](5861)
    ERROR: Error calling process() in User Function UDSource at
    [src/KafkaSource.cpp:147], error code: 0, message: User code caused Vertica
    to throw exception "wrong stats. current stats: ktopic events, kpartition 0,
    koffset 972866. new incoming message: koffset 972407"
    at com.vertica.util.ServerErrorData.buildException(Unknown Source)
    at com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
    at com.vertica.dataengine.VResultSet.initialize(Unknown Source)
    at com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown
    Source)
    at com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown
    Source)
    at com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
    at com.vertica.jdbc.common.SStatement.executeNoParams(Unknown Source)
    at com.vertica.jdbc.common.SStatement.executeUpdate(Unknown
    Source)
    at com.vertica.solutions.kafka.scheduler.MicroBatch.execute(MicroBatch.java:158
    )
    at com.vertica.solutions.kafka.scheduler.LaneWorker.run(LaneWorker.java:67)
    at java.lang.Thread.run(Unknown Source)
    Caused by: com.vertica.support.exceptions.NonTransientException: [Vertica]
    [VJDBC](5861) ERROR: Error calling process() in User Function UDSource at [src/KafkaSource.cpp:147], error code: 0, message: User code caused Vertica
    to throw exception "wrong stats. current stats: ktopic events, kpartition 0, koffset 972866. new incoming message: koffset 972407"
    ... 11 more
    ')'

    and this (in Kafka):

    [2015-11-19 12:39:01,604] ERROR Closing socket for /3.2.1.1 because of error (kafka.network.Processor)
    java.io.IOException: Broken pipe
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:417)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:482)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:597)
    at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
    at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
    at kafka.network.Processor.write(SocketServer.scala:472)
    at kafka.network.Processor.run(SocketServer.scala:342)
    at java.lang.Thread.run(Thread.java:745)

     

    Also this:

     

    2015-11-19 12:47:48.157 Init Session:0x7f8810013a70-a000000063cbb1 
    [UserMessage] <UDx> Unknown exception while emitting JSON row [128],
    skipping futher data for this row.
    2015-11-19 12:47:48.157 Init Session:0x7f8810013a70-a000000063cbb1
    [UserMessage] <UDx> Non-YAJL parse cancel by the JSON parser on record #
    [128] with status: [client canceled parse] and Bytes consumed: [419]: [parse
    error: client cancelled parse via callback return value
    attr2":1,"origin_host_id":555}
    (right here) ------^
    ]
    2015-11-19 12:47:48.157 Init Session:0x7f8810013a70-a000000063cbb1 [ResourceManager] <WARNING> file [] is not under management. Statement conflicted with a 'drop location'

    All of this causes sudden crashes of one of the Vertica nodes.

  • As it turned out, it was my fault. One of my nodes simply could not resolve the broker host. Rather, I use hostnames, and that node tried to resolve the broker's hostname without success. So I just added a record for my broker host to /etc/hosts (see the sketch at the end of this comment).

     

    The errors from my previous post I still can't explain.
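
    The record has the usual /etc/hosts shape; the IP and hostname below are placeholders, not my real broker:

    # /etc/hosts on every Vertica node that runs KafkaSource
    1.2.3.4    kafka-broker-1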

  • Hi Ravlio!

     

    I see you have experience integrating Kafka + Vertica.

    Did you have problems with KafkaLib?

    Can you help me with https://community.dev.hpe.com/t5/Vertica-Forum/Library-with-name-KafkaLib-does-not-exist/m-p/233738#U233738 ?

  • How long should I wait for an answer, or where else should I go? My nodes keep crashing for questionable reasons:

     

    Backtrace Generated by Error
    Signal: [0x000000000000000b] PID: [0x0000000000002e68] PC: [0x00007fd9480df969] FP: [0x00007fd84ee01850] SIGSEGV: SEGV_MAPERR SI_ADDR : [0x0000000000000000]
    /opt/vertica/bin/vertica[0x2fca8f7]
    /opt/vertica/bin/vertica[0x3017084]
    /opt/vertica/bin/vertica[0x30182e9]
    /lib64/libc.so.6(+0x326a0)[0x7fd97225e6a0]
    /home/dbadmin/events/v_events_node0003_catalog/Libraries/026497d68bc91ce792925b04e775630c00a0000064c12e76/KafkaLib_026497d68bc91ce792925b04e775630c00a0000064c12e76.so(_ZN11KafkaParser19processWithMetadataERN7Vertica15ServerInterfaceERNS0_10DataBufferERNS0_12LengthBufferENS0_10InputStateE+0x59)[0x7fd9480df969]
    /opt/vertica/bin/vertica[0x113b15f]
    /opt/vertica/bin/vertica[0x111de7a]
    /opt/vertica/bin/vertica[0xff048a]
    /opt/vertica/bin/vertica[0xff4680]
    /opt/vertica/bin/vertica[0xa935c7]
    /lib64/libc.so.6(+0x438f0)[0x7fd97226f8f0]
    END BACKTRACE
    THREAD CONTEXT
    Thread type: EE Internal Command Queue Thread
    Request: COPY "public"."flex_events" SOURCE KafkaSource(stream='events|0|102021420,events|1|106736571,events|2|102334640,events_out|3|106907446', brokers='host1:9092,host2:9092', duration=interval '822 milliseconds', stop_on_eof=true, executionparallelism=1 ) PARSER KafkaJSONParser( ) REJECTED DATA AS TABLE kafka_rej8 DIRECT NO COMMIT
    10: Send
    124: Router
    123: ValExpr
    2: Copy
    FAULT => 15: NewEENode
    16) ExprEval depth=(0) parent=0, peer#0; outTup nCol=82,nkey=0,inlSz=2699,fixSz=758
    17) LoadUnion (1) 16, #0; nInputs=24; out 81,0,2691,750
    18) Load (2) 17, #0; CRPstate=4; out 81,0,2691,750
    19) Load (2) 17, #1; CRPstate=4; out 81,0,2691,750
    20) Load (2) 17, #2; CRPstate=4; out 81,0,2691,750
    21) Load (2) 17, #3; CRPstate=4; out 81,0,2691,750
    22) Load (2) 17, #4; CRPstate=4; out 81,0,2691,750
    23) Load (2) 17, #5; CRPstate=4; out 81,0,2691,750
    24) Load (2) 17, #6; CRPstate=2; out 81,0,2691,750
    25) Load (2) 17, #7; CRPstate=2; out 81,0,2691,750
    26) Load (2) 17, #8; CRPstate=2; out 81,0,2691,750
    27) Load (2) 17, #9; CRPstate=2; out 81,0,2691,750
    28) Load (2) 17, #10; CRPstate=2; out 81,0,2691,750
    29) Load (2) 17, #11; CRPstate=2; out 81,0,2691,750
    30) Load (2) 17, #12; CRPstate=2; out 81,0,2691,750
    (PPFAULT) => Load (id=31) (2) 17, #13; CRPstate=2; out 81,0,2691,750
    32) Load (2) 17, #14; CRPstate=2; out 81,0,2691,750
    33) Load (2) 17, #15; CRPstate=2; out 81,0,2691,750
    34) Load (2) 17, #16; CRPstate=2; out 81,0,2691,750
    35) Load (2) 17, #17; CRPstate=2; out 81,0,2691,750
    36) Load (2) 17, #18; CRPstate=2; out 81,0,2691,750
    37) Load (2) 17, #19; CRPstate=2; out 81,0,2691,750
    38) Load (2) 17, #20; CRPstate=2; out 81,0,2691,750
    39) Load (2) 17, #21; CRPstate=5; out 81,0,2691,750
    40) Load (2) 17, #22; CRPstate=4; out 81,0,2691,750
    41) Load (2) 17, #23; CRPstate=2; out 81,0,2691,750
    Transaction: [0x00b00000004e288d]
    END THREAD CONTEXT

     

    The flow is about 100k messages/sec. Not that much. I tried everything: changing the duration frame, sending synthetic messages like {"id":1}, direct and trickle load, and resource pool tuning. I have a 3-node Community Edition cluster, free of any additional load while testing.

     

    It feels like the Vertica Kafka integration falls over at the slightest problem.

  • Hi!

    Do you have a plan B? Do you plan to write your own Kafka producer?

  • I gave up and went back to the classic 'COPY FROM LOCAL DIRECT' form with the JSON parser (a sketch of that shape is below). Not bad, but not as seamless as I wanted.
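
    Roughly this shape, assuming the flex fjsonparser; the file path is a placeholder, not my real layout:

    -- file path is a placeholder
    COPY "public"."events" FROM LOCAL '/data/events/batch_0001.json'
    PARSER fjsonparser() DIRECT;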

  • I'm sorry to hear about this, and sorry for not responding in a timely fashion: the issues you ran into are known and are currently our top-priority fixes. We are hoping to resolve them as soon as we can.

  • In the meantime, there is a workaround for the node failures. You will want to run this command on your cluster:

     

    alter database <dbname> set EnableCooperativeParse=0;
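
    To confirm the parameter took effect, you can query the configuration_parameters system table (a sketch; verify the column names on your version):

    SELECT parameter_name, current_value
    FROM configuration_parameters
    WHERE parameter_name = 'EnableCooperativeParse';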

  • Thank you very much. Disabling EnableCooperativeParse helped. Fourth hour without a node falling.
