Fast bulkload for flex tables

rajatpaliwal86 Vertica Customer

We have a primary table designed as a flex table, and we are looking for the fastest way to load data into it. Currently, we write all incoming events to a JSON file on local disk; every 2 minutes we roll this file and run a COPY LOCAL command using fjsonparser. We observed that a file with around 500,000 (half a million) events takes around 200 seconds to load, which seems slow. I know that the JSON parser is considerably slower than the CSV parser used for regular tables. We are looking for best practices to speed up the load:
1. Can the JSON parser be optimized?
2. Would COPY be faster than COPY LOCAL (if we copy the file to a shared file system and execute COPY on the Vertica server)?
3. Does a bulk load need a specific resource pool, and which resource pool parameters affect performance?
4. Copying a few large files vs. many small files: which is better?
5. How many large COPY commands can Vertica execute in parallel?

Answers

  • mosheg Vertica Employee Administrator
    edited May 2020

    It is advised to send questions to the forum in the "General Discussion" category: https://forum.vertica.com/categories/general-discussion
    A. From Vertica 8.0 onward, with apportioned load, Phase I of the load (as described here: https://www.vertica.com/kb/Data-Loading-in-Vertica-Using-COPY/Content/BestPractices/Data-Loading-in-Vertica-Using-COPY.htm)
    occurs on several nodes, provided all the nodes have access to the source data. An apportioned load is a divisible load, such that a single data file can be loaded on more than one node.
    If apportioned load is not available, Phase I occurs only on the nodes that read the file, which is much slower.
    B. The load will be faster, as more nodes participate in the work.
    C. In addition, using a wildcard with the ON ANY NODE clause expands the file list on the initiator node. This will distribute the individual files among all nodes, so that the COPY workload is evenly distributed across the cluster.
    D. Regarding the resource pool question: yes, the resource pool parameters and the configuration parameters explained in the article above affect data load performance.
    E. One reason a load into a "materialized table" can be slow is the number of columns in the projection's ORDER BY clause. Where relevant, try recreating your schema without a SEGMENTED BY clause and with only one INT field in the ORDER BY clause.
    F. Measure the time it takes to run two separate COPY commands in parallel, each loading different files.
    G. “ON ANY NODE” is the default option for the COPY command when loading from HDFS or S3 paths and does not need to be specified. However, the following conditions must be met to ensure parallel load:
    1. The source files to load should be available on all nodes, so COPY opens the file and parses it from any node in the cluster.
    2. The file must be the same on all nodes. If the file differs on two nodes, an incorrect or incomplete result is returned, with no error or warning.
    3. Vertica attempts to apportion the load among several nodes if the file is large enough to benefit from apportioning.
    4. If ERROR TOLERANCE or REJECTMAX is specified, Vertica chooses to load from a single node.
    5. Use a wildcard (such as *.dat) to load multiple input files, combined with the ON ANY NODE clause.
    6. “ON ANY NODE” is invalid with STDIN and LOCAL: STDIN can only use the initiator node, and LOCAL indicates a client node.
    H. Monitor the load process in MC and with queries like:
    select * from dc_load_events where transaction_id=XYZ and statement_id=XYZ order by time;
    To get the transaction_id and statement_id, first run:
    select * from load_streams where is_executing;
    Use the STREAM NAME parameter as part of the COPY statement so it will be easier to identify it in the LOAD_STREAMS system table.
    Check how your schema is defined: SELECT EXPORT_OBJECTS('','SCHEMANAME.TABLENAME',true);
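
    Points C and G.5 rely on having several input files that a wildcard can match. As a minimal sketch of how one rolled file could be split before loading, the Python below distributes its lines round-robin across a fixed number of part files. It assumes one JSON event per line; the function name `split_json_lines` and the `events_partNNN.json` naming are hypothetical, chosen only for illustration.

```python
import os
import tempfile


def split_json_lines(src_path, out_dir, parts):
    """Distribute the lines of src_path round-robin into `parts` files in out_dir."""
    # Hypothetical naming scheme so that a wildcard like events_part*.json matches.
    paths = [os.path.join(out_dir, f"events_part{i:03d}.json") for i in range(parts)]
    outputs = [open(p, "w", encoding="utf-8") for p in paths]
    try:
        with open(src_path, encoding="utf-8") as src:
            for n, line in enumerate(src):
                # Assumes newline-delimited JSON: each line is one complete event.
                outputs[n % parts].write(line)
    finally:
        for f in outputs:
            f.close()
    return paths


# Small demonstration with 10 synthetic events split into 3 files.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "events.json")
    with open(src, "w", encoding="utf-8") as f:
        for i in range(10):
            f.write('{"event_id": %d}\n' % i)
    parts = split_json_lines(src, tmp, 3)
    counts = []
    for p in parts:
        with open(p, encoding="utf-8") as f:
            counts.append(sum(1 for _ in f))

print(counts)
```

    The part files would then need to sit on a path that every node can read, so that a wildcard such as `events_part*.json` combined with ON ANY NODE can spread them across the cluster. Whether splitting actually helps depends on the file size and cluster, so it is worth measuring as suggested in F.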

  • rajatpaliwal86 Vertica Customer

    @mosheg thanks for the reply - I'll try out the things you mentioned and will see how it affects the performance.
    From next time I'll keep such discussions under "General discussion" category as you mentioned :)

  • rajatpaliwal86 Vertica Customer

    @mosheg fjsonparser() seems to be very slow. Can we use FCSVPARSER to load data into a hybrid flex table?

  • mosheg Vertica Employee Administrator

    You can load to any table as long as the source file format is supported by the parser.
    CSV files usually load faster than JSON files.

  • rajatpaliwal86 Vertica Customer

    @mosheg said:
    You can load to any table as long as the source file format is supported by the parser.
    CSV files usually load faster than JSON files.

    The events are very sparse: some events have one set of attributes and other events have another. There are around 50-60 common attributes, and the total list of attributes/columns could be around 1000 or so. Will this affect performance?
    Can you give me an example of how I can port from JSON events to CSV events?
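
    One possible way to port sparse JSON events to CSV is sketched below in Python. It assumes the events are newline-delimited JSON objects, builds the header from the union of all attribute names, and leaves missing attributes empty. The function name `json_lines_to_csv` and the sample events are hypothetical. Nested objects and arrays are kept as JSON text, which is only one of several reasonable choices.

```python
import csv
import io
import json


def json_lines_to_csv(json_lines, out):
    """Write newline-delimited JSON events to `out` as CSV with a header row."""
    # Holds all events in memory; fine for a sketch, not tuned for huge files.
    events = [json.loads(line) for line in json_lines if line.strip()]

    # The union of all attribute names, in first-seen order, becomes the header.
    columns = []
    seen = set()
    for event in events:
        for key in event:
            if key not in seen:
                seen.add(key)
                columns.append(key)

    # restval="" writes an empty field for attributes an event does not have.
    writer = csv.DictWriter(out, fieldnames=columns, restval="", lineterminator="\n")
    writer.writeheader()
    for event in events:
        # Nested objects/arrays are serialized back to JSON text; scalars pass through.
        row = {k: json.dumps(v) if isinstance(v, (dict, list)) else v
               for k, v in event.items()}
        writer.writerow(row)
    return columns


# Hypothetical sample: two events with different attribute sets.
sample = [
    '{"event_id": 1, "type": "login", "user": "alice"}',
    '{"event_id": 2, "type": "purchase", "amount": 9.5}',
]
buffer = io.StringIO()
header = json_lines_to_csv(sample, buffer)
print(buffer.getvalue())
```

    With around 1000 possible attributes, every CSV row carries a delimiter for each column even when most fields are empty, so the files may end up larger than the JSON originals. Whether the CSV route is faster overall for data this sparse would need to be measured.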

  • rajatpaliwal86 Vertica Customer

    @mosheg Also we have a text index on the flex table - how does it impact the load performance?

  • rajatpaliwal86 Vertica Customer

    @mosheg
    I saw this interesting article about pre-segmentation:
    https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/ConnectingToVertica/ClientJDBC/KVApi/VHash.htm

    I have a table that is segmented by its primary key, event_id. Can I create pre-segmented files for each node, say node1.json, node2.json, and node3.json (assuming a 3-node cluster)? Will this improve load performance, since the Vertica nodes would not have to shuffle the ingested data?
    The example uses a projection to get the segmentationMetadata that is used later. Can I use any projection here, since all my projections are segmented the same way?

    In the COPY command, I can specify the file and node like this:
    copy foo from 'node1.json' ON v_athenadb_node0001, 'node2.json' ON v_athenadb_node0002, 'node3.json' ON v_athenadb_node0003 PARSER fjsonparser() DIRECT

    Is my assumption correct?

  • mosheg Vertica Employee Administrator

    Theoretically yes, but it will not reduce the load time significantly.
    Try something like the following example using a unique stream name to monitor the load and see where time is spent.

    profile COPY online_sales.Online_Sales_Fact
            FROM '/opt/vertica/examples/VMart_Schema/Online_Sales_Fact.tbl'
            DELIMITER '|' NULL '' DIRECT
            STREAM NAME 'MY_COPY_01';
    
    \set TRAN_ID `vsql -XAtqc "SELECT transaction_id FROM load_streams where stream_name= 'MY_COPY_01' ;"`
    \set STAT_ID `vsql -XAtqc "SELECT statement_id FROM load_streams where stream_name= 'MY_COPY_01' ;"`
    -----------------------------------------------------------------------------------------------------
    \x
    SELECT * FROM load_streams where transaction_id = :TRAN_ID AND statement_id = :STAT_ID ;
    \x
    \echo 'Find out which execution engine operator took the longest to execute:'
    SELECT
        operator_name,
        AVG(counter_value) as "Average Time Per Thread"
    FROM dc_execution_engine_profiles
    WHERE counter_name ILIKE '%execution time%'
        AND transaction_id = :TRAN_ID
        AND statement_id = :STAT_ID
    GROUP BY 1
    ORDER BY "Average Time Per Thread" DESC;
    
    \echo 'Exec_time and I/O by node:'
    SELECT
        node_name, path_id, activity,
        TIMESTAMPDIFF( us , start_time, end_time) AS elaps_us,
        execution_time_us AS exec_us,
        CASE WHEN associated_oid IS NOT NULL THEN description ELSE NULL END AS input
    FROM
        v_internal.dc_plan_activities
    WHERE
        transaction_id = :TRAN_ID
        AND statement_id = :STAT_ID
    ORDER BY elaps_us DESC;
    
  • rajatpaliwal86 Vertica Customer

    @mosheg Thanks, I will try it out.

    "When using this COPY parameter on any node, confirm that the source file is identical on all nodes. Using different files can produce inconsistent results."
    For apportioned load, should the file be placed on a shared file location, such as an NFS share, so that it is accessible to all nodes on the same path?
    Or should we copy the file to each node individually and then execute the COPY command?

    Which method is preferable?
