NOTIFIER and parameter message.max.bytes

Hello

I'm using Kafka to replicate Data Collector tables. From time to time I get this error:
Message cannot be sent. Size (1351305) exceeds the maximum limit(1000000)

I tried to set the parameter 'message.max.bytes' for the notifier, but it did not help:

CREATE NOTIFIER dc_notify_vertica
    ACTION 'kafka://vertica-kafka-dc:9092'
    ENABLE
    MAXMEMORYSIZE '128M'
    PARAMETERS 'message.max.bytes=5000000'
;

The setting does not seem to take effect: either it is the wrong parameter or it is not being applied.
Which other parameters do I need to set, or how else can I fix this?

Test:

  • Python:
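from confluent_kafka import Producer

def python():
    # Called once per message to report the delivery result
    def delivery_report(err, msg):
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    # Same broker and the same message.max.bytes value as the notifier
    p = Producer({'bootstrap.servers': 'vertica-kafka-dc:9092', "message.max.bytes": "5000000"})
    # Send a message of about 2 MB, above the 1000000 limit from the error
    p.produce('test_dc_max_message', "@" * 2048570, callback=delivery_report)

    # Wait until the message is delivered or fails
    p.flush()


if __name__ == "__main__":
    python()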

Message delivered to test_dc_max_message [2]

  • Vertica:
import vertica_python

# `conect` is the dict of connection settings, defined elsewhere
def vertica():
    with vertica_python.connect(**conect) as con:
        with con.cursor() as cur:
            # Error count before sending the notification
            cur.execute("select count(*) from dc_notifier_errors where notifier_name = 'dc_notify_vertica';")
            print(cur.fetchone())

            # Send the same ~2 MB message through the notifier
            sql = """
            SELECT NOTIFY('{}', 'dc_notify_vertica', 'test_dc_max_message');
            """.format("@" * 2048570)
            cur.execute(sql)

            # Error count after sending the notification
            cur.execute("select count(*) from dc_notifier_errors where notifier_name = 'dc_notify_vertica';")
            print(cur.fetchone())

if __name__ == "__main__":
    vertica()

[1]
[2]

The number of errors increased in dc_notifier_errors.
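
For reference, here is a small sketch of the sizes involved in the test. It only compares the payload size with the two limits mentioned above. The helper name `fits` and the constant names are made up for illustration, and the check ignores any per-message overhead Kafka adds, so treat it as a rough sanity check rather than the exact rule the client applies.

```python
# Limit quoted in the error message
DEFAULT_LIMIT = 1000000
# Value passed as message.max.bytes to both the notifier and the Python producer
CONFIGURED_LIMIT = 5000000


def fits(size_bytes, limit_bytes):
    """Hypothetical helper: True if a payload of this size is within the limit."""
    return size_bytes <= limit_bytes


# Same payload as in both tests above
payload = "@" * 2048570
size = len(payload.encode("utf-8"))

print(size)                          # 2048570
print(fits(size, DEFAULT_LIMIT))     # False
print(fits(size, CONFIGURED_LIMIT))  # True
```

So the test message is too large for the 1000000 limit but well within 5000000. That matches the Python producer succeeding, and suggests the notifier is still using the 1000000 limit.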
