Controlling data segmentation
I have a transform UDF that I want to run per zip_prefix (first 2 digits of zipcode). The problem is that the data distribution per this zip prefix is quite skewed. So I'd like to evenly distribute subsets of zip_prefixes to each node and run the UDF on each node (e.g., partition nodes order by zip_prefix). In this case, my UDF would handle splitting up the data by zip_prefix as it streams in to the UDF.
The problem is I do not see how I can control the segmentation in such a manner.
For instance, you can see if I just segment by hash(zip_prefix), the amount of data processed per node will be uneven:
-- Session-local staging table combining the batch 1 rows of both edge tables.
-- The projection is sorted by (zip_prefix, matrix) and hash-segmented on
-- zip_prefix across all nodes.
create local temp table t on commit preserve rows as
    -- matrix = 1: rows from ul_site_edge; site_len / aid_len are the largest
    -- site_idx / aid_idx seen within each zip_prefix.
    select
        zip_prefix,
        1 as matrix,
        site_idx,
        max(site_idx) over (partition by zip_prefix) as site_len,
        aid_idx,
        max(aid_idx) over (partition by zip_prefix) as aid_len,
        weight
    from ci_dm.ul_site_edge
    where batch_id = 1
union all
    -- matrix = 2: rows from ul_aid_edge; both length columns are filled with 0.
    select
        zip_prefix,
        2 as matrix,
        aid1_idx,
        0,
        aid2_idx,
        0,
        weight
    from ci_dm.ul_aid_edge
    where batch_id = 1
