Controlling data segmentation

I have a transform UDF that I want to run per zip_prefix (the first two digits of the zip code). The problem is that the data distribution across zip prefixes is quite skewed. So I'd like to give each node a subset of the zip_prefixes such that every node gets roughly the same amount of data, and then run the UDF once per node (e.g., with partition nodes order by zip_prefix). In that setup, my UDF would handle splitting the data by zip_prefix as it streams into the UDF.
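A minimal sketch of the per-prefix splitting the UDF would do, assuming rows reach it already sorted by zip_prefix (as the projection's ORDER BY suggests). This is plain Python, not the Vertica UDx SDK; the row layout and the names `rows` and `split_by_prefix` are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical rows as (zip_prefix, matrix, weight), already sorted by
# (zip_prefix, matrix), the way they would stream into the UDF on one node.
rows = [
    ("01", 1, 0.5),
    ("01", 2, 1.0),
    ("02", 1, 2.0),
    ("07", 1, 0.25),
    ("07", 2, 0.75),
]

def split_by_prefix(sorted_rows):
    """Yield (zip_prefix, rows_for_that_prefix); input must be sorted by zip_prefix."""
    for prefix, group in groupby(sorted_rows, key=itemgetter(0)):
        yield prefix, list(group)

for prefix, group in split_by_prefix(rows):
    print(prefix, len(group))
```

Because groupby only merges adjacent rows, this works only if the input really is sorted by zip_prefix. That is why the ORDER BY on the node's data matters.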

The problem is that I don't see how to control segmentation in that way.

For instance, if I simply segment by hash(zip_prefix), the amount of data processed per node is uneven:

create local temp table t on commit preserve rows as
select zip_prefix, 1 matrix, site_idx,
       max(site_idx) over (partition by zip_prefix) site_len,
       aid_idx,
       max(aid_idx) over (partition by zip_prefix) aid_len,
       weight
  from ci_dm.ul_site_edge
 where batch_id = 1
union all
select zip_prefix, 2 matrix, aid1_idx, 0, aid2_idx, 0, weight
  from ci_dm.ul_aid_edge
 where batch_id = 1
order by zip_prefix, matrix
segmented by hash(zip_prefix) all nodes
;

dbadmin=> select node_name, (sum(row_count)/sum(sum(row_count)) over()*100)::numeric(10,2) row_distribution_perc from projection_storage where projection_name = 't_b0' group by 1 order by 1;

   node_name    | row_distribution_perc
----------------+-----------------------
 v_udb_node0001 |                  1.43
 v_udb_node0002 |                  3.62
 v_udb_node0003 |                  4.16
 v_udb_node0004 |                  8.24
 v_udb_node0005 |                  9.61
 v_udb_node0006 |                  0.24
 v_udb_node0007 |                  7.90
 v_udb_node0008 |                  0.68
 v_udb_node0009 |                  1.78
 v_udb_node0010 |                  5.22
 v_udb_node0011 |                  4.30
 v_udb_node0012 |                  6.40
 v_udb_node0013 |                  0.50
 v_udb_node0014 |                  5.23
 v_udb_node0015 |                  5.21
 v_udb_node0016 |                  2.31
 v_udb_node0017 |                  5.52
 v_udb_node0018 |                  2.19
 v_udb_node0019 |                  5.83
 v_udb_node0020 |                  4.66
 v_udb_node0021 |                  3.34
 v_udb_node0022 |                  5.10
 v_udb_node0023 |                  2.65
 v_udb_node0024 |                  3.87

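A minimal sketch of why hashing alone does not balance rows: a hash spreads distinct keys across nodes, not rows, so a few heavy prefixes dominate whichever nodes they land on. The counts are synthetic. CRC32 modulo the node count is only a stand-in, because Vertica's HASH() and its mapping of hash values to nodes are different.

```python
import zlib

def hash_node(key: str, n_nodes: int) -> int:
    # Stand-in for HASH(zip_prefix): CRC32 is NOT Vertica's HASH(), and
    # modulo is a simplification of how hash values are mapped to nodes.
    return zlib.crc32(key.encode("ascii")) % n_nodes

def share_per_node(counts: dict, n_nodes: int) -> list:
    """Percentage of all rows that land on each node when rows are placed by hash(key)."""
    totals = [0] * n_nodes
    for key, cnt in counts.items():
        totals[hash_node(key, n_nodes)] += cnt
    grand = sum(totals)
    return [round(100.0 * t / grand, 2) for t in totals]

# Synthetic, skewed row counts per two-digit prefix (hypothetical data).
counts = {f"{p:02d}": 100_000 // (p + 1) for p in range(100)}
shares = share_per_node(counts, 24)
print(min(shares), max(shares))
```

Even a perfect hash would place about the same number of prefixes on each node. With prefix "00" alone holding close to a fifth of the rows, whichever node receives it is overloaded regardless of the hash function.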

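On a 24-node cluster, I can work out a decent way to split the zip_prefixes into subsets and assign them to nodes: rank the prefixes by row count (descending) and deal them out round-robin, so node_id = (rank - 1) % 24: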

dbadmin=> select node_id, count(distinct zip_prefix) zips, (sum(cnt)/sum(sum(cnt)) over()*100)::numeric(10,2) row_distribution_perc from ( select zip_prefix, count(*) cnt, (row_number() over ( order by count(*) desc )-1) % 24 node_id from t group by 1 ) v group by 1 order by 1;
 node_id | zips | row_distribution_perc
---------+------+-----------------------
       0 |   16 |                  5.99
       1 |   16 |                  5.38
       2 |   16 |                  5.26
       3 |   16 |                  5.11
       4 |   16 |                  5.04
       5 |   15 |                  4.93
       6 |   15 |                  4.81
       7 |   15 |                  4.74
       8 |   15 |                  4.61
       9 |   15 |                  4.15
      10 |   15 |                  4.09
      11 |   15 |                  3.94
      12 |   15 |                  3.89
      13 |   15 |                  3.80
      14 |   15 |                  3.69
      15 |   15 |                  3.67
      16 |   15 |                  3.62
      17 |   15 |                  3.56
      18 |   15 |                  3.51
      19 |   15 |                  3.44
      20 |   15 |                  3.37
      21 |   15 |                  3.24
      22 |   15 |                  3.13
      23 |   15 |                  3.03
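The round-robin assignment from the query above, sketched in Python next to a greedy alternative. The alternative is longest-processing-time-first (LPT), a standard scheduling heuristic and not what the query does: it always hands the next-largest prefix to the currently lightest node. The counts and function names are hypothetical. The resulting prefix-to-node map would play the same role as node_id in t2.

```python
import heapq

def round_robin(counts, n_nodes):
    """Mirror of the SQL: rank keys by row count desc, node = (rank - 1) % n_nodes."""
    ranked = sorted(counts, key=lambda k: counts[k], reverse=True)
    return {key: i % n_nodes for i, key in enumerate(ranked)}

def greedy_lpt(counts, n_nodes):
    """LPT heuristic: assign each key (largest first) to the currently lightest node."""
    heap = [(0, node) for node in range(n_nodes)]  # (rows assigned so far, node_id)
    heapq.heapify(heap)
    assignment = {}
    for key in sorted(counts, key=lambda k: counts[k], reverse=True):
        load, node = heapq.heappop(heap)
        assignment[key] = node
        heapq.heappush(heap, (load + counts[key], node))
    return assignment

def node_loads(counts, assignment, n_nodes):
    """Total rows per node for a given key -> node assignment."""
    loads = [0] * n_nodes
    for key, node in assignment.items():
        loads[node] += counts[key]
    return loads

counts = {"a": 10, "b": 8, "c": 6, "d": 5, "e": 4, "f": 3}  # hypothetical row counts
print(node_loads(counts, round_robin(counts, 2), 2))  # [20, 16]
print(node_loads(counts, greedy_lpt(counts, 2), 2))   # [18, 18]
```

Round-robin always gives node 0 the largest item of every "round", which matches the steady decline from 5.99% to 3.03% in the table. LPT compensates for that by tracking the load on each node.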

However, I cannot get the data distributed across the cluster by node_id. Segmenting by the raw node_id column puts every row on the first node:

dbadmin=> create local temp table t2 on commit preserve rows as
dbadmin-> select t.*, n.node_id
dbadmin-> from t, ( select zip_prefix, count(*) cnt, (row_number() over ( order by count(*) desc )-1) % 24 node_id from t group by 1 ) n
dbadmin-> where n.zip_prefix = t.zip_prefix
dbadmin-> order by node_id, zip_prefix, matrix
dbadmin-> segmented by node_id all nodes
dbadmin-> ;
WARNING 5993:  Projection is irregularly segmented by column
HINT:  Consider using a segmentation expression, such as SEGMENTED BY HASH(column)

CREATE TABLE
dbadmin=> select node_name, (sum(row_count)/sum(sum(row_count)) over()*100)::numeric(10,2) row_distribution_perc from projection_storage where projection_name = 't2_b0' group by 1 order by 1;
   node_name    | row_distribution_perc
----------------+-----------------------
 v_udb_node0001 |                100.00
 v_udb_node0002 |                  0.00
 v_udb_node0003 |                  0.00
 v_udb_node0004 |                  0.00
 v_udb_node0005 |                  0.00
 v_udb_node0006 |                  0.00
 v_udb_node0007 |                  0.00
 v_udb_node0008 |                  0.00
 v_udb_node0009 |                  0.00
 v_udb_node0010 |                  0.00
 v_udb_node0011 |                  0.00
 v_udb_node0012 |                  0.00
 v_udb_node0013 |                  0.00
 v_udb_node0014 |                  0.00
 v_udb_node0015 |                  0.00
 v_udb_node0016 |                  0.00
 v_udb_node0017 |                  0.00
 v_udb_node0018 |                  0.00
 v_udb_node0019 |                  0.00
 v_udb_node0020 |                  0.00
 v_udb_node0021 |                  0.00
 v_udb_node0022 |                  0.00
 v_udb_node0023 |                  0.00
 v_udb_node0024 |                  0.00
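One possible explanation of this result, sketched under an assumption I have not verified against the documentation. The assumption is that a segmentation expression's value is mapped onto a fixed integer space (modeled here as 2^32), split into equal contiguous ranges, one per node, in node order. If that holds, node_id values 0 to 23 are tiny compared with the width of a range and all fall into the first node's range, which matches the 100% on v_udb_node0001. `RING`, `node_for_value` and `value_for_node` are hypothetical illustrations, not Vertica functions.

```python
RING = 2**32  # ASSUMPTION: size of the value space that segmentation ranges cover

def node_for_value(value: int, n_nodes: int, ring: int = RING) -> int:
    """Index k of the range [k*ring/n, (k+1)*ring/n) containing value (clamped to the last node)."""
    return min(value * n_nodes // ring, n_nodes - 1)

def value_for_node(node: int, n_nodes: int, ring: int = RING) -> int:
    """Smallest integer inside node's range: ceil(node * ring / n_nodes)."""
    return -(-node * ring // n_nodes)

# Under this model, raw node_id values 0..23 all map to the first node.
print({node_for_value(v, 24) for v in range(24)})  # {0}
```

If the model were right, segmenting by a value scaled into each node's range (what value_for_node computes) would target nodes directly. Treat that as an idea to test on a scratch table, not as a supported feature.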

Is there a segmentation clause, or some other way, that I can use to distribute the data like this?

Comments

  • My current workaround is to split the data into multiple tables, one per node_id, and create each table on a specific node (plus a buddy node). Then I run a UNION ALL query on top of these tables. It seems to be working...

     

    dbadmin=> select 'create local temp table tx_'||node_id||' on commit preserve rows as select * from t2 where node_id = '||node_id||' order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes '||prime||','||buddy||';' from ( select n1.node_id, n1.node_name prime, n2.node_name buddy from ( select node_name, row_number() over ( order by node_name )-1 node_id from nodes ) n1, ( select node_name, row_number() over ( order by node_name  desc )-1 node_id from nodes ) n2 where n1.node_id = n2.node_id ) v order by node_id;
                                                                                                 ?column?                                                                                              
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
     create local temp table tx_0 on commit preserve rows as select * from t2 where node_id = 0 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0001,v_udb_node0024;
     create local temp table tx_1 on commit preserve rows as select * from t2 where node_id = 1 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0002,v_udb_node0023;
     create local temp table tx_2 on commit preserve rows as select * from t2 where node_id = 2 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0003,v_udb_node0022;
     create local temp table tx_3 on commit preserve rows as select * from t2 where node_id = 3 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0004,v_udb_node0021;
     create local temp table tx_4 on commit preserve rows as select * from t2 where node_id = 4 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0005,v_udb_node0020;
     create local temp table tx_5 on commit preserve rows as select * from t2 where node_id = 5 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0006,v_udb_node0019;
     create local temp table tx_6 on commit preserve rows as select * from t2 where node_id = 6 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0007,v_udb_node0018;
     create local temp table tx_7 on commit preserve rows as select * from t2 where node_id = 7 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0008,v_udb_node0017;
     create local temp table tx_8 on commit preserve rows as select * from t2 where node_id = 8 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0009,v_udb_node0016;
     create local temp table tx_9 on commit preserve rows as select * from t2 where node_id = 9 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0010,v_udb_node0015;
     create local temp table tx_10 on commit preserve rows as select * from t2 where node_id = 10 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0011,v_udb_node0014;
     create local temp table tx_11 on commit preserve rows as select * from t2 where node_id = 11 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0012,v_udb_node0013;
     create local temp table tx_12 on commit preserve rows as select * from t2 where node_id = 12 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0013,v_udb_node0012;
     create local temp table tx_13 on commit preserve rows as select * from t2 where node_id = 13 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0014,v_udb_node0011;
     create local temp table tx_14 on commit preserve rows as select * from t2 where node_id = 14 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0015,v_udb_node0010;
     create local temp table tx_15 on commit preserve rows as select * from t2 where node_id = 15 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0016,v_udb_node0009;
     create local temp table tx_16 on commit preserve rows as select * from t2 where node_id = 16 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0017,v_udb_node0008;
     create local temp table tx_17 on commit preserve rows as select * from t2 where node_id = 17 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0018,v_udb_node0007;
     create local temp table tx_18 on commit preserve rows as select * from t2 where node_id = 18 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0019,v_udb_node0006;
     create local temp table tx_19 on commit preserve rows as select * from t2 where node_id = 19 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0020,v_udb_node0005;
     create local temp table tx_20 on commit preserve rows as select * from t2 where node_id = 20 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0021,v_udb_node0004;
     create local temp table tx_21 on commit preserve rows as select * from t2 where node_id = 21 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0022,v_udb_node0003;
     create local temp table tx_22 on commit preserve rows as select * from t2 where node_id = 22 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0023,v_udb_node0002;
     create local temp table tx_23 on commit preserve rows as select * from t2 where node_id = 23 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0024,v_udb_node0001;
    (24 rows)
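The same DDL generation, sketched outside the database. This may be easier to adapt if the node count or naming changes. The function name `build_ddl` and the generated node list are assumptions. The pairing rule copies the query: the node at position i (in name order) is the prime, and the node at position n-1-i is its buddy.

```python
def build_ddl(node_names):
    """One CREATE statement per node_id, pinned to a prime node and a mirrored buddy."""
    names = sorted(node_names)  # same order as "order by node_name"
    n = len(names)
    stmts = []
    for node_id in range(n):
        prime, buddy = names[node_id], names[n - 1 - node_id]
        stmts.append(
            f"create local temp table tx_{node_id} on commit preserve rows as "
            f"select * from t2 where node_id = {node_id} "
            f"order by node_id, zip_prefix, matrix "
            f"segmented by hash(node_id) nodes {prime},{buddy};"
        )
    return stmts

nodes = [f"v_udb_node{i:04d}" for i in range(1, 25)]  # assumed node names
ddl = build_ddl(nodes)
print(ddl[0])
```

With an even node count, the two middle nodes (here v_udb_node0012 and v_udb_node0013) end up as each other's buddy, as in the output above.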

     

    dbadmin=> create table ul_result
    dbadmin-> as
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by matrix )
    dbadmin-> from tx_0
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_1
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_2
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_3
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_4
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_5
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_6
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_7
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_8
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_9
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_10
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_11
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_12
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_13
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_14
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_15
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_16
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_17
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_18
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_19
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_20
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_21
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_22
    dbadmin-> union all
    dbadmin-> select  
    dbadmin->   ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
    dbadmin-> from tx_23
    dbadmin-> order by zip_prefix, matrix_id segmented by hash(zip_prefix, d1, d2, d3) all nodes;
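Writing 24 near-identical branches by hand is error-prone, and the first branch above already differs slightly (order by matrix instead of order by zip_prefix, matrix). A minimal sketch that generates the statement instead. The UDF call and parameters (including `lamba`, spelled as in the original), along with the final ORDER BY and SEGMENTED BY clauses and their output columns matrix_id, d1, d2, d3, are copied verbatim from the post. `build_union_query` is a hypothetical helper.

```python
# Copied from the post; every branch uses "order by zip_prefix, matrix".
UDF_CALL = (
    "ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix "
    "using parameters lamba=10.0, iterations=500, threads=16, debug=false) "
    "over ( partition by node_id, zip_prefix order by zip_prefix, matrix )"
)

def build_union_query(n_tables: int, target: str = "ul_result") -> str:
    """CREATE TABLE ... AS with one UDF branch per tx_<i> table, joined by UNION ALL."""
    branches = [f"select {UDF_CALL} from tx_{i}" for i in range(n_tables)]
    body = "\nunion all\n".join(branches)
    return (
        f"create table {target} as\n{body}\n"
        "order by zip_prefix, matrix_id segmented by hash(zip_prefix, d1, d2, d3) all nodes;"
    )

query = build_union_query(24)
print(query.splitlines()[0])  # create table ul_result as
```

Within partition by node_id, zip_prefix, ordering by zip_prefix, matrix and ordering by matrix alone should give the same order, so making every branch identical is a readability change only.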

     
