java multithread copy stream

Hello,

I'm migrating data from a legacy time series database into a Vertica database using copy stream via the Java API.

At the moment, everything is done sequentially:
1. Generate a copy statement for the target Vertica table
2. Create a child thread:
    a) Create a PipedInputStream/PipedOutputStream pair
    b) Obtain a connection from a Vertica node (data source) -- autocommit off
    c) Start a Vertica copy stream using the copy statement, listening to the PipedInputStream
    d) Child thread will commit the copy statement and close connection once PipedInputStream is closed by Parent (once Parent has finished streaming)
3. Parent queries and streams time series one by one, sequentially, to the child thread using the PipedOutputStream

There is only one Vertica target table.

Can I parallelize these steps to a single Vertica node (data source)?

If our Vertica setup consists of three nodes (data sources), does that mean I can only have up to three parallel threads doing steps 1-3, each one pinned to a different Vertica node?


Thank you!

Comments

  • >> Can I parallelize these steps to a single Vertica node (data source)?
    Yeap - open several input pipes and first row send to pipe 1, second row send to second pipe. Or you can do it in batches : {0-100} rows to PIPE1, {101-200} rows to PIPE2. While vertica needs to parse rows and to store them on HDD - very quickly load will be in parallel. Double buffer - is a best approach.

    As example take a look on StreamBase Vertica Adapter:
    Vertica Load Adapter Overview

    The Vertica load adapter provides the fastest way to load data into Vertica. There are two different basic load types, streaming load and simple buffered load.

    Each bulk load to Vertica creates what is called a miniROS. These miniROSs are merged into larger and larger ROSs as defined by Vertica configuration parameters. Creating many small miniROSs is expensive for Vertica, since there is more merging required. Creating fewer, larger, miniROSs is much more efficient.

    Streaming loads offer the highest overall load rates into Vertica because it allows you create very large miniROS. Simple buffered loads will create a miniROS no larger then the configured buffer size. Simple buffered loads should have two buffers configured so that, while one is busy being written to Vertica, the other can continue to be filled by the StreamBase application. This limits the size of the miniROS that a simple buffered load can create to less then half the available main memory — a few gigabytes at the most.

    In contrast, streaming loads do not create a miniROS until a flush occurs. Flushes can be configured to occur at fixed intervals or forced to occur by sending a flush tuple, or both. So with streaming loads, miniROS size can be made large — many tens of gigabytes, or even larger.

    One advantage of simple buffered loading is that StreamBase holds all the data until the flush has completed successfully. If the Vertica database becomes unreachable for some reason, the flush fails but enters a reconnect and retry loop. Assuming the database becomes reachable again, and tables of interest still exist, the flush subsequently succeeds and no data is lost. With streaming loads, StreamBase discards data as each streaming write completes. If the Vertica database becomes unreachable or terminates unexpectedly after some number of streaming writes, all the data since the last flush may be lost.

    Are you using in 7? Vertica 7 now can process a single load with many threads, so you don't need to care about multi-threading.

  • Thank you for the quick response Daniel!

    Currently developing on Vertica 6..

    So sorry, I should have phrased/simplified my question:

    (A) Can I safely copy stream from multiple connections/sessions to the same Vertica target table via the same Vertica node (say we have three nodes in total) without running into conflicts/lock contentions/etc..?

    Based on your answer this is ok..


    Follow up question:

    (B) If I assigned each loader to a specific Vertica node, and copy stream in parallel this way, is there a performance gain over approach (A), where all the copy streams are hitting the same node?


    Thank you!
  • Hi!

    >> 2. Create a child thread:
    Try connection per input stream.
    https://my.vertica.com/docs/6.1.x/HTML/index.htm#16749.htm
    JDBC Feature Support

    The HP Vertica JDBC driver complies with the JDBC 3 standards, and supports most of its features. You can use the DatabaseMetaData class to determine if the a particular feature you want to use is supported. Also, note the following about the features supported by the HP Vertica JDBC driver:

    • Cursors are forward only and are not scrollable. Result sets cannot be updated.
    • A connection supports a single statement executing at any time. If you want to execute multiple statements simultaneously, open multiple connections.
    • Because HP Vertica does not have stored procedures, CallableStatement is not supported. The DatabaseMetaData.getProcedures() and .getProcedureColumns() methods return information about SQL functions (including UDFs) instead of stored procedures.
    >>If our Vertica setup consists of three nodes (data sources), does that mean I can only have up to three parallel threads doing steps 1-3, each one pinned to a different Vertica node?
    No, because you have to open a connection per stream(no locks for COPY due epochs mechanism, no locks for SELECT, you can safely load data in multiply streams and query database).

    Actually
        MAX(streams per node) = # of cpu cores
    but you are limited by system resources also IO, RAM (you need to leave a some cores for system activity). Once, to some client I saw 100 input streams  and actually it worked(but failed on ROS containers).
    Vertica 7 solves this for you - to get almost a best performance you need only 2-4 streams. Also it reduces a # of connections and ROS containers.

    PS
    Do you think to upgrade? 7 has nice features like load balancing :)
    https://my.vertica.com/docs/7.0.x/HTML/index.htm#Authoring/ProgrammersGuide/ClientJDBC/NewFeaturesIn...

  • read my next comment
    • A connection supports a single statement executing at any time. If you want to execute multiple statements simultaneously, open multiple connections.
    Perfect!  Thank you very much. 

    Off topic:  
    The feature I am missing the most is stored procedure (return result set)..  Need to encapsulate sql logic as reusable building blocks..  Will 7 offer user defined procedures?

  • Hi!

    >> The feature I am missing the most is stored procedure (return result set)
    Forget about it - Vertica has no SP and probably will not offer
    (God bless you Vertica. Actually architects and engineers like M.Stonebraker, D.Abadi, A.Lamb, B.Vandiver and some others)
    Stored procedures are similar to user-defined functions (UDFs). The major difference is that UDFs can be used like any other expression within SQL statements, whereas stored procedures must be invoked using the CALL statement.
    >> Will 7 offer user defined procedures?
    Vertica offers(in 6 too, but no JAVA UDF SDK).
    Vertica offers UDF - user defined function.
    * Vertica 7 supports several types of UDF - scalar, aggregate, analytical, transform and load UDF function.
    * Vertica 7 has UDF SDK for next languages: C/C++, R, Java

    so you can encapsulate your logic in Java

    Developing User Defined Functions in Java
    https://my.vertica.com/docs/7.0.x/HTML/index.htm#Authoring/ProgrammersGuide/UserDefinedFunctions/UDx...

    Developing UDLs in Java
    https://my.vertica.com/docs/7.0.x/HTML/index.htm#Authoring/ProgrammersGuide/UserDefinedFunctions/UDL...

    [UPDATE]
    Also you can encapsulate your logic with External Procedure.
    Disadvantage: EP doesn't return a result set, it just return an exit code (success or fail).
    But its a minor limitation:
    * encapsulate your logic with EP that gets as argument name of tmp table and stores result in this table (create and install EP)
    * inside of JDBC client create tmp table, call for created EP, pass to EP name of tmp table and check for execution status
    * if success so go and process data in your just created tmp table.
    (Vertica automatically will truncate a tmp table on end of session or you can manually to delete a tmp table)

    This method very well emulates a Stored Procedure: you got your results in table that you can query as you wish - like scrollable cursor. You can create a projection for a table, so you can improve data processing.
    What do you need more?

    About Temporary Tables

    https://my.vertica.com/docs/7.0.x/HTML/index.htm#Authoring/AdministratorsGuide/Tables/AboutTemporary...




Leave a Comment

BoldItalicStrikethroughOrdered listUnordered list
Emoji
Image
Align leftAlign centerAlign rightToggle HTML viewToggle full pageToggle lights
Drop image/file