
How to optimize a merge query?

Hi,
we observe low performance with MERGE queries in our database. We merge around 500,000 rows into a table of more than 30 million rows.
Here is an example log entry: Merged 441585 rows into table X. Elapsed time: 00:02:26.83
We optimized our merge query according to the Vertica recommendations:

1. including all columns in the merge query
2. sorting both target and source projections on the same columns, i.e. the primary key columns
3. identically segmenting target and source projections

We use the same query on two different databases, one having only one node and the other having three nodes.
The same merge query performs much better on the one-node database. Here is the corresponding log entry:
Merged 540672 rows into table X. Elapsed time: 00:00:31.92

That is, on the three-node database the same command takes five times longer.
Can you give us a hint on how to improve performance?

Thanks.

Best Answer

    marcothesane (Administrator)
    Answer ✓

    The answer to your question boils down to: "Welcome to MPP!"

    I highlighted the RESEGMENT keyword in your explain plan for a reason. Resegmenting is necessary, for example, when you join two tables (and the ON clause requires a JOIN to be performed) and the two tables / result sets are not identically segmented on the join columns.

    In your case, the optimiser cannot be sure that the combination of eventid 42, event timestamp '2023-12-07-01:15:32' and deviceid 4711 from both the target table and the result table expressed in the USING() clause will be on the same node. So it will resegment one of the tables. The rows will travel from one node to the node where that combination resides in the target table.

    The two main bottlenecks in an MPP database are I/O and network. The query is quicker in a one-noder because there is no network traffic between nodes necessary.

    If you materialise your TOP-K full select like so:

    CREATE TABLE st2_foo LIKE foo INCLUDING PROJECTIONS;
    INSERT INTO st2_foo
    SELECT *
      FROM staging_
      LIMIT 1 OVER (
        PARTITION BY eventid , eventtimestamp , deviceid
        ORDER BY eventid)
    ;
    

    ... and then use st2_foo in the USING clause of your MERGE statement, you should be able to get a join that avoids resegmentation, and it could also become a MERGEJOIN instead of a HASH join, which would make it quicker even on a one-noder.
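
    Putting the pieces together, the resulting MERGE would look roughly like this (a sketch only; `foo` / `st2_foo` are the placeholder names from the example above, and the column list must be spelled out in full as in the original statement):

    ```sql
    -- Merge from the deduplicated copy that shares the target's projections,
    -- so segmentation and sort order already match and no RESEGMENT is needed:
    MERGE INTO foo
    USING st2_foo t2
       ON  t2.eventid        = foo.eventid
       AND t2.eventtimestamp = foo.eventtimestamp
       AND t2.deviceid       = foo.deviceid
     WHEN MATCHED THEN UPDATE SET ...      -- all columns, as in the original MERGE
     WHEN NOT MATCHED THEN INSERT (...) VALUES (...);
    ```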

Answers

    joergschaber (Vertica Customer)

    Here comes the query plan:


    QUERY PLAN DESCRIPTION:

    EXPLAIN MERGE INTO USING (SELECT * FROM staging_ LIMIT 1 OVER (PARTITION BY EventId,EventTimeStamp,DeviceId ORDER BY EventId)) t2 ON t2.EventId = .EventId AND t2.EventTimeStamp = .EventTimeStamp AND t2.DeviceId = .DeviceId WHEN MATCHED THEN UPDATE SET Category = t2.Category, DeviceId = t2.DeviceId, IsEntrySentToDms = t2.IsEntrySentToDms, IsEntrySentToRsg = t2.IsEntrySentToRsg, EventId = t2.EventId, EventTimeStamp = t2.EventTimeStamp, EventText = t2.EventText, TransferTimestamp = t2.TransferTimestamp, ImportTimestamp = t2.ImportTimestamp, GuiDescription = t2.GuiDescription, GuiTitle = t2.GuiTitle, Level1ErrorId = t2.Level1ErrorId, Level1ErrorMessage = t2.Level1ErrorMessage, Level2ErrorId = t2.Level2ErrorId, OperatorId = t2.OperatorId, Rudi = t2.Rudi, Severity = t2.Severity, CreationTimestamp = t2.CreationTimestamp, EventTimeStampString = t2.EventTimeStampString, EventTimeStampUtc = t2.EventTimeStampUtc, CreationTimestampString = t2.CreationTimestampString, IsFromOfflineSource = t2.IsFromOfflineSource WHEN NOT MATCHED THEN INSERT (Category, DeviceId, IsEntrySentToDms, IsEntrySentToRsg, EventId, EventTimeStamp, EventText, TransferTimestamp, ImportTimestamp, GuiDescription, GuiTitle, Level1ErrorId, Level1ErrorMessage, Level2ErrorId, OperatorId, Rudi, Severity, CreationTimestamp, EventTimeStampString, EventTimeStampUtc, CreationTimestampString, IsFromOfflineSource) VALUES (t2.Category, t2.DeviceId, t2.IsEntrySentToDms, t2.IsEntrySentToRsg, t2.EventId, t2.EventTimeStamp, t2.EventText, t2.TransferTimestamp, t2.ImportTimestamp, t2.GuiDescription, t2.GuiTitle, t2.Level1ErrorId, t2.Level1ErrorMessage, t2.Level2ErrorId, t2.OperatorId, t2.Rudi, t2.Severity, t2.CreationTimestamp, t2.EventTimeStampString, t2.EventTimeStampUtc, t2.CreationTimestampString, t2.IsFromOfflineSource)

    Access Path:
    +-DML MERGE [Cost: 0, Rows: 0]
    | Target Projection: _DBD_3_seg_InitalDesign_b1
    | Target Projection: _DBD_3_seg_InitalDesign_b0
    | Target Projection: _DBD_2_seg_InitalDesign_b1
    | Target Projection: _DBD_2_seg_InitalDesign_b0
    | Target Projection: _DBD_1_seg_InitalDesign_b1
    | Target Projection: _DBD_1_seg_InitalDesign_b0
    | Target Projection: _DBD_136_seg_InitalDesign_b1
    | Target Projection: _DBD_136_seg_InitalDesign_b0
    | Target Projection: _DBD_135_seg_InitalDesign_b1
    | Target Projection: _DBD_135_seg_InitalDesign_b0
    | Target Projection: _DBD_129_seg_InitalDesign_b1
    | Target Projection: _DBD_129_seg_InitalDesign_b0
    | Target Projection: _DBD_121_seg_InitalDesign_b1
    | Target Projection: _DBD_121_seg_InitalDesign_b0
    | Target Prep:
    | Execute on: All Nodes
    | +---> JOIN HASH [RightOuter] [Cost: 13M, Rows: 1 (NO STATISTICS)] (PATH ID: 1) Inner (RESEGMENT)
    | | Join Cond: (VAL(2) = <table>.EventId) AND (VAL(2) = <table>.EventTimeStamp) AND (VAL(2) = <table>.DeviceId)
    | | Execute on: All Nodes
    | | +-- Outer -> STORAGE ACCESS for [Cost: 9M, Rows: 30M (NO STATISTICS)] (PATH ID: 2)
    | | | Projection: _DBD_1_seg_InitalDesign_b0
    | | | Materialize: <table>.DeviceId, <table>.Category, <table>.Level2ErrorId, <table>.IsFromOfflineSource, <table>.IsEntrySentToRsg, <table>.IsEntrySentToDms, <table>.Severity, <table>.Level1ErrorMessage, <table>.Level1ErrorId, <table>.EventText, <table>.GuiTitle, <table>.GuiDescription, <table>.Rudi, <table>.OperatorId, <table>.EventId, <table>.CreationTimestampString, <table>.ImportTimestamp, <table>.TransferTimestamp, <table>.CreationTimestamp, <table>.EventTimeStampUtc, <table>.EventTimeStampString, <table>.EventTimeStamp, <table>.epoch
    | | | Execute on: All Nodes
    | | | Runtime Filters: (SIP1(HashJoin): <table>.EventId), (SIP2(HashJoin): <table>.EventTimeStamp), (SIP3(HashJoin): <table>.DeviceId), (SIP4(HashJoin): <table>.EventId, <table>.EventTimeStamp, <table>.DeviceId)
    | | +-- Inner -> SELECT [Cost: 9, Rows: 1] (PATH ID: 3)
    | | | Execute on: All Nodes
    | | | +---> SELECT LIMIT 1 [Cost: 9, Rows: 1] (PATH ID: 4)
    | | | | Execute on: All Nodes
    | | | | +---> ANALYTICAL [Cost: 9, Rows: 1] (PATH ID: 5)
    | | | | | Analytic Group
    | | | | | Functions: row_number()
    | | | | | Group Local Resegment: <table>.DeviceId, <table>.EventId, <table>.EventTimeStamp
    | | | | | Group Sort: <table>.DeviceId ASC, <table>.EventId ASC, <table>.EventTimeStamp ASC, <table>.EventId ASC NULLS LAST
    | | | | | TopK Optimized: K=1, PB Cols = 3
    | | | | | Output Only: 1 tuples
    | | | | | Execute on: All Nodes
    | | | | | +---> STORAGE ACCESS for <table> [Cost: 8, Rows: 1] (PATH ID: 6)
    | | | | | | Projection: staging__super_new_b1
    | | | | | | Materialize: <table>.DeviceId, <table>.EventId, <table>.Category, <table>.IsEntrySentToDms, <table>.IsEntrySentToRsg, <table>.EventTimeStamp, <table>.EventText, <table>.TransferTimestamp, <table>.ImportTimestamp, <table>.GuiDescription, <table>.GuiTitle, <table>.Level1ErrorId, <table>.Level1ErrorMessage, <table>.Level2ErrorId, <table>.OperatorId, <table>.Rudi, <table>.Severity, <table>.CreationTimestamp, <table>.EventTimeStampString, <table>.EventTimeStampUtc, <table>.CreationTimestampString, <table>.IsFromOfflineSource
    | | | | | | Execute on: All Nodes

    VValdar (Vertica Employee)
    edited December 2023

    Hi Joerg,

    We haven't had the opportunity to interact since Malta :) Hopefully I can provide some help here.

    What I see in the explain plan (thanks for providing it) is that you have a lot of projections on your final table (dbd_1, dbd_2, dbd_3, dbd_121, dbd_129, dbd_135, dbd_136); the merge has to happen in all of those projections.

    In this explain plan we only see dbd_1; I guess the full plan is much longer, with all projections.

    Anyhow, the TOP-K needs to resegment and reorder the data.
    So even if the staging table matches, after the TOP-K is computed the data no longer matches the target table's structure.
    We can see this because the final join is a HASH JOIN and not a MERGE JOIN.

    Btw, this TOP-K seems a bit surprising to me; I feel there is a bug in the way it's coded :) It is partitioned by EventId, EventTimeStamp and DeviceId, then sorted on EventId. As EventId is already in the PARTITION BY, you gain nothing from that ORDER BY.

    Usually we see something more like LIMIT 1 OVER (PARTITION BY DeviceId ORDER BY EventTimeStamp DESC) to keep only the latest value (maybe that has no meaning for your data).

    So my advice:
    1. Challenge all your existing projections; the fewer, the better for the merge.
    2. Challenge the TOP-K.
    3. Create a working table with the result of the TOP-K query, following all the advice you mentioned, and MERGE from there.

    Edit: also, NO STATISTICS is not ideal.
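
    On that last point, the two checks are quick one-liners (a sketch; the schema/table names are taken from the plans in this thread, adjust to yours):

    ```sql
    -- Refresh statistics so the optimizer can cost the join properly
    -- instead of falling back to NO STATISTICS defaults:
    SELECT ANALYZE_STATISTICS('cobasPulseStage.AuditTrails');

    -- List the projections the MERGE has to maintain on the target table:
    SELECT projection_name, is_super_projection, create_type
      FROM v_catalog.projections
     WHERE anchor_table_name = 'AuditTrails';
    ```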

    marcothesane (Administrator)

    The culprit is very probably this one:
    +---> JOIN HASH [RightOuter] [Cost: 13M, Rows: 1 (NO STATISTICS)] (PATH ID: 1) Inner (RESEGMENT)
    .. and the hurting part is RESEGMENT.
    The way the rows come out of the full-select in the USING() clause, they are very probably not segmented - and not sorted - like any of the projections you have on your merge target table.

    Three possible workarounds.
    1. Add a projection to your target table that is segmented - and sorted - like the rows come out of that full-select. A CREATE TABLE AS from that full-select, followed by SELECT EXPORT_OBJECTS('', '<that CTAS target>', FALSE), will inspire you on how to create that projection.
    2. Create a staging table: CREATE TABLE your_table_stg LIKE your_table INCLUDING PROJECTIONS;, then INSERT INTO your_table_stg ...;, then code a MERGE ... USING (your_table_stg). That will avoid resegmentations.
    3. Work with a staging table and partition both tables by the element of time. Fill the staging table with all data as it should be at the end. Finally, SELECT SWAP_PARTITIONS_BETWEEN_TABLES(...); if you're interested in that, I can supply details.

    joergschaber (Vertica Customer)

    Hi Marco and Valdar,

    thanks for your replies!
    The first thing I will try is omitting the full-select query and merging directly from the staging table instead. The full select is not really necessary and seems to be problematic. We introduced it to first get rid of duplicate keys before merging.

    However, what puzzles me is that on both databases the number and kind of projections is basically the same. Still, on the 1-node cluster the merge is considerably faster than on the 3-node cluster.

    @Valdar: what do you mean by 'challenge the TOP K'?

    joergschaber (Vertica Customer)

    When I look at the query plan with the SELECT query completely omitted, it is drastically reduced, but it resegments all projections (see below).
    So reducing the number of projections also sounds like a reasonable measure to reduce run time.

    When I run the Database Designer again, does it drop unnecessary projections? I seem to have some duplicate projections, i.e. with identical definitions.
    Or is there a clever way to drop duplicate projections?

    EXPLAIN MERGE INTO cobasPulseStage.AuditTrails USING staging_cobasPulseStage.AuditTrails t2 ON t2.EventId = cobasPulseStage.AuditTrails.EventId AND t2.EventTimeStamp = cobasPulseStage.AuditTrails.EventTimeStamp AND t2.DeviceId = cobasPulseStage.AuditTrails.DeviceId WHEN MATCHED THEN UPDATE SET Category = t2.Category, DeviceId = t2.DeviceId, IsEntrySentToDms = t2.IsEntrySentToDms, IsEntrySentToRsg = t2.IsEntrySentToRsg, EventId = t2.EventId, EventTimeStamp = t2.EventTimeStamp, EventText = t2.EventText, TransferTimestamp = t2.TransferTimestamp, ImportTimestamp = t2.ImportTimestamp, GuiDescription = t2.GuiDescription, GuiTitle = t2.GuiTitle, Level1ErrorId = t2.Level1ErrorId, Level1ErrorMessage = t2.Level1ErrorMessage, Level2ErrorId = t2.Level2ErrorId, OperatorId = t2.OperatorId, Rudi = t2.Rudi, Severity = t2.Severity, CreationTimestamp = t2.CreationTimestamp, EventTimeStampString = t2.EventTimeStampString, EventTimeStampUtc = t2.EventTimeStampUtc, CreationTimestampString = t2.CreationTimestampString, IsFromOfflineSource = t2.IsFromOfflineSource WHEN NOT MATCHED THEN INSERT (Category, DeviceId, IsEntrySentToDms, IsEntrySentToRsg, EventId, EventTimeStamp, EventText, TransferTimestamp, ImportTimestamp, GuiDescription, GuiTitle, Level1ErrorId, Level1ErrorMessage, Level2ErrorId, OperatorId, Rudi, Severity, CreationTimestamp, EventTimeStampString, EventTimeStampUtc, CreationTimestampString, IsFromOfflineSource) VALUES (t2.Category, t2.DeviceId, t2.IsEntrySentToDms, t2.IsEntrySentToRsg, t2.EventId, t2.EventTimeStamp, t2.EventText, t2.TransferTimestamp, t2.ImportTimestamp, t2.GuiDescription, t2.GuiTitle, t2.Level1ErrorId, t2.Level1ErrorMessage, t2.Level2ErrorId, t2.OperatorId, t2.Rudi, t2.Severity, t2.CreationTimestamp, t2.EventTimeStampString, t2.EventTimeStampUtc, t2.CreationTimestampString, t2.IsFromOfflineSource)

    Access Path:
    +-DML INSERT [Cost: 0, Rows: 0]
    | Target Projection: cobasPulseStage.AuditTrails_DBD_3_seg_InitalDesign_b1 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_3_seg_InitalDesign_b0 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_2_seg_InitalDesign_b1 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_2_seg_InitalDesign_b0 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_1_seg_InitalDesign_b1 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_1_seg_InitalDesign_b0 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_136_seg_InitalDesign_b1 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_136_seg_InitalDesign_b0 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_135_seg_InitalDesign_b1 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_135_seg_InitalDesign_b0 (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_129_seg_InitalDesign_b1
    | Target Projection: cobasPulseStage.AuditTrails_DBD_129_seg_InitalDesign_b0 (RESEGMENT)
    | Target Projection: cobasPulseStage.AuditTrails_DBD_121_seg_InitalDesign_b1
    | Target Projection: cobasPulseStage.AuditTrails_DBD_121_seg_InitalDesign_b0 (RESEGMENT)
    | Target Prep:
    | Execute on: All Nodes
    | Execute on: All Nodes
    | Execute on: All Nodes
    | +---> STORAGE ACCESS for t2 [Cost: 8, Rows: 1] (PATH ID: 3)
    | | Projection: staging_cobasPulseStage.AuditTrails_super_new_b1
    | | Materialize: t2.DeviceId, t2.EventId, t2.Category, t2.IsEntrySentToDms, t2.IsEntrySentToRsg, t2.EventTimeStamp, t2.EventText, t2.TransferTimestamp, t2.ImportTimestamp, t2.GuiDescription, t2.GuiTitle, t2.Level1ErrorId, t2.Level1ErrorMessage, t2.Level2ErrorId, t2.OperatorId, t2.Rudi, t2.Severity, t2.CreationTimestamp, t2.EventTimeStampString, t2.EventTimeStampUtc, t2.CreationTimestampString, t2.IsFromOfflineSource
    | | Execute on: All Nodes
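
    To find duplicate projection definitions before dropping any, comparing their DDL can help (a sketch; the table name is from the plan above, and the DROP target is a hypothetical example):

    ```sql
    -- Dump the CREATE PROJECTION statements for the table and compare them
    -- by eye (or diff the output) to spot identical definitions:
    SELECT EXPORT_OBJECTS('', 'cobasPulseStage.AuditTrails', FALSE);

    -- Then drop a redundant projection explicitly, e.g.:
    -- DROP PROJECTION cobasPulseStage.AuditTrails_DBD_2_seg_InitalDesign;
    ```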

    joergschaber (Vertica Customer)

    To follow up:
    After all, we needed the statement
    "SELECT * FROM staging_ LIMIT 1 OVER (PARTITION BY EventId, EventTimeStamp, DeviceId ORDER BY EventId)"
    to avoid duplicate merge key errors.
    However, that drastically reduced performance.
    The solution, as suggested above by Marco (THANKS!), was to create a new temporary or staging table

    CREATE TABLE st2_foo LIKE foo INCLUDING PROJECTIONS;
    INSERT INTO st2_foo
    SELECT *
    FROM staging_
    LIMIT 1 OVER (
    PARTITION BY eventid , eventtimestamp , deviceid
    ORDER BY eventid);

    and then use this in the merge query: MERGE INTO USING st2_foo ....

    The performance improved more than 100-fold!

    VValdar (Vertica Employee)

    Congrats :) Always nice to see a huge improvement!
