Using Python Multiprocessing in a UDx function
I have been attempting to use all the cores available on a node when calling a UDx function. We need to run intensive machine learning computation at scale, but don't want to use unfenced C++ mode yet. The usual way to do parallel processing using partitions, given here: https://forum.vertica.com/discussion/241038/parallel-processing-using-partitions-with-vertica-udx, does not exercise all cores on the node, perhaps because it is running in fenced mode and is limited to one thread.
An alternative is to spawn processes within the UDx, collect all the results in the Python code, and emit the output. So we want to run a multiprocessing Python program within a transform function. Here is the code for a simple use case.
This is inspired by the Python example given here (see the very bottom of the page for the example):
https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
This works fine when running on the node in standalone mode.
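For reference, a minimal standalone sketch of that worker/queue pattern (modeled on the docs example; the plus/mul operations, sleep durations, and worker count are illustrative):

```python
import random
import time
from multiprocessing import Process, Queue


def mul(a, b):
    time.sleep(0.05 * random.random())
    return a * b


def plus(a, b):
    time.sleep(0.05 * random.random())
    return a + b


def calculate(func, args):
    # Echo the inputs alongside the result so the caller can match rows.
    return args[0], args[1], func.__name__, func(*args)


def worker(inq, outq):
    # Pull tasks until the 'STOP' sentinel arrives.
    for func, args in iter(inq.get, 'STOP'):
        outq.put(calculate(func, args))


def run(tasks, n_workers=4):
    task_q, done_q = Queue(), Queue()
    for t in tasks:
        task_q.put(t)
    procs = [Process(target=worker, args=(task_q, done_q))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    # Collect exactly one result per submitted task.
    results = [done_q.get() for _ in tasks]
    # Tell each worker to stop, then reap the children.
    for _ in procs:
        task_q.put('STOP')
    for p in procs:
        p.join()
    return results


if __name__ == '__main__':
    print(run([(plus, (7, 3)), (mul, (7, 4))]))
```

Note that the tasks here carry real function references, not operand strings; that distinction matters later in this thread.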
Now we will try to use the same principles within a UDx function.
The premise is that we have a table as follows:
CREATE TABLE IF NOT EXISTS dev.basic_queue_records ( a int, b int, operand varchar(10) );
We would just want to write a UDx that takes the two integers and the operand, and produces an additional column (result) based on the operand. If the operand is 'plus' it adds the two integers. If the operand is 'mul' it multiplies the two numbers. The UDx will emit four fields, viz. (a, b, operand, result). This is the same problem given in the Python example above, but done on a SQL table.
Here is the UDx code in Python:
import vertica_sdk
import time
import random
from multiprocessing import Process, Queue, freeze_support


def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)


def calculate(func, args):
    result = func(*args)
    return args[0], args[1], func.__name__, result


def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b


def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b


class BasicQueueUDX(vertica_sdk.TransformFunction):
    def processPartition(self, server_interface, input, output):
        freeze_support()
        NUMBER_OF_PROCESSES = 4
        TASKS = []
        while True:
            a = input.getInt(0)
            b = input.getInt(1)
            operand = input.getString(2)
            TASKS.append((operand, (a, b)))
            if not input.next():
                break

        # Create queues
        task_queue = Queue()
        done_queue = Queue()

        # Submit tasks
        for task in TASKS:
            task_queue.put(task)

        # Start worker processes
        for i in range(NUMBER_OF_PROCESSES):
            Process(target=worker, args=(task_queue, done_queue)).start()

        # Get and print results
        server_interface.log(f"About to process {len(TASKS)} tasks using {NUMBER_OF_PROCESSES} processes")
        for i in range(len(TASKS)):
            out_row = done_queue.get()
            server_interface.log(f"{int(out_row[0])}, {int(out_row[1])}, {out_row[2]}, {int(out_row[3])}")
            output.setInt(0, int(out_row[0]))    # a
            output.setInt(1, int(out_row[1]))    # b
            output.setString(2, out_row[2])      # operand
            output.setInt(3, int(out_row[3]))    # result
            output.next()

        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')

        server_interface.log("All records processed. Exiting.")


class BasicQueueUDXFactory(vertica_sdk.TransformFunctionFactory):
    def getPrototype(self, server_interface, arg_types, return_type):
        arg_types.addInt()
        arg_types.addInt()
        arg_types.addVarchar()
        return_type.addInt()
        return_type.addInt()
        return_type.addVarchar()
        return_type.addInt()

    def getReturnType(self, server_interface, arg_types, return_type):
        return_type.addColumn(arg_types.getColumnType(0), "a")
        return_type.addColumn(arg_types.getColumnType(1), "b")
        return_type.addColumn(arg_types.getColumnType(2), "operand")
        return_type.addColumn(arg_types.getColumnType(1), "result")

    def createTransformFunction(cls, server_interface):
        return BasicQueueUDX()
Assume that the file exists in path '/home/dbadmin/udx/BasicQueueUDX.py'.
To install the library, we do the following:
DROP LIBRARY IF EXISTS pymultiprocess CASCADE;

CREATE LIBRARY pymultiprocess AS '/home/dbadmin/udx/BasicQueueUDX.py'
    DEPENDS '/usr/local/lib/python3.7/site-packages/*:/usr/local/lib64/python3.7/site-packages/*'
    LANGUAGE 'Python';

CREATE TRANSFORM FUNCTION BASIC_QUEUE AS NAME 'BasicQueueUDXFactory' LIBRARY pymultiprocess;
Populate the table with some rows:
INSERT INTO dev.basic_queue_records (a, b, operand) VALUES (7, 3, 'plus');
INSERT INTO dev.basic_queue_records (a, b, operand) VALUES (7, 4, 'mul');
INSERT INTO dev.basic_queue_records (a, b, operand) VALUES (9, 7, 'plus');
INSERT INTO dev.basic_queue_records (a, b, operand) VALUES (10, 3, 'mul');
Then call the UDx function.
SELECT BASIC_QUEUE(a, b, operand) OVER () FROM dev.basic_queue_records;
This essentially locks up, with the following output in the server logs. It runs forever. When the query is cancelled, it calls the Python destructor with no output.
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744] 0x7fa400b74740 UDx side process started
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744] 0x7fa400b74740 My port: 59201
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744] 0x7fa400b74740 My address: 127.0.0.1
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744] 0x7fa400b74740 Vertica port: 45475
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744] 0x7fa400b74740 Vertica address: 127.0.0.1
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744] 0x7fa400b74740 Vertica address family: 2
2021-09-20 20:34:43.829 [Python-v_testdb2_node0001-5537:0x13ed80-64787] 0x7fa400b74740 PythonInterface destructor called
2021-09-20 20:34:43.983 [Python-v_testdb2_node0001-5537:0x13ed80-64788] 0x7fa400b74740 [UserMessage] BASIC_QUEUE - About to process 4 tasks using 4 processes
Does anyone have any clues why this is locking within a Vertica context, but seems to run fine when called as a standalone application?
Comments
I think you can see the exceptions in UDxFencedProcesses.log or vertica.log.
The point we need to focus on is as below:
With your sample data, Python calls the following lines:
The first argument is the str data type. I guess you tried to call the following lines without Vertica SDK.
The first argument is the reference for the functions. Can you check it?
Python code is hanging on line 63:
dbLog has the following:
I haven't looked carefully into why. But you definitely don't need to parallelize the computation yourself in this case (every record can be processed independently). Try using PARTITION BEST. That will parallelize function evaluation among cluster nodes and also implement intra-node parallelization.
SELECT BASIC_QUEUE(a, b, operand) OVER (PARTITION BEST) FROM basic_queue_records;
Thanks for your feedback. The changes that I did to make it work are as follows:
This is just to avoid the Python error. Otherwise this works both with:
as well as
Hopefully this example can help other people who wish to use multiprocessing in their UDx functions.
The real use-case I am going for is more complex with some machine learning algorithms that are not yet available in Vertica. With the normal fenced use of a Python UDx, all cores of the machine are not exercised. This basically defeats the point of having a beefy machine with multiple cores since these cores are mostly idle when only one thread is active per node.
And an additional note. Attempting to use the same approach for more complex problems involving use of Pandas within each process, gives the following error:
This makes me think that the Vertica SDK is not compatible with the multiprocessing module in Python 3.7.
@Ariel_Cary Can you explain more details about "intra-node parallelization"? Does it mean using the multiple Python processes, or the multiple Python threads, or the multiple Vertica nodes?
@anupam_at_arrivalist Can you provide me the simple sample code with Pandas, and the sample data? I would like to try it.
@anupam_at_arrivalist: avoid using empty over clause.
SELECT BASIC_QUEUE(a, b, operand) OVER () FROM dev.basic_queue_records;
That will force the server to process your entire input as a single partition in a single thread. Use
OVER(PARTITION BEST)
instead. Furthermore, is it necessary to implement your logic with a UD transform function? Partitions don't really matter in your use case as far as I can see. If that's true, a UD scalar function is enough. The server will push down the computation to all nodes as needed.
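To illustrate why a scalar function suffices here, the per-row logic reduces to a simple operand dispatch. This is only a sketch of that logic; the surrounding vertica_sdk ScalarFunction wrapper (a processBlock loop over the input rows) is omitted since it runs only inside the server, and the names OPS and apply_operand are hypothetical:

```python
# Hypothetical per-row dispatch a scalar UDx would apply to each
# (a, b, operand) row; unknown operands raise instead of returning None.
OPS = {
    'plus': lambda a, b: a + b,
    'mul': lambda a, b: a * b,
}


def apply_operand(a, b, operand):
    try:
        return OPS[operand](a, b)
    except KeyError:
        raise ValueError(f"unsupported operand: {operand!r}")
```

Because each row is independent, the server can evaluate this on every node in parallel with no coordination in user code.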
@Hibiki For Python UDx functions, it means multiple side processes.
Use this as your input data. Import it into a table, say dbscan_clustering_input. Install the library the usual way, for example:
Then call the library as follows:
Hope you can now reproduce the error at your end.
I have to add that using multiprocessing while handling all the segmentation within the Python code is going to result in a huge amount of memory utilization. Thus, an approach like this can only be used on a machine with lots of memory and multiple cores.
@Ariel_Cary Does "multiple side processes" mean the multiple Python processes per node, or the single Python process per node but involving all nodes?
@anupam_at_arrivalist Thank you for sharing the sample code and the test data. Please let me try it.
@Hibiki As per my knowledge, "multiple side processes" would mean multiple Python processes on the same node. All Vertica nodes will work on the respective data present on the local node (at least in Enterprise mode). Within the node, Python multiprocessing will spin off multiple processes (not GIL threads) to process each segment. How the segmentation is done, is handled within the Python function.
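The in-function segmentation described above might be sketched as follows. This is a hypothetical illustration, not the actual clustering code from the thread: the chunking scheme, worker count, and the stand-in per-segment computation are all assumptions.

```python
from multiprocessing import Process, Queue


def split_into_chunks(rows, n):
    # Divide rows into n roughly equal contiguous slices, one per worker.
    k, m = divmod(len(rows), n)
    return [rows[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
            for i in range(n)]


def process_chunk(chunk, outq):
    # Stand-in for the real per-segment computation (e.g. clustering).
    outq.put([a + b for a, b in chunk])


def run_segmented(rows, n_workers=3):
    outq = Queue()
    procs = [Process(target=process_chunk, args=(c, outq))
             for c in split_into_chunks(rows, n_workers)]
    for p in procs:
        p.start()
    # One result list per worker process.
    results = [outq.get() for _ in procs]
    for p in procs:
        p.join()
    return results
```

Each chunk is copied into a child process, which is where the memory cost mentioned earlier comes from.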
@anupam_at_arrivalist Thank you for your explanation. I ran the following UDx to see how many processes were run. As a result, I saw that one process per node was run.
@anupam_at_arrivalist I could reproduce the same issue in my environment. I modified your script into a simple Python script and ran it with the Python runtime shipped with Vertica. It worked well, so I think the Python runtime shipped with Vertica doesn't have any issue.
This error seems to be related to pickle. In addition, Vertica calls the Python UDx from C/C++. Probably, the behavior of pickle may be different. Let me check more details.
@anupam_at_arrivalist It seems the error happens if the child process returns the queue that stores the DataFrame values. Can you try to convert these values to the standard lists and append them to the queue?
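That suggestion might look roughly like the sketch below. The pandas usage is illustrative only (the real clustering logic and column names are not shown here); the key point is that the child puts plain Python lists on the queue, so only standard types need to be pickled across the process boundary:

```python
from multiprocessing import Process, Queue

import pandas as pd


def child(outq):
    # Stand-in for a worker that builds a DataFrame of results.
    df = pd.DataFrame({'device_id': [1, 2], 'cluster': [0, 1]})
    # Convert to nested plain lists before queueing, instead of
    # putting the DataFrame object itself on the queue.
    outq.put(df.values.tolist())


def collect():
    outq = Queue()
    p = Process(target=child, args=(outq,))
    p.start()
    rows = outq.get()
    p.join()
    return rows
```

The parent can rebuild a DataFrame from the lists afterwards if needed.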
In addition, I found that server_interface.log didn't work if it was stored in the queue. If you see the same behavior, please pass it to do_density_based_clustering function as an argument.
Thanks @Hibiki for your suggestions. I have tried to incorporate those, but somehow I still keep getting the same error.
The library is installed by pointing it to the Python installation that came with Vertica.
Note that the library dependencies now point to the Vertica Python folder, so there should be no mismatch of libraries.
The function has been changed as follows:
But the error I am seeing is still:
Taking an alternate approach, I was able to get a little further. Using this approach, the query works for small input rows. But with large rows, we get a memory error. If we feed in about 1000 rows, there is no memory error but the output is partial. If someone can make this work for complete output, it will be quite acceptable. Here is the alternate solution:
This is the input file for the above problem, for testing purposes. Somehow I could not attach it to the previous comment.
@anupam_at_arrivalist Please check the attached script.
@anupam_at_arrivalist I loaded the data from sample_input.csv and ran my UDx. It worked well without the memory error.
@Hibiki The attached file does not compile since there is no variable called queue in that section of the code. There are two instances where this is the case. Maybe the wrong file got attached. Can you send me the exact file that ran successfully?
If I instead do (not sure if it is correct):
and provide some 100 records with only 5 unique device_ids, I get a memory error. So this method is consuming too much memory to be of any practical use.
@anupam_at_arrivalist I forgot to add "import queue". I attached the file again. I have run this script on a 3-node cluster. I confirmed that 3 vertica-udx-Python processes were launched on each node, and the total RSS size per node was 330MB. That is a small allocation.