Parallel processing using partitions with Vertica UDx
You can add functionality to Vertica with user-defined extensions (UDx), but what if you need to process more data than a single thread or a single node can handle efficiently?
Vertica can divide data into partitions defined by the OVER() clause and distribute the computation across nodes. This partition processing is "shared-nothing," similar to the Map stage in the Hadoop MapReduce paradigm. Unlike a Map stage, however, you call the UDx from SQL and can keep processing its results in SQL, even passing them to other UDxs.
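The shared-nothing idea can be sketched locally in plain Python: each partition is handed to a worker that sees only its own rows, and the per-partition results are collected afterwards. This is a simplified model built on `concurrent.futures`, not how Vertica actually schedules UDx instances; `map_partitions` is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Hashable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_partitions(
    partitions: Dict[Hashable, List[T]],
    fn: Callable[[List[T]], R],
    workers: int = 3,
) -> Dict[Hashable, R]:
    """Apply fn to every partition independently (hypothetical helper).

    Each call receives only the rows of one partition, so no state is
    shared between calls, mirroring the shared-nothing model.
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {key: pool.submit(fn, rows) for key, rows in partitions.items()}
        # Gather the results once every partition has been processed.
        return {key: fut.result() for key, fut in futures.items()}


if __name__ == "__main__":
    parts = {2010: [1.0, 2.0], 2011: [3.0], 2012: [4.0, 5.0, 6.0]}
    print(map_partitions(parts, sum))  # {2010: 3.0, 2011: 3.0, 2012: 15.0}
```

Because no worker touches another partition's rows, the partitions could just as well run on different machines, which is what lets Vertica spread them across nodes.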
Here I'll use a Python UDx to calculate the count and average of sales figures in a simple data set of (year, region, txn, amount) tuples, and show how Vertica handles different partition clauses.
The transform function prototype is pyPartitions(txn, amount) OVER(x), where txn and amount are the transaction ID and sale amount columns, and x is the partition clause.
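Within one partition, the work reduces to a single pass over that partition's rows. The sketch below shows only that arithmetic as a plain Python function; `count_and_average` is a hypothetical name, not the attached UDx code, and in a real Vertica Python UDx the loop would presumably read rows through the SDK's argument reader rather than from a list.

```python
from typing import Iterable, Tuple


def count_and_average(rows: Iterable[Tuple[int, float]]) -> Tuple[int, float]:
    """Return (count, average amount) for one partition of (txn, amount) rows.

    Hypothetical helper: mirrors the two output columns of pyPartitions.
    """
    count = 0
    total = 0.0
    for _txn, amount in rows:
        count += 1
        total += amount
    if count == 0:
        # Guard against division by zero for an empty input.
        return 0, 0.0
    return count, total / count


if __name__ == "__main__":
    print(count_and_average([(1, 10.0), (2, 20.0), (3, 30.0)]))  # (3, 20.0)
```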
The Python code and sample SQL are attached, and I'll show the output here.
SQL output:
SELECT pyPartitions(txn, amount) OVER() FROM txns;
 count |     average
-------+------------------
    27 | 27.1533333333333

SELECT year, pyPartitions(txn, amount) OVER(PARTITION BY year) FROM txns;
 year | count | average
------+-------+---------
 2010 |     9 |   23.45
 2011 |     9 |   23.45
 2012 |     9 |   34.56

SELECT year, region, pyPartitions(txn, amount) OVER(PARTITION BY year, region) FROM txns;
 year | region | count | average
------+--------+-------+---------
 2011 | CT     |     3 |   23.45
 2012 | CT     |     3 |   34.56
 2012 | NJ     |     3 |   34.56
 2012 | NY     |     3 |   34.56
 2010 | CT     |     3 |   23.45
 2011 | NY     |     3 |   23.45
 2010 | NJ     |     3 |   23.45
 2010 | NY     |     3 |   23.45
 2011 | NJ     |     3 |   23.45
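The three OVER clauses differ only in which columns form the partition key. The sketch below groups rows the way each clause would, using 27 hypothetical rows (not the author's txns table, and with made-up amounts) in which every (year, region) pair appears three times; `partition_by` is a hypothetical helper, and an empty key list plays the role of OVER().

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, object]


def partition_by(rows: Sequence[Row], keys: Sequence[str]) -> Dict[Tuple, List[Row]]:
    """Group rows by the given key columns (hypothetical helper).

    keys=[] puts every row into a single partition, like OVER().
    """
    groups: Dict[Tuple, List[Row]] = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row)
    return dict(groups)


# Hypothetical sample data: three rows per (year, region) pair.
rows: List[Row] = []
txn = 0
for year in (2010, 2011, 2012):
    for region in ("CT", "NJ", "NY"):
        for _ in range(3):
            txn += 1
            rows.append({"year": year, "region": region, "txn": txn, "amount": 1.0 * txn})

if __name__ == "__main__":
    for keys in ([], ["year"], ["year", "region"]):
        parts = partition_by(rows, keys)
        sizes = sorted({len(p) for p in parts.values()})
        print(keys, len(parts), sizes)
```

With this shape of data, the three clauses yield 1, 3, and 9 partitions of 27, 9, and 3 rows, which lines up with the count column in the output above.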
This is the output we expect for each partition clause. But how does Vertica distribute the work? I ran these queries on a three-node cluster with a debug message in the function, so let's look at the UDx logs. With no partition clause, all 27 rows are processed as a single partition on the initiator node. Partitioning by year yields three partitions, all processed on the initiator node. Partitioning by year and region, however, yields 9 partitions that are distributed across all three nodes: 2 on one node, 3 on another, and 4 on the third.
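One way to picture an uneven 2/3/4 split is hash-based placement: each partition key is hashed and the hash selects a node, so the split depends on the keys rather than being perfectly balanced. This is a simplified model only; the post does not describe Vertica's actual placement logic, and both `assign_to_nodes` and the use of `zlib.crc32` are assumptions for illustration.

```python
import zlib
from typing import Dict, Hashable, Iterable, List


def assign_to_nodes(keys: Iterable[Hashable], n_nodes: int) -> Dict[int, List[Hashable]]:
    """Map each partition key to a node index via a stable hash (simplified model)."""
    placement: Dict[int, List[Hashable]] = {node: [] for node in range(n_nodes)}
    for key in keys:
        # crc32 of the key's repr is stable across runs, unlike hash() on strings.
        node = zlib.crc32(repr(key).encode("utf-8")) % n_nodes
        placement[node].append(key)
    return placement


if __name__ == "__main__":
    keys = [(y, r) for y in (2010, 2011, 2012) for r in ("CT", "NJ", "NY")]
    placement = assign_to_nodes(keys, 3)
    print({node: len(ks) for node, ks in placement.items()})
```

The exact counts this prints depend on the hash function, which is the point: with only nine keys, a hash-based split is usually not an even 3/3/3.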
Here are the abbreviated logs showing more detail:
2019-10-13 17:45:12.583 [Python-v_verticatf_node0001-29313:0x17e-35305] 0x7fcf02e0b780 UDx side process started
17:45:14.506 [Python-v_verticatf_node0001-29313:0x17e-35564] 0x7fcf02e0b780 [UserMessage] pyPartitions - pyPartition returning:27,27.153333333333332
17:45:14.508 [Python-v_verticatf_node0001-29313:0x17e-35572] 0x7fcf02e0b780 PythonExecContext::getUDxExecContext
17:45:14.625 [Python-v_verticatf_node0001-29313:0x17e-35573] 0x7fcf02e0b780 [UserMessage] pyPartitions - pyPartition returning:9,23.450000000000003
17:45:14.625 [Python-v_verticatf_node0001-29313:0x17e-35573] 0x7fcf02e0b780 [UserMessage] pyPartitions - pyPartition returning:9,23.450000000000003
17:45:14.626 [Python-v_verticatf_node0001-29313:0x17e-35573] 0x7fcf02e0b780 [UserMessage] pyPartitions - pyPartition returning:9,34.56
17:45:14.630 [Python-v_verticatf_node0001-29313:0x17e-35587] 0x7fcf02e0b780 PythonExecContext::getUDxExecContext
17:45:14.704 [Python-v_verticatf_node0001-29313:0x17e-35588] 0x7fcf02e0b780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
17:45:14.705 [Python-v_verticatf_node0001-29313:0x17e-35588] 0x7fcf02e0b780 [UserMessage] pyPartitions - pyPartition returning:3,23.45

2019-10-13 17:45:12.752 [Python-v_verticatf_node0001-29313:0x17e-24912] 0x7ff891005780 UDx side process started
17:45:14.629 [Python-v_verticatf_node0001-29313:0x17e-25074] 0x7ff891005780 PythonExecContext::getUDxExecContext
17:45:14.699 [Python-v_verticatf_node0001-29313:0x17e-25075] 0x7ff891005780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
17:45:14.700 [Python-v_verticatf_node0001-29313:0x17e-25075] 0x7ff891005780 [UserMessage] pyPartitions - pyPartition returning:3,34.56
17:45:14.700 [Python-v_verticatf_node0001-29313:0x17e-25075] 0x7ff891005780 [UserMessage] pyPartitions - pyPartition returning:3,34.56
17:45:14.701 [Python-v_verticatf_node0001-29313:0x17e-25075] 0x7ff891005780 [UserMessage] pyPartitions - pyPartition returning:3,34.56

2019-10-13 17:45:12.746 [Python-v_verticatf_node0001-29313:0x17e-10306] 0x7f2f7c4b8780 UDx side process started
17:45:14.625 [Python-v_verticatf_node0001-29313:0x17e-10468] 0x7f2f7c4b8780 PythonExecContext::getUDxExecContext
17:45:14.704 [Python-v_verticatf_node0001-29313:0x17e-10469] 0x7f2f7c4b8780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
17:45:14.704 [Python-v_verticatf_node0001-29313:0x17e-10469] 0x7f2f7c4b8780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
17:45:14.704 [Python-v_verticatf_node0001-29313:0x17e-10469] 0x7f2f7c4b8780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
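To tally the partitions per process without reading the log by eye, a small parser can count the "pyPartition returning" messages. It assumes, based only on the abbreviated lines above, that messages from one query in one UDx process share the same process address and bracketed tag; `count_calls` is a hypothetical helper, and the regular expression is fitted to these sample lines rather than to a documented log format. The sample is the year-and-region portion of the log above.

```python
import re
from collections import Counter
from typing import Iterable

# Bracketed context tag, process address, then the user message (assumed format).
LINE_RE = re.compile(
    r"\[(Python-[^\]]+)\]\s+(0x[0-9a-f]+)\s+"
    r"\[UserMessage\] pyPartitions - pyPartition returning:"
)

SAMPLE_LOG = """\
17:45:14.704 [Python-v_verticatf_node0001-29313:0x17e-35588] 0x7fcf02e0b780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
17:45:14.705 [Python-v_verticatf_node0001-29313:0x17e-35588] 0x7fcf02e0b780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
17:45:14.699 [Python-v_verticatf_node0001-29313:0x17e-25075] 0x7ff891005780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
17:45:14.700 [Python-v_verticatf_node0001-29313:0x17e-25075] 0x7ff891005780 [UserMessage] pyPartitions - pyPartition returning:3,34.56
17:45:14.700 [Python-v_verticatf_node0001-29313:0x17e-25075] 0x7ff891005780 [UserMessage] pyPartitions - pyPartition returning:3,34.56
17:45:14.701 [Python-v_verticatf_node0001-29313:0x17e-25075] 0x7ff891005780 [UserMessage] pyPartitions - pyPartition returning:3,34.56
17:45:14.704 [Python-v_verticatf_node0001-29313:0x17e-10469] 0x7f2f7c4b8780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
17:45:14.704 [Python-v_verticatf_node0001-29313:0x17e-10469] 0x7f2f7c4b8780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
17:45:14.704 [Python-v_verticatf_node0001-29313:0x17e-10469] 0x7f2f7c4b8780 [UserMessage] pyPartitions - pyPartition returning:3,23.45
"""


def count_calls(lines: Iterable[str]) -> Counter:
    """Count 'returning' messages per (process address, tag) (hypothetical helper)."""
    counts: Counter = Counter()
    for line in lines:
        match = LINE_RE.search(line)
        if match:
            tag, address = match.groups()
            counts[(address, tag)] += 1
    return counts


if __name__ == "__main__":
    print(sorted(count_calls(SAMPLE_LOG.splitlines()).values()))  # [2, 3, 4]
```

Lines without a user message, such as the PythonExecContext entries, simply do not match and are skipped.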
Have fun dividing and conquering your data!