

Controlling data segmentation — Vertica Forum

Controlling data segmentation

I have a transform UDF that I want to run per zip_prefix (the first 2 digits of the zip code). The problem is that the data distribution per zip prefix is quite skewed, so I'd like to distribute subsets of zip_prefixes evenly across the nodes and run the UDF once per node (e.g., partition nodes order by zip_prefix). In that case, my UDF would split the data by zip_prefix itself as the rows stream in.

The problem is I do not see how I can control the segmentation in such a manner.

For instance, you can see if I just segment by hash(zip_prefix), the amount of data processed per node will be uneven:

create local temp table t on commit preserve rows as
select zip_prefix, 1 matrix, site_idx, max(site_idx) over (partition by zip_prefix) site_len, aid_idx, max(aid_idx) over ( partition by zip_prefix) aid_len, weight
  from ci_dm.ul_site_edge
  where batch_id = 1
  union all
  select zip_prefix, 2 matrix, aid1_idx, 0, aid2_idx, 0, weight
  from ci_dm.ul_aid_edge
  where batch_id = 1
  order by zip_prefix, matrix  
segmented by hash(zip_prefix) all nodes
;

dbadmin=> select node_name, (sum(row_count)/sum(sum(row_count)) over()*100)::numeric(10,2) row_distribution_perc from projection_storage where projection_name = 't_b0' group by 1 order by 1;

   node_name    | row_distribution_perc
----------------+-----------------------
 v_udb_node0001 |                  1.43
 v_udb_node0002 |                  3.62
 v_udb_node0003 |                  4.16
 v_udb_node0004 |                  8.24
 v_udb_node0005 |                  9.61
 v_udb_node0006 |                  0.24
 v_udb_node0007 |                  7.90
 v_udb_node0008 |                  0.68
 v_udb_node0009 |                  1.78
 v_udb_node0010 |                  5.22
 v_udb_node0011 |                  4.30
 v_udb_node0012 |                  6.40
 v_udb_node0013 |                  0.50
 v_udb_node0014 |                  5.23
 v_udb_node0015 |                  5.21
 v_udb_node0016 |                  2.31
 v_udb_node0017 |                  5.52
 v_udb_node0018 |                  2.19
 v_udb_node0019 |                  5.83
 v_udb_node0020 |                  4.66
 v_udb_node0021 |                  3.34
 v_udb_node0022 |                  5.10
 v_udb_node0023 |                  2.65
 v_udb_node0024 |                  3.87


On a 24-node cluster, I can figure out a decent way to create subsets of zip_prefixes and assign them to nodes:

dbadmin=> select node_id, count(distinct zip_prefix) zips, (sum(cnt)/sum(sum(cnt)) over()*100)::numeric(10,2) row_distribution_perc from ( select zip_prefix, count(*) cnt, (row_number() over ( order by count(*) desc )-1) % 24 node_id from t group by 1 ) v group by 1 order by 1;
 node_id | zips | row_distribution_perc
---------+------+-----------------------
       0 |   16 |                  5.99
       1 |   16 |                  5.38
       2 |   16 |                  5.26
       3 |   16 |                  5.11
       4 |   16 |                  5.04
       5 |   15 |                  4.93
       6 |   15 |                  4.81
       7 |   15 |                  4.74
       8 |   15 |                  4.61
       9 |   15 |                  4.15
      10 |   15 |                  4.09
      11 |   15 |                  3.94
      12 |   15 |                  3.89
      13 |   15 |                  3.80
      14 |   15 |                  3.69
      15 |   15 |                  3.67
      16 |   15 |                  3.62
      17 |   15 |                  3.56
      18 |   15 |                  3.51
      19 |   15 |                  3.44
      20 |   15 |                  3.37
      21 |   15 |                  3.24
      22 |   15 |                  3.13
      23 |   15 |                  3.03
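
The assignment in the query above is a round-robin over prefixes ranked by row count, which is why node 0 always gets the largest prefix of each round and ends up with almost twice the share of node 23. A minimal Python sketch of that rule, next to a greedy "least-loaded node first" variant, is shown below. The function names and the sample counts are made up for illustration and are not taken from the tables in this post; the greedy variant is a swapped-in heuristic, not something the original query does.

```python
import heapq

def round_robin_by_rank(counts, n_nodes):
    """Mirror (row_number() over (order by count(*) desc) - 1) % n_nodes."""
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return {key: rank % n_nodes for rank, (key, _) in enumerate(ranked)}

def greedy_least_loaded(counts, n_nodes):
    """Assign each prefix, largest first, to the node with the fewest rows so far."""
    heap = [(0, node) for node in range(n_nodes)]  # (rows assigned, node_id)
    heapq.heapify(heap)
    assignment = {}
    for key, cnt in sorted(counts.items(), key=lambda kv: kv[1], reverse=True):
        load, node = heapq.heappop(heap)
        assignment[key] = node
        heapq.heappush(heap, (load + cnt, node))
    return assignment

def node_loads(counts, assignment, n_nodes):
    """Total rows per node for a given prefix -> node assignment."""
    loads = [0] * n_nodes
    for key, node in assignment.items():
        loads[node] += counts[key]
    return loads

# Hypothetical row counts per zip_prefix, for illustration only.
example_counts = {"10": 90, "11": 60, "20": 50, "21": 40, "30": 30, "31": 20, "40": 10}
rr = node_loads(example_counts, round_robin_by_rank(example_counts, 3), 3)
greedy = node_loads(example_counts, greedy_least_loaded(example_counts, 3), 3)
```

On this toy input the greedy variant narrows the gap between the busiest and the idlest node. Whether it helps on the real data would need to be checked against the actual per-prefix counts.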

However, I cannot get the data to distribute in the cluster based on node_id:

dbadmin=> create local temp table t2 on commit preserve rows as
dbadmin-> select t.*, n.node_id
dbadmin-> from t, ( select zip_prefix, count(*) cnt, (row_number() over ( order by count(*) desc )-1) % 24 node_id from t group by 1 ) n
dbadmin-> where n.zip_prefix = t.zip_prefix
dbadmin-> order by node_id, zip_prefix, matrix
dbadmin-> segmented by node_id all nodes
dbadmin-> ;
WARNING 5993:  Projection is irregularly segmented by column
HINT:  Consider using a segmentation expression, such as SEGMENTED BY HASH(column)

CREATE TABLE
dbadmin=>
dbadmin=>
dbadmin=> select node_name, (sum(row_count)/sum(sum(row_count)) over()*100)::numeric(10,2) row_distribution_perc from projection_storage where projection_name = 't2_b0' group by 1 order by 1;
   node_name    | row_distribution_perc
----------------+-----------------------
 v_udb_node0001 |                100.00
 v_udb_node0002 |                  0.00
 v_udb_node0003 |                  0.00
 v_udb_node0004 |                  0.00
 v_udb_node0005 |                  0.00
 v_udb_node0006 |                  0.00
 v_udb_node0007 |                  0.00
 v_udb_node0008 |                  0.00
 v_udb_node0009 |                  0.00
 v_udb_node0010 |                  0.00
 v_udb_node0011 |                  0.00
 v_udb_node0012 |                  0.00
 v_udb_node0013 |                  0.00
 v_udb_node0014 |                  0.00
 v_udb_node0015 |                  0.00
 v_udb_node0016 |                  0.00
 v_udb_node0017 |                  0.00
 v_udb_node0018 |                  0.00
 v_udb_node0019 |                  0.00
 v_udb_node0020 |                  0.00
 v_udb_node0021 |                  0.00
 v_udb_node0022 |                  0.00
 v_udb_node0023 |                  0.00
 v_udb_node0024 |                  0.00
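
One possible reading of this result, consistent with the warning about irregular segmentation, is that the segmentation expression's value is mapped onto nodes by splitting a large integer range into equal slices, so the small values 0 to 23 all fall into the first slice. The toy model below only illustrates that idea. The ring size of 2**32 and the function name are assumptions for the sketch and are not confirmed by anything in this thread, so check the documentation for your Vertica version before relying on it.

```python
def node_for_value(value, n_nodes, ring_size):
    """Return the 0-based node whose equal-width slice of [0, ring_size) holds value."""
    slice_width = ring_size // n_nodes
    return min((value % ring_size) // slice_width, n_nodes - 1)

RING_SIZE = 2 ** 32  # assumption for illustration; not taken from the post
N_NODES = 24

# Raw node_id values 0..23 all land in the first slice.
raw = {node_for_value(node_id, N_NODES, RING_SIZE) for node_id in range(N_NODES)}

# Scaling node_id by the slice width puts one value in each slice.
scaled = [
    node_for_value(node_id * (RING_SIZE // N_NODES), N_NODES, RING_SIZE)
    for node_id in range(N_NODES)
]
```

If the real mapping resembles this model, segmenting by a scaled expression rather than by the raw node_id might spread the rows, but that is untested here.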

Is there a segmentation clause or some other way that I can distribute the data this way?

Comments

  • My current solution is to break the data up into multiple tables, create each table on a specific pair of nodes, and then run a union all query on top of these tables. Seems to be working...

     

    dbadmin=> select 'create local temp table tx_'||node_id||' on commit preserve rows as select * from t2 where node_id = '||node_id||' order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes '||prime||','||buddy||';' from ( select n1.node_id, n1.node_name prime, n2.node_name buddy from ( select node_name, row_number() over ( order by node_name )-1 node_id from nodes ) n1, ( select node_name, row_number() over ( order by node_name  desc )-1 node_id from nodes ) n2 where n1.node_id = n2.node_id ) v order by node_id;
                                                                                                 ?column?                                                                                              
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
     create local temp table tx_0 on commit preserve rows as select * from t2 where node_id = 0 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0001,v_udb_node0024;
     create local temp table tx_1 on commit preserve rows as select * from t2 where node_id = 1 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0002,v_udb_node0023;
     create local temp table tx_2 on commit preserve rows as select * from t2 where node_id = 2 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0003,v_udb_node0022;
     create local temp table tx_3 on commit preserve rows as select * from t2 where node_id = 3 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0004,v_udb_node0021;
     create local temp table tx_4 on commit preserve rows as select * from t2 where node_id = 4 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0005,v_udb_node0020;
     create local temp table tx_5 on commit preserve rows as select * from t2 where node_id = 5 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0006,v_udb_node0019;
     create local temp table tx_6 on commit preserve rows as select * from t2 where node_id = 6 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0007,v_udb_node0018;
     create local temp table tx_7 on commit preserve rows as select * from t2 where node_id = 7 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0008,v_udb_node0017;
     create local temp table tx_8 on commit preserve rows as select * from t2 where node_id = 8 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0009,v_udb_node0016;
     create local temp table tx_9 on commit preserve rows as select * from t2 where node_id = 9 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0010,v_udb_node0015;
     create local temp table tx_10 on commit preserve rows as select * from t2 where node_id = 10 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0011,v_udb_node0014;
     create local temp table tx_11 on commit preserve rows as select * from t2 where node_id = 11 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0012,v_udb_node0013;
     create local temp table tx_12 on commit preserve rows as select * from t2 where node_id = 12 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0013,v_udb_node0012;
     create local temp table tx_13 on commit preserve rows as select * from t2 where node_id = 13 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0014,v_udb_node0011;
     create local temp table tx_14 on commit preserve rows as select * from t2 where node_id = 14 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0015,v_udb_node0010;
     create local temp table tx_15 on commit preserve rows as select * from t2 where node_id = 15 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0016,v_udb_node0009;
     create local temp table tx_16 on commit preserve rows as select * from t2 where node_id = 16 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0017,v_udb_node0008;
     create local temp table tx_17 on commit preserve rows as select * from t2 where node_id = 17 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0018,v_udb_node0007;
     create local temp table tx_18 on commit preserve rows as select * from t2 where node_id = 18 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0019,v_udb_node0006;
     create local temp table tx_19 on commit preserve rows as select * from t2 where node_id = 19 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0020,v_udb_node0005;
     create local temp table tx_20 on commit preserve rows as select * from t2 where node_id = 20 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0021,v_udb_node0004;
     create local temp table tx_21 on commit preserve rows as select * from t2 where node_id = 21 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0022,v_udb_node0003;
     create local temp table tx_22 on commit preserve rows as select * from t2 where node_id = 22 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0023,v_udb_node0002;
     create local temp table tx_23 on commit preserve rows as select * from t2 where node_id = 23 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0024,v_udb_node0001;
    (24 rows)
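
The pairing logic in the generator query above (node i in ascending name order as prime, node i in descending name order as buddy) can also be sketched outside the database. The Python below is a rough equivalent under that reading; the function name `per_node_ddl` and its `source_table` parameter are hypothetical, and the node names are simply the ones shown in the output above.

```python
def per_node_ddl(node_names, source_table="t2"):
    """Build one CREATE statement per node, pairing node i with node (n-1-i) as buddy."""
    ordered = sorted(node_names)
    statements = []
    for node_id, prime in enumerate(ordered):
        # Buddy is the node at the same position counted from the end of the list.
        buddy = ordered[len(ordered) - 1 - node_id]
        statements.append(
            f"create local temp table tx_{node_id} on commit preserve rows as "
            f"select * from {source_table} where node_id = {node_id} "
            f"order by node_id, zip_prefix, matrix "
            f"segmented by hash(node_id) nodes {prime},{buddy};"
        )
    return statements

nodes = [f"v_udb_node{i:04d}" for i in range(1, 25)]
ddl = per_node_ddl(nodes)
```

Note that with an odd number of nodes the middle node would be paired with itself, which the 24-node case here never hits.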

     

    dbadmin=> create table ul_result
    dbadmin-> as
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by matrix )
    dbadmin-> from tx_0
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_1
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_2
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_3
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_4
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_5
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_6
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_7
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_8
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_9
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_10
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_11
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_12
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_13
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_14
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_15
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_16
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_17
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_18
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_19
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_20
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_21
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_22
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_23
    dbadmin-> order by zip_prefix, matrix_id segmented by hash(zip_prefix, d1, d2, d3) all nodes;
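
Since the 24 branches differ only in the table name, the statement can be generated rather than typed. The sketch below builds the same shape of query as a string; `union_all_query` and `UDF_CALL` are hypothetical names, the UDF call text is copied from the statement above, and every branch uses the `order by zip_prefix, matrix` form (the first branch above orders by `matrix` only).

```python
UDF_CALL = (
    "ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix "
    "using parameters lamba=10.0, iterations=500, threads=16, debug=false) "
    "over ( partition by node_id, zip_prefix order by zip_prefix, matrix )"
)

def union_all_query(n_tables, target="ul_result"):
    """Build a CREATE TABLE ... AS statement with one UDF branch per tx_<i> table."""
    branches = [f"select\n  {UDF_CALL}\nfrom tx_{i}" for i in range(n_tables)]
    body = "\nunion all\n".join(branches)
    return (
        f"create table {target}\nas\n{body}\n"
        "order by zip_prefix, matrix_id segmented by hash(zip_prefix, d1, d2, d3) all nodes;"
    )

query = union_all_query(24)
```

Printing `query` and pasting it into vsql keeps the branches consistent and makes it easy to regenerate the statement if the cluster size changes.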

     