order by zip_prefix, matrix
segmented by hash(zip_prefix) all nodes;
dbadmin=> select node_name, (sum(row_count)/sum(sum(row_count)) over()*100)::numeric(10,2) row_distribution_perc from projection_storage where projection_name = 't_b0' group by 1 order by 1;
node_name | row_distribution_perc
----------------+-----------------------
v_udb_node0001 | 1.43
v_udb_node0002 | 3.62
v_udb_node0003 | 4.16
v_udb_node0004 | 8.24
v_udb_node0005 | 9.61
v_udb_node0006 | 0.24
v_udb_node0007 | 7.90
v_udb_node0008 | 0.68
v_udb_node0009 | 1.78
v_udb_node0010 | 5.22
v_udb_node0011 | 4.30
v_udb_node0012 | 6.40
v_udb_node0013 | 0.50
v_udb_node0014 | 5.23
v_udb_node0015 | 5.21
v_udb_node0016 | 2.31
v_udb_node0017 | 5.52
v_udb_node0018 | 2.19
v_udb_node0019 | 5.83
v_udb_node0020 | 4.66
v_udb_node0021 | 3.34
v_udb_node0022 | 5.10
v_udb_node0023 | 2.65
v_udb_node0024 | 3.87
On a 24-node cluster, I can figure out a decent way to create subsets of zip_prefixes and assign them to nodes:
dbadmin=> select node_id, count(distinct zip_prefix) zips, (sum(cnt)/sum(sum(cnt)) over()*100)::numeric(10,2) row_distribution_perc from ( select zip_prefix, count(*) cnt, (row_number() over ( order by count(*) desc )-1) % 24 node_id from t group by 1 ) v group by 1 order by 1;
node_id | zips | row_distribution_perc
---------+------+-----------------------
0 | 16 | 5.99
1 | 16 | 5.38
2 | 16 | 5.26
3 | 16 | 5.11
4 | 16 | 5.04
5 | 15 | 4.93
6 | 15 | 4.81
7 | 15 | 4.74
8 | 15 | 4.61
9 | 15 | 4.15
10 | 15 | 4.09
11 | 15 | 3.94
12 | 15 | 3.89
13 | 15 | 3.80
14 | 15 | 3.69
15 | 15 | 3.67
16 | 15 | 3.62
17 | 15 | 3.56
18 | 15 | 3.51
19 | 15 | 3.44
20 | 15 | 3.37
21 | 15 | 3.24
22 | 15 | 3.13
23 | 15 | 3.03
However, I cannot get the data to distribute in the cluster based on node_id:
dbadmin=> create local temp table t2 on commit preserve rows as
dbadmin-> select t.*, n.node_id
dbadmin-> from t, ( select zip_prefix, count(*) cnt, (row_number() over ( order by count(*) desc )-1) % 24 node_id from t group by 1 ) n
dbadmin-> where n.zip_prefix = t.zip_prefix
dbadmin-> order by node_id, zip_prefix, matrix
dbadmin-> segmented by node_id all nodes
dbadmin-> ;
WARNING 5993: Projection is irregularly segmented by column
HINT: Consider using a segmentation expression, such as SEGMENTED BY HASH(column)
CREATE TABLE
dbadmin=>
dbadmin=>
dbadmin=> select node_name, (sum(row_count)/sum(sum(row_count)) over()*100)::numeric(10,2) row_distribution_perc from projection_storage where projection_name = 't2_b0' group by 1 order by 1;
node_name | row_distribution_perc
----------------+-----------------------
v_udb_node0001 | 100.00
v_udb_node0002 | 0.00
v_udb_node0003 | 0.00
v_udb_node0004 | 0.00
v_udb_node0005 | 0.00
v_udb_node0006 | 0.00
v_udb_node0007 | 0.00
v_udb_node0008 | 0.00
v_udb_node0009 | 0.00
v_udb_node0010 | 0.00
v_udb_node0011 | 0.00
v_udb_node0012 | 0.00
v_udb_node0013 | 0.00
v_udb_node0014 | 0.00
v_udb_node0015 | 0.00
v_udb_node0016 | 0.00
v_udb_node0017 | 0.00
v_udb_node0018 | 0.00
v_udb_node0019 | 0.00
v_udb_node0020 | 0.00
v_udb_node0021 | 0.00
v_udb_node0022 | 0.00
v_udb_node0023 | 0.00
v_udb_node0024 | 0.00
Is there a segmentation clause or some other way that I can distribute the data this way?
Comments
My current solution is to actually break up the data into multiple tables and create each table on a specific node. Then run a union all query on top of these tables. Seems to be working...
dbadmin=> select 'create local temp table tx_'||node_id||' on commit preserve rows as select * from t2 where node_id = '||node_id||' order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes '||prime||','||buddy||';' from ( select n1.node_id, n1.node_name prime, n2.node_name buddy from ( select node_name, row_number() over ( order by node_name )-1 node_id from nodes ) n1, ( select node_name, row_number() over ( order by node_name desc )-1 node_id from nodes ) n2 where n1.node_id = n2.node_id ) v order by node_id;
?column?
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
create local temp table tx_0 on commit preserve rows as select * from t2 where node_id = 0 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0001,v_udb_node0024;
create local temp table tx_1 on commit preserve rows as select * from t2 where node_id = 1 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0002,v_udb_node0023;
create local temp table tx_2 on commit preserve rows as select * from t2 where node_id = 2 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0003,v_udb_node0022;
create local temp table tx_3 on commit preserve rows as select * from t2 where node_id = 3 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0004,v_udb_node0021;
create local temp table tx_4 on commit preserve rows as select * from t2 where node_id = 4 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0005,v_udb_node0020;
create local temp table tx_5 on commit preserve rows as select * from t2 where node_id = 5 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0006,v_udb_node0019;
create local temp table tx_6 on commit preserve rows as select * from t2 where node_id = 6 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0007,v_udb_node0018;
create local temp table tx_7 on commit preserve rows as select * from t2 where node_id = 7 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0008,v_udb_node0017;
create local temp table tx_8 on commit preserve rows as select * from t2 where node_id = 8 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0009,v_udb_node0016;
create local temp table tx_9 on commit preserve rows as select * from t2 where node_id = 9 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0010,v_udb_node0015;
create local temp table tx_10 on commit preserve rows as select * from t2 where node_id = 10 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0011,v_udb_node0014;
create local temp table tx_11 on commit preserve rows as select * from t2 where node_id = 11 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0012,v_udb_node0013;
create local temp table tx_12 on commit preserve rows as select * from t2 where node_id = 12 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0013,v_udb_node0012;
create local temp table tx_13 on commit preserve rows as select * from t2 where node_id = 13 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0014,v_udb_node0011;
create local temp table tx_14 on commit preserve rows as select * from t2 where node_id = 14 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0015,v_udb_node0010;
create local temp table tx_15 on commit preserve rows as select * from t2 where node_id = 15 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0016,v_udb_node0009;
create local temp table tx_16 on commit preserve rows as select * from t2 where node_id = 16 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0017,v_udb_node0008;
create local temp table tx_17 on commit preserve rows as select * from t2 where node_id = 17 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0018,v_udb_node0007;
create local temp table tx_18 on commit preserve rows as select * from t2 where node_id = 18 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0019,v_udb_node0006;
create local temp table tx_19 on commit preserve rows as select * from t2 where node_id = 19 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0020,v_udb_node0005;
create local temp table tx_20 on commit preserve rows as select * from t2 where node_id = 20 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0021,v_udb_node0004;
create local temp table tx_21 on commit preserve rows as select * from t2 where node_id = 21 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0022,v_udb_node0003;
create local temp table tx_22 on commit preserve rows as select * from t2 where node_id = 22 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0023,v_udb_node0002;
create local temp table tx_23 on commit preserve rows as select * from t2 where node_id = 23 order by node_id, zip_prefix, matrix segmented by hash(node_id) nodes v_udb_node0024,v_udb_node0001;
(24 rows)
dbadmin=> create table ul_result
dbadmin-> as
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by matrix )
dbadmin-> from tx_0
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_1
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_2
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_3
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_4
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_5
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_6
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_7
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_8
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_9
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_10
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_11
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_12
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_13
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_14
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_15
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_16
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_17
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_18
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_19
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_20
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_21
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_22
dbadmin-> union all
dbadmin-> select
dbadmin-> ci_dm.ulalgo2(matrix, site_len, aid_len, site_idx-1, aid_idx-1, weight, zip_prefix using parameters lamba=10.0, iterations=500, threads=16, debug=false) over ( partition by node_id, zip_prefix order by zip_prefix, matrix )
dbadmin-> from tx_23
dbadmin-> order by zip_prefix, matrix_id segmented by hash(zip_prefix, d1, d2, d3) all nodes;