Using Python Multiprocessing in a UDx function

I have been attempting to use all the cores available on a node when calling a UDx function. We need to run intensive machine-learning computation at scale, but don't want to move to unfenced C++ mode yet. The usual way to do parallel processing using partitions, described here: https://forum.vertica.com/discussion/241038/parallel-processing-using-partitions-with-vertica-udx, does not exercise all cores on the node, perhaps because it runs in fenced mode and is limited to one thread.

An alternate way would be to spawn processes within the UDx, collect all the results in the Python code, and emit the output. So we want to run a multiprocess Python program inside a transform function. Here is the code for a simple use case.

This is inspired by the Python example given here (see the very bottom of the page for the example):
https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
This works fine when run standalone on the node.
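
For reference, a condensed standalone version of that queue-of-tasks pattern (adapted from the linked docs example; stripped down to the parts reused below) looks like this:

# Condensed from the multiprocessing docs example: worker processes pull
# (func, args) tasks from one queue and push results onto another until a
# 'STOP' sentinel arrives. Note that the tasks carry function *references*.
import time
import random
from multiprocessing import Process, Queue, freeze_support

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def worker(task_queue, done_queue):
    for func, args in iter(task_queue.get, 'STOP'):
        done_queue.put((args[0], args[1], func.__name__, func(*args)))

if __name__ == '__main__':
    freeze_support()
    tasks = [(plus, (7, 3)), (mul, (7, 4)), (plus, (9, 7)), (mul, (10, 3))]
    task_queue, done_queue = Queue(), Queue()
    for task in tasks:
        task_queue.put(task)
    workers = [Process(target=worker, args=(task_queue, done_queue)) for _ in range(4)]
    for w in workers:
        w.start()
    for _ in tasks:
        print(done_queue.get())
    for _ in workers:
        task_queue.put('STOP')  # one sentinel per worker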

Now we will try to use the same principles within a UDx function.

The premise is that we have a table as follows:

CREATE TABLE IF NOT EXISTS dev.basic_queue_records 
(
    a int,
    b int,
    operand varchar(10)
);

We just want to write a UDx that takes the two integers and the operand and produces an additional column (result) based on the operand: if the operand is 'plus' it adds the two integers; if it is 'mul' it multiplies them. The UDx emits four fields, viz. (a, b, operand, result). This is the same problem as in the Python example above, but driven from a SQL table.

Here is the UDX code in Python:

import vertica_sdk

import time
import random
from multiprocessing import Process, Queue, freeze_support

def worker(input, output):
    # Pull (func, args) tasks from the input queue until the 'STOP'
    # sentinel arrives; push each result onto the output queue.
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

def calculate(func, args):
    result = func(*args)
    return args[0], args[1], func.__name__, result

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

class BasicQueueUDX(vertica_sdk.TransformFunction):

    def processPartition(self, server_interface, input, output):
        """
        :param server_interface:
        :param input:
        :param output:
        :return:
        """
        freeze_support()

        NUMBER_OF_PROCESSES = 4

        TASKS = []
        while True:
            a = input.getInt(0)
            b = input.getInt(1)
            operand = input.getString(2)
            TASKS.append((operand, (a, b)))

            if not input.next():
                break

        # Create queues
        task_queue = Queue()
        done_queue = Queue()

        # Submit tasks
        for task in TASKS:
            task_queue.put(task)

        # Start worker processes
        for i in range(NUMBER_OF_PROCESSES):
            Process(target=worker, args=(task_queue, done_queue)).start()

        # Get and print results
        server_interface.log(f"About to process {len(TASKS)} tasks using {NUMBER_OF_PROCESSES} processes")

        for i in range(len(TASKS)):
            out_row = done_queue.get()
            server_interface.log(f"{int(out_row[0])}, {int(out_row[1])}, {out_row[2]}, {int(out_row[3])}")
            output.setInt(0, int(out_row[0]))  # a
            output.setInt(1, int(out_row[1]))  # b
            output.setString(2, out_row[2])    # operand
            output.setInt(3, int(out_row[3]))  # result
            output.next()

        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')

        server_interface.log(f"All records processed. Exiting.")

class BasicQueueUDXFactory(vertica_sdk.TransformFunctionFactory):
    def getPrototype(self, server_interface, arg_types, return_type):
        arg_types.addInt()
        arg_types.addInt()
        arg_types.addVarchar()
        return_type.addInt()
        return_type.addInt()
        return_type.addVarchar()
        return_type.addInt()


    def getReturnType(self, server_interface, arg_types, return_type):
        return_type.addColumn(arg_types.getColumnType(0), "a")
        return_type.addColumn(arg_types.getColumnType(1), "b")
        return_type.addColumn(arg_types.getColumnType(2), "operand")
        return_type.addColumn(arg_types.getColumnType(1), "result")


    def createTransformFunction(cls, server_interface):
        return BasicQueueUDX()

Assume that the file exists in path '/home/dbadmin/udx/BasicQueueUDX.py'.

To install the library, we do the following:

DROP LIBRARY IF EXISTS pymultiprocess CASCADE;
CREATE LIBRARY pymultiprocess AS '/home/dbadmin/udx/BasicQueueUDX.py' DEPENDS '/usr/local/lib/python3.7/site-packages/*:/usr/local/lib64/python3.7/site-packages/*' LANGUAGE 'Python';
CREATE TRANSFORM FUNCTION BASIC_QUEUE AS NAME 'BasicQueueUDXFactory' LIBRARY pymultiprocess;

Populate the table with some rows:

INSERT INTO dev.basic_queue_records (a, b, operand) VALUES (7, 3, 'plus');
INSERT INTO dev.basic_queue_records (a, b, operand) VALUES (7, 4, 'mul');
INSERT INTO dev.basic_queue_records (a, b, operand) VALUES (9, 7, 'plus');
INSERT INTO dev.basic_queue_records (a, b, operand) VALUES (10, 3, 'mul');

Then call the UDx function.

SELECT BASIC_QUEUE(a, b, operand) OVER () FROM dev.basic_queue_records;

This essentially locks up, with the following output in the server logs. It runs forever; when the query is cancelled, the Python destructor is called with no output produced.

2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744]  0x7fa400b74740 UDx side process started
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744]  0x7fa400b74740 My port: 59201
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744]  0x7fa400b74740 My address: 127.0.0.1
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744]  0x7fa400b74740 Vertica port: 45475
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744]  0x7fa400b74740 Vertica address: 127.0.0.1
2021-09-20 20:34:31.751 [Python-v_testdb2_node0001-5537:0x13ed80-64744]  0x7fa400b74740 Vertica address family: 2
2021-09-20 20:34:43.829 [Python-v_testdb2_node0001-5537:0x13ed80-64787]  0x7fa400b74740 PythonInterface destructor called
2021-09-20 20:34:43.983 [Python-v_testdb2_node0001-5537:0x13ed80-64788]  0x7fa400b74740 [UserMessage] BASIC_QUEUE - About to process 4 tasks using 4 processes

Does anyone have any clue why this locks up in a Vertica context but runs fine as a standalone application?


Comments

  • Hibiki (Vertica Employee)

    I think you can see the exceptions in UDxFencedProcesses.log or vertica.log.
    The point to focus on is this line:

    TASKS.append((operand, (a, b)))
    

    With your sample data, Python calls the following lines:

    TASKS.append(('plus', (7, 3)))
    TASKS.append(('mul', (7, 4)))
    TASKS.append(('plus', (9, 7)))
    TASKS.append(('mul', (10, 3)))
    

    Here the first argument is a str. I guess that when you ran this without the Vertica SDK, you were calling the following lines:

    TASKS.append((plus, (7, 3)))
    TASKS.append((mul, (7, 4)))
    TASKS.append((plus, (9, 7)))
    TASKS.append((mul, (10, 3)))
    

    There, the first argument is a reference to the function itself. Can you check that?
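
    A minimal sketch of that fix, assuming the operand column only ever holds 'plus' or 'mul', would be to resolve the string back to a function reference before enqueuing (module-level functions pickle by qualified name, so they survive the trip through the queue):

    OPERATIONS = {'plus': plus, 'mul': mul}

    while True:
        a = input.getInt(0)
        b = input.getInt(1)
        operand = input.getString(2)
        TASKS.append((OPERATIONS[operand], (a, b)))  # a callable, not a str
        if not input.next():
            break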

  • Ariel_Cary (Vertica Employee)

    Python code is hanging on line 63:

        62          for i in range(len(TASKS)):
        63              out_row = done_queue.get()
        64              server_interface.log(f"{int(out_row[0])}, {int(out_row[1])}, {out_row[2]}, {int(out_row[3])}")
        65              output.setInt(0, int(out_row[0]))  # a
        66              output.setInt(1, int(out_row[1]))  # b
        67              output.setString(2, out_row[2])    # operand
        68              output.setInt(3, int(out_row[3]))  # result
        69              output.next()
    

    dbLog has the following:

    Traceback (most recent call last):
      File "/opt/vertica/oss/python3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
        self.run()
      File "/opt/vertica/oss/python3/lib/python3.9/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
      File "/data/v_test_node0001_catalog/Libraries/029dbd8249937f4ee9d59d512687ebb000a0000000000992/pymultiprocess_029dbd8249937f4ee9d59d512687ebb000a0000000000992.py", line 9, in worker
        result = calculate(func, args)
      File "/data/v_test_node0001_catalog/Libraries/029dbd8249937f4ee9d59d512687ebb000a0000000000992/pymultiprocess_029dbd8249937f4ee9d59d512687ebb000a0000000000992.py", line 13, in calculate
        result = func(*args)
    TypeError: 'str' object is not callable
    
    

    I haven't looked carefully into why. But you definitely don't need to parallelize the computation yourself in this case (every record can be processed independently). Try OVER (PARTITION BEST). That will parallelize function evaluation among cluster nodes and also implement intra-node parallelization.
    SELECT BASIC_QUEUE(a, b, operand) OVER (PARTITION BEST) FROM basic_queue_records;

  • anupam_at_arrivalist (Vertica Customer)

    Thanks for your feedback. The changes that I did to make it work are as follows:

    def calculate(func, args):
        if func == 'plus':
            result = plus(*args)
        elif func == 'mul':
            result = mul(*args)
        else:
            result = -1000000000
        return args[0], args[1], func, result
    

    This is just to avoid the Python error. Otherwise this works both with:

    SELECT BASIC_QUEUE(a, b, operand) OVER () FROM dev.basic_queue_records;
    

    as well as

    SELECT BASIC_QUEUE(a, b, operand) OVER (PARTITION BEST) FROM dev.basic_queue_records;
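
    For the four sample rows, both forms should produce something like the following (row order may vary, since results are collected as workers finish):

     a  | b  | operand | result
    ----+----+---------+--------
      7 |  3 | plus    |     10
      7 |  4 | mul     |     28
      9 |  7 | plus    |     16
     10 |  3 | mul     |     30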
    

    Hopefully this example can help other people who wish to use multiprocessing in their UDx functions.

    The real use case I am going for is more complex, with some machine-learning algorithms that are not yet available in Vertica. With the normal fenced use of a Python UDx, not all cores of the machine are exercised. That basically defeats the point of having a beefy machine with multiple cores, since the cores sit mostly idle when only one thread is active per node.

  • anupam_at_arrivalist (Vertica Customer)
    edited September 2021

    And an additional note: attempting to use the same approach for more complex problems that use Pandas within each process gives the following error:

    TypeError: no default __reduce__ due to non-trivial __cinit__
    Traceback (most recent call last):
      File "/opt/vertica/oss/python3/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
        obj = _ForkingPickler.dumps(obj)
      File "/opt/vertica/oss/python3/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
        cls(buf, protocol).dump(obj)
      File "stringsource", line 2, in vertica_sdk.ServerInterface.__reduce_cython__
    

    This makes me think that the Vertica SDK is not compatible with the multiprocessing module in Python 3.7.

  • Hibiki (Vertica Employee)

    @Ariel_Cary Can you explain "intra-node parallelization" in more detail? Does it mean using multiple Python processes, multiple Python threads, or multiple Vertica nodes?

  • Hibiki (Vertica Employee)

    @anupam_at_arrivalist Can you provide a simple code sample with Pandas, plus the sample data? I would like to try it.

  • Ariel_Cary (Vertica Employee)

    @anupam_at_arrivalist: avoid using an empty OVER clause.
    SELECT BASIC_QUEUE(a, b, operand) OVER () FROM dev.basic_queue_records;
    That forces the server to process your entire input as a single partition in a single thread. Use OVER (PARTITION BEST) instead.

    Furthermore, is it necessary to implement your logic as a UD transform function? Partitions don't really matter in your use case as far as I can see. If that's true, a UD scalar function is enough; the server will push the computation down to all nodes as needed.

    @Hibiki For Python UDx functions, it means multiple side processes.
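
    To illustrate the scalar-function suggestion above, here is a minimal sketch of the same plus/mul logic as a Python UD scalar function (untested here; class and function names are placeholders, following the SDK's ScalarFunction pattern):

    import vertica_sdk

    class BasicCalc(vertica_sdk.ScalarFunction):
        def processBlock(self, server_interface, arg_reader, res_writer):
            # One result value per input row; the server parallelizes the calls.
            while True:
                a = arg_reader.getInt(0)
                b = arg_reader.getInt(1)
                operand = arg_reader.getString(2)
                res_writer.setInt(a + b if operand == 'plus' else a * b)
                res_writer.next()
                if not arg_reader.next():
                    break

    class BasicCalcFactory(vertica_sdk.ScalarFunctionFactory):
        def createScalarFunction(cls, srv):
            return BasicCalc()
        def getPrototype(self, srv_interface, arg_types, return_type):
            arg_types.addInt()
            arg_types.addInt()
            arg_types.addVarchar()
            return_type.addInt()
        def getReturnType(self, srv_interface, arg_types, return_type):
            return_type.addInt()

    It would then be invoked per row rather than over a window, e.g. SELECT a, b, operand, BASIC_CALC(a, b, operand) AS result FROM dev.basic_queue_records;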

  • anupam_at_arrivalist (Vertica Customer)
    # (Imports and the worker/calculate/do_density_based_clustering helpers
    # are as in the complete listing later in this thread.)
    class PartitionedDBScanMultiProcess(vertica_sdk.TransformFunction):
    
        def processPartition(self, server_interface, input, output):
            """
            We get all the records for all the devices within the node in one function call. First of all, we need
            to create a dictionary of {device_id: dataframe_of_records} for all the records received. Then we spawn
            one process each to do clustering for each device. (Note that this is contrast to just doing clustering for
            one device at a time, delegating the task of partitioning to the Vertica SQL call). This allows us to use all
            CPU cores of the machine, resulting in better utilization of resources.
            :param server_interface:
            :param input:
            :param output:
            :return:
            """
    
            freeze_support()
            server_interface.log(f"Number of CPUs in this node: {mp.cpu_count()}")
    
            device_records = {}
            while True:
                device_id = input.getInt(0)
                if device_id in device_records:  # append to the old list
                    device_records[device_id].append({
                        'device_id': input.getInt(0),
                        'cluster_id': input.getInt(1),
                        'lat': input.getFloat(2),
                        'lon': input.getFloat(3)})
                else:  # create a new record
                    new_record = [{
                        'device_id': input.getInt(0),
                        'cluster_id': input.getInt(1),
                        'lat': input.getFloat(2),
                        'lon': input.getFloat(3)}]  # a list gets created here
                    device_records[device_id] = new_record
    
                if not input.next():
                    break
    
            # We have all the data now. Let us break up the task into multiple processes
            server_interface.log(f"Creating tasks")
            TASKS = [(do_density_based_clustering, (server_interface, device_id, device_records[device_id])) for device_id in device_records]
            server_interface.log(f"{len(device_records)} tasks created")
            NUMBER_OF_PROCESSES = min(mp.cpu_count(), len(device_records))
    
            # Create queues
            task_queue = Queue()
            done_queue = Queue()
    
            # Submit tasks
            for task in TASKS:
                task_queue.put(task)
    
            # Start worker processes
            for i in range(NUMBER_OF_PROCESSES):
                Process(target=worker, args=(task_queue, done_queue)).start()
    
            server_interface.log(f"{NUMBER_OF_PROCESSES} processes created")
    
            for i in range(len(TASKS)):
                device_out_dataframe = done_queue.get()
    
                for index, row in device_out_dataframe.iterrows():  # Output a row for each record in the dataframe
                    val = row.values
                    server_interface.log(f"{int(val[0])}, {int(val[1])}, {int(val[1])}, {val[2]}, {val[3]}, {val[4]}")
                    output.setInt(0, int(val[0]))
                    output.setInt(1, int(val[1]))
                    output.setFloat(2, val[2])
                    output.setFloat(3, val[3])
                    output.setInt(4, int(val[4]))
                    output.next()
    
            # Tell child processes to stop
            for i in range(NUMBER_OF_PROCESSES):
                task_queue.put('STOP')
    
            server_interface.log(f"All records processed. Exiting.")
    
    class PartitionedDBScanMultiProcessFactory(vertica_sdk.TransformFunctionFactory):
        def getPrototype(self, server_interface, arg_types, return_type):
            arg_types.addInt()
            arg_types.addInt()
            arg_types.addFloat()
            arg_types.addFloat()
            return_type.addInt()
            return_type.addInt()
            return_type.addFloat()
            return_type.addFloat()
            return_type.addInt()
    
    
        def getReturnType(self, server_interface, arg_types, return_type):
            return_type.addColumn(arg_types.getColumnType(0), "device_id")
            return_type.addColumn(arg_types.getColumnType(1), "cluster_id")
            return_type.addColumn(arg_types.getColumnType(2), "lat")
            return_type.addColumn(arg_types.getColumnType(3), "lon")
            return_type.addColumn(arg_types.getColumnType(0), "personality_id")
    
    
        def createTransformFunction(cls, server_interface):
            return PartitionedDBScanMultiProcess()
    

    Use this as your input data. Import it into a table, say dbscan_clustering_input.

    "device_id","cluster_id","lon","lat"
    84253,14988754132,121.487895692308,31.2491530769231
    84253,14988754131,-77.6405076666667,38.8366373333333
    483855,15282876472,-101.9336211,34.97997825
    483855,15282876474,-120.80833475,45.8132591
    483855,15282876471,-122.574880666667,45.6592173333333
    3532422,14988004822,-101.04994875,37.30542685
    3532422,14988004816,-100.934259108,37.059844156
    3532422,14988004812,-100.93438904,37.0598932
    3532422,14988004837,-101.139961825,37.121071275
    3532422,14988004829,-101.3480584,37.1773555
    3532422,14988004828,-101.3358365,37.1696825
    3532422,14988004825,-101.10566635,37.1132184642857
    3532422,14988004824,-101.140040353846,37.1205988461538
    3532422,14988004830,-101.337194,37.170309
    3532422,14988004832,-101.1400096875,37.1209370875
    3532422,14988004839,-100.934347425,37.05988185
    3532422,14988004813,-100.931991,37.0409385
    3532422,14988004814,-100.934446,37.059867
    3532422,14988004815,-100.935276769231,37.0245565384615
    3532422,14988004817,-100.9226435,37.1298765
    3665694,15246784540,-84.5982055,35.438819
    3665694,15246784525,-93.6041166666667,41.5983166666667
    3665694,15246784532,-93.605115,41.600715
    3665694,15246784533,-84.5981487142857,35.4388541428571
    3665694,15246784534,-93.613361,41.597605
    3665694,15246784536,-84.6474200434783,35.4742947826087
    3665694,15246784537,-84.600346,35.4355563333333
    3665694,15246784539,-84.6003455,35.43549
    4824816,15246793759,-73.8966816666667,41.9536908333333
    4824816,15246793780,-73.874959,41.7429485
    4824816,15246793778,-73.87495825,41.742948125
    4824816,15246793776,-73.8749586666667,41.742948
    4824816,15246793764,-78.5680482857143,35.8667108571429
    6241564,14989111658,-78.8465955,35.8519383333333
    6241564,14989111668,-95.1088760625,29.5644795625
    6241564,14989111661,-78.58711868,35.90019988
    6241564,14989111659,-78.5871671553785,35.9000712111554
    6241564,14989111656,-78.8466195,35.8519745
    6241564,14989111655,-78.587002,35.9002537333333
    6241564,14989111654,-78.8465625,35.8518675
    6241564,14989111653,-78.5873006666667,35.9003293333333
    6241564,14989111647,-78.587192853211,35.9000971284404
    6241564,14989111662,-82.530976,27.981994
    6241564,14989111671,-78.5872618333333,35.9001270138889
    6241564,14989111663,-78.5872692666667,35.8998651333333
    6241564,14989111664,-78.5281087437186,35.9612016281407
    6241564,14989111665,-78.5871022857143,35.9001022571429
    6241564,14989111666,-78.5280854333333,35.9611551666667
    6241564,14989111667,-78.587144974026,35.9001348376623
    
    

    Install the library the usual way, for example:

    DROP LIBRARY IF EXISTS pydbscan CASCADE;
    CREATE LIBRARY pydbscan AS '/home/dbadmin/udx/PartitionedDBScanMultiProcessWithTaskQueue.py' DEPENDS '/usr/local/lib/python3.7/site-packages/*:/usr/local/lib64/python3.7/site-packages/*' LANGUAGE 'Python';
    CREATE TRANSFORM FUNCTION PARTITIONED_DBSCAN AS NAME 'PartitionedDBScanMultiProcessFactory' LIBRARY pydbscan;
    

    Then call the library as follows:

    SELECT PARTITIONED_DBSCAN(device_id, cluster_id, lat, lon) OVER (PARTITION BEST) FROM dbscan_clustering_input;
    

    Hope you can now reproduce the error at your end.

  • anupam_at_arrivalist (Vertica Customer)

    I have to add that using multiprocessing while handling all the segmentation within the Python code is going to result in a huge amount of memory utilization. An approach like this can therefore only be used on a machine with lots of memory and many cores.

  • Hibiki (Vertica Employee)

    @Ariel_Cary Does "multiple side processes" mean multiple Python processes per node, or a single Python process per node but involving all nodes?

  • Hibiki (Vertica Employee)

    @anupam_at_arrivalist Thank you for sharing the sample code and the test data. Please let me try it.

  • anupam_at_arrivalist (Vertica Customer)

    @Hibiki To my knowledge, "multiple side processes" would mean multiple Python processes on the same node. Each Vertica node works on the data present locally on that node (at least in Enterprise mode). Within the node, Python multiprocessing spins off multiple processes (not GIL-bound threads) to process each segment. How the segmentation is done is handled within the Python function.

  • Hibiki (Vertica Employee)

    @anupam_at_arrivalist Thank you for your explanation. I ran the following UDx to see how many processes were run. As a result, I saw that one process per node was run.

    import vertica_sdk
    import os
    import threading as th
    
    class ProcessThread(vertica_sdk.TransformFunction):
        def processPartition(self, server_interface, input, output):
            value = input.getInt(0)
            output.setString(0, os.uname()[1])
            output.setInt(1, os.getpid())
            output.setInt(2, th.get_ident())
            output.next()
    
    class ProcessThreadFactory(vertica_sdk.TransformFunctionFactory):
        def getPrototype(self, server_interface, arg_types, return_type):
            arg_types.addInt()
        def getReturnType(self, server_interface, arg_types, return_type):
            return_type.addVarchar(30, "hostname")
            return_type.addInt("pid")
            return_type.addInt("ident")
        def createTransformFunction(cls, server_interface):
            return ProcessThread()
    
     hostname  |   pid   |      ident
    -----------+---------+-----------------
     vertica01 | 1243569 | 140554399401792
     vertica03 |   55863 | 140639416280896
     vertica02 |   55983 | 140340797105984
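
    (Presumably installed and invoked along the same lines as the earlier examples; the library name, table, and column here are placeholders:)

    CREATE LIBRARY pyprocthread AS '/home/dbadmin/udx/ProcessThread.py' LANGUAGE 'Python';
    CREATE TRANSFORM FUNCTION PROCESS_THREAD AS NAME 'ProcessThreadFactory' LIBRARY pyprocthread;
    SELECT PROCESS_THREAD(x) OVER (PARTITION BEST) FROM some_table;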
    
  • Hibiki (Vertica Employee)

    @anupam_at_arrivalist I could reproduce the same issue in my environment. I stripped your script down to a simple standalone Python script and ran it with the Python runtime shipped with Vertica; it worked well. So I think the Python runtime shipped with Vertica doesn't have an issue by itself.

    The error seems to be related to pickling. In addition, Vertica calls the Python UDx from C/C++, so the behavior of pickle may well differ there. Let me check the details.

  • Hibiki (Vertica Employee)

    @anupam_at_arrivalist It seems the error happens when the child process puts the DataFrame values on the result queue. Can you try converting those values to standard lists before appending them to the queue?

    list_with_clusters = df_with_clusters.to_numpy().tolist()
    output.put(list_with_clusters)
    

    In addition, I found that server_interface.log didn't work if server_interface was stored in the queue. If you see the same behavior, please pass it to the do_density_based_clustering function as an argument.

  • anupam_at_arrivalist (Vertica Customer)

    Thanks @Hibiki for your suggestions. I have tried to incorporate them, but somehow I still keep getting the same error.

    The library is installed by pointing it to the Python installation that came with Vertica.

    DROP LIBRARY IF EXISTS pydbscan CASCADE;
    CREATE LIBRARY pydbscan AS '/home/dbadmin/udx/PartitionedDBScanMultiProcessWithTaskQueue.py' DEPENDS '/opt/vertica/oss/python3/lib/python3.7/site-packages/*' LANGUAGE 'Python';
    CREATE TRANSFORM FUNCTION PARTITIONED_DBSCAN AS NAME 'PartitionedDBScanMultiProcessFactory' LIBRARY pydbscan;
    

    Note that the library dependencies now point to the Vertica Python folder, so there should be no mismatch of libraries.

    The function has been changed as follows:

    import vertica_sdk
    import pandas as pd
    import numpy as np
    import multiprocessing as mp
    from sklearn.cluster import DBSCAN
    from multiprocessing import Process, Queue, freeze_support
    
    kms_per_radian = 6371.0088
    epsilon = 50 / kms_per_radian  # 50 Km separation for the clusters
    
    
    def worker(input, output):
        for func, args in iter(input.get, 'STOP'):
            result = calculate(func, args)
            output.put(result)
    
    
    def calculate(func, args):
        return func(*args)
    
    
    def do_density_based_clustering(server_interface, device_id, device_records=None):
        if device_records is None:
            device_records = []
    
        df = pd.DataFrame.from_records(device_records)
        coords = df[['lat', 'lon']].to_numpy()
        db = DBSCAN(eps=epsilon, min_samples=1, algorithm='ball_tree', metric='haversine').fit(np.radians(coords))
        cluster_labels = db.labels_
        df_with_clusters = df.join(pd.DataFrame(cluster_labels.T))
        list_with_clusters = df_with_clusters.to_numpy().tolist()
    
        server_interface.log(f"PartitionedDBScanMultiProcess got {len(device_records)} locations and found {len(set(cluster_labels))} personalities for device Id {device_id}")
    
        return list_with_clusters
    
    
    class PartitionedDBScanMultiProcess(vertica_sdk.TransformFunction):
    
        def processPartition(self, server_interface, input, output):
            freeze_support()
            server_interface.log(f"Number of CPUs in this node: {mp.cpu_count()}")
    
            device_records = {}
            while True:
                device_id = input.getInt(0)
                if device_id in device_records:  # append to the old list
                    device_records[device_id].append({
                        'device_id': input.getInt(0),
                        'cluster_id': input.getInt(1),
                        'lat': input.getFloat(2),
                        'lon': input.getFloat(3)})
                else:  # create a new record
                    new_record = [{
                        'device_id': input.getInt(0),
                        'cluster_id': input.getInt(1),
                        'lat': input.getFloat(2),
                        'lon': input.getFloat(3)}]  # a list gets created here
                    device_records[device_id] = new_record
    
                if not input.next():
                    break
    
            # We have all the data now. Let us break up the task into multiple processes
            server_interface.log(f"Creating tasks")
            TASKS = [(do_density_based_clustering, (server_interface, device_id, device_records[device_id])) for device_id in device_records]
            server_interface.log(f"{len(device_records)} tasks created")
            NUMBER_OF_PROCESSES = min(mp.cpu_count(), len(device_records))
    
            # Create queues
            task_queue = Queue()
            done_queue = Queue()
    
            # Submit tasks
            for task in TASKS:
                task_queue.put(task)
    
            # Start worker processes
            for i in range(NUMBER_OF_PROCESSES):
                Process(target=worker, args=(task_queue, done_queue)).start()
    
            server_interface.log(f"{NUMBER_OF_PROCESSES} processes created")
    
            for i in range(len(TASKS)):
                device_out_list = done_queue.get()
    
                for val in device_out_list:  # Output a row for each record in the dataframe
                    server_interface.log(f"{int(val[0])}, {int(val[1])}, {int(val[1])}, {val[2]}, {val[3]}, {val[4]}")
                    output.setInt(0, int(val[0]))
                    output.setInt(1, int(val[1]))
                    output.setFloat(2, val[2])
                    output.setFloat(3, val[3])
                    output.setInt(4, int(val[4]))
                    output.next()
    
            # Tell child processes to stop
            for i in range(NUMBER_OF_PROCESSES):
                task_queue.put('STOP')
    
            server_interface.log(f"All records processed. Exiting.")
    

    But the error I am seeing is still:

    /vertica/data/testdb2/v_testdb2_node0001_catalog/Libraries/02e021a977071015fa974ebf8aaca6c500a0000000021fd4/pandas/compat/__init__.py:124: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
      warnings.warn(msg)
    Traceback (most recent call last):
      File "/opt/vertica/oss/python3/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
        obj = _ForkingPickler.dumps(obj)
      File "/opt/vertica/oss/python3/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
        cls(buf, protocol).dump(obj)
      File "stringsource", line 2, in vertica_sdk.ServerInterface.__reduce_cython__
    TypeError: no default __reduce__ due to non-trivial __cinit__
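
    A plausible culprit in the listing above: TASKS still embeds server_interface in every task tuple, and everything placed on a multiprocessing.Queue must be pickled; vertica_sdk.ServerInterface is exactly the class named in the traceback. A sketch that keeps only plain data on the queues and does all logging in the parent (the helper name is hypothetical):

    def worker(task_queue, done_queue):
        # Tasks are plain (device_id, records) tuples, which pickle cleanly.
        for device_id, records in iter(task_queue.get, 'STOP'):
            done_queue.put((device_id, cluster_records(device_id, records)))

    def cluster_records(device_id, records):
        # Same DBSCAN logic as do_density_based_clustering, minus the
        # server_interface argument; returns plain nested lists.
        df = pd.DataFrame.from_records(records)
        coords = df[['lat', 'lon']].to_numpy()
        db = DBSCAN(eps=epsilon, min_samples=1, algorithm='ball_tree',
                    metric='haversine').fit(np.radians(coords))
        return df.join(pd.DataFrame(db.labels_.T)).to_numpy().tolist()

    # ...and inside processPartition, enqueue data only:
    #     for device_id, records in device_records.items():
    #         task_queue.put((device_id, records))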
    
  • anupam_at_arrivalist (Vertica Customer)

    Taking an alternate approach, I was able to get a little further. With this approach the query works for small inputs, but with large inputs we get a memory error. If we feed in about 1000 rows there is no memory error, but the output is partial. If someone can make this produce complete output, it will be quite acceptable. Here is the alternate solution:

    import math
    
    import vertica_sdk
    
    import pandas as pd
    import numpy as np
    import multiprocessing as mp
    from sklearn.cluster import DBSCAN
    from multiprocessing import Process, Queue
    from itertools import islice
    
    kms_per_radian = 6371.0088
    epsilon = 50 / kms_per_radian  # 50 Km separation for the clusters
    out_records = Queue()
    
    def chunks(data, SIZE=10000):
        it = iter(data)
        for i in range(0, len(data), SIZE):
            yield {k: data[k] for k in islice(it, SIZE)}
    
    def do_density_based_clustering(server_interface, out_queue, device_id, device_records=None):
        if device_records is None:
            device_records = []
    
        df = pd.DataFrame.from_records(device_records)
        coords = df[['lat', 'lon']].to_numpy()
        db = DBSCAN(eps=epsilon, min_samples=1, algorithm='ball_tree', metric='haversine').fit(np.radians(coords))
        cluster_labels = db.labels_
        df_with_clusters = df.join(pd.DataFrame(cluster_labels.T))
    
        server_interface.log(f"PartitionedDBScanMultiProcess got {len(device_records)} locations and found {len(set(cluster_labels))} personalities for device Id {device_id}")
    
        out_queue.put(df_with_clusters.to_numpy().tolist())
        out_queue.cancel_join_thread()
    
    
    class PartitionedDBScanMultiProcess(vertica_sdk.TransformFunction):
    
        def processPartition(self, server_interface, input, output):
            server_interface.log(f"Number of CPUs in this node: {mp.cpu_count()}")
            device_records = {}
            while True:
                device_id = input.getInt(0)
                if device_id in device_records:  # append to the old list
                    device_records[device_id].append({
                        'device_id': input.getInt(0),
                        'cluster_id': input.getInt(1),
                        'lat': input.getFloat(2),
                        'lon': input.getFloat(3)})
                else:  # create a new record
                    new_record = [{
                        'device_id': input.getInt(0),
                        'cluster_id': input.getInt(1),
                        'lat': input.getFloat(2),
                        'lon': input.getFloat(3)}]  # a list gets created here
                    device_records[device_id] = new_record
    
                if not input.next():
                    break
    
            # We have all the data now. Let us break up the task into multiple processes
            procs = []
            for device_records_chunked in chunks(device_records, mp.cpu_count()):
                for device_id in device_records_chunked:
                    proc = Process(target=do_density_based_clustering, args=(server_interface, out_records, device_id, device_records[device_id]))
                    procs.append(proc)
                    proc.start()
    
                for proc in procs:
                    proc.join()
    
                # The resulting dataframe has (device_id:int, cluster_id:int, lat:float, lon:float, personality_id:int)
                while not out_records.empty():
                    device_out_dataframe = out_records.get()
    
                    for val in device_out_dataframe:  # Output a row for each record in the dataframe
                        server_interface.log(f"Output: {int(val[0])}, {int(val[1])}, {val[2]}, {val[3]}, {val[4]}")
                        output.setInt(0, int(val[0]))
                        output.setInt(1, int(val[1]))
                        output.setFloat(2, val[2])
                        output.setFloat(3, val[3])
                        output.setInt(4, int(val[4]))
                        output.next()
    
            server_interface.log(f"All records processed. Exiting.")
    
    class PartitionedDBScanMultiProcessFactory(vertica_sdk.TransformFunctionFactory):
        def getPrototype(self, server_interface, arg_types, return_type):
            arg_types.addInt()
            arg_types.addInt()
            arg_types.addFloat()
            arg_types.addFloat()
            return_type.addInt()
            return_type.addInt()
            return_type.addFloat()
            return_type.addFloat()
            return_type.addInt()
    
    
        def getReturnType(self, server_interface, arg_types, return_type):
            return_type.addColumn(arg_types.getColumnType(0), "device_id")
            return_type.addColumn(arg_types.getColumnType(1), "cluster_id")
            return_type.addColumn(arg_types.getColumnType(2), "lat")
            return_type.addColumn(arg_types.getColumnType(3), "lon")
            return_type.addColumn(arg_types.getColumnType(0), "personality_id")
    
    
        def createTransformFunction(cls, server_interface):
            return PartitionedDBScanMultiProcess()
    
    
  • anupam_at_arrivalist (Vertica Customer)

    This is the input file for the above problem, for testing purposes. Somehow I could not attach it to the previous comment.

  • Hibiki (Vertica Employee)

    @anupam_at_arrivalist I loaded the data from sample_input.csv and ran my UDx. It worked well without the memory error.

  • anupam_at_arrivalist (Vertica Customer)

    @Hibiki The attached file does not run, since there is no name called queue in scope in that section of the code. There are two instances where this is the case.

            except queue.Empty:
                break
    

    Maybe the wrong file got attached. Can you send me the exact file that ran successfully?
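
    For the record, that exception class lives in the standard-library queue module (multiprocessing queues raise queue.Empty from there), so the snippet only needs the import. Using the names from the earlier listing (the timeout value is arbitrary):

    import queue  # stdlib module; multiprocessing.Queue raises queue.Empty

    try:
        device_out_dataframe = out_records.get(timeout=1)
    except queue.Empty:
        break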

  • anupam_at_arrivalist (Vertica Customer)

    If I instead do (not sure if it is correct):

            except Queue.queue.Empty:
                break
    

    and provide some 100 records with only 5 unique device_ids, I get a memory error. So this method consumes too much memory to be of any practical use.

    (Python error type [<class 'OSError'>])
    Traceback (most recent call last):
      File "/vertica/data/testdb2/v_testdb2_node0001_catalog/Libraries/02e021a977071015fa974ebf8aaca6c500a0000000021fe8/pydbscan_02e021a977071015fa974ebf8aaca6c500a0000000021fe8.py", line 76, in processPartition
        process.start()
      File "/opt/vertica/oss/python3/lib/python3.7/multiprocessing/process.py", line 112, in start
        self._popen = self._Popen(self)
      File "/opt/vertica/oss/python3/lib/python3.7/multiprocessing/context.py", line 223, in _Popen
        return _default_context.get_context().Process._Popen(process_obj)
      File "/opt/vertica/oss/python3/lib/python3.7/multiprocessing/context.py", line 277, in _Popen
        return Popen(process_obj)
      File "/opt/vertica/oss/python3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
        self._launch(process_obj)
      File "/opt/vertica/oss/python3/lib/python3.7/multiprocessing/popen_fork.py", line 70, in _launch
        self.pid = os.fork()
    OSError: [Errno 12] Cannot allocate memory
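
    One hedged way to bound both the number of live children and the fork pressure, assuming tasks are reduced to plain picklable data as sketched earlier, is multiprocessing.Pool: it forks a fixed set of workers once and reuses them, and it avoids polling Queue.empty(), which the docs warn is unreliable and which is a plausible cause of the partial output seen above.

    import multiprocessing as mp

    def cluster_one_device(task):
        # A plain-data task: (device_id, list_of_record_dicts); must pickle.
        device_id, records = task
        return device_id, cluster_records(device_id, records)  # hypothetical helper

    def cluster_all(device_records):
        # A fixed pool of cpu_count() workers, forked once and reused.
        with mp.Pool(processes=mp.cpu_count()) as pool:
            return pool.map(cluster_one_device, list(device_records.items()))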
    
  • Hibiki (Vertica Employee)

    @anupam_at_arrivalist I forgot to add "import queue". I have attached the file again. I ran this script on a 3-node cluster and confirmed that 3 vertica-udx-Python processes were launched on each node; the total RSS per node was 330 MB, which is a small allocation.
