Parallel R UDF

Hi,

I have a few queries on parallelizing R UDFs 

Are the UDFs written in R parallelizable? If so, are there any examples of parallel R UDF codes?

If the implementation is not parallel,  will the entire data be brought into one of the nodes for processing? If so, how does Vertica decide which node to bring the data into? Will it be constrained by memory of that one node? How does the performance compares with using RODBC/RJDBC?

Please clarify.

Thank you.

Ravi

Comments

  • Hi Ravi,

    All R-UDx functions are automatically parallelizable.  This includes the examples in /opt/vertica/sdk/examples/RFunctions/ .

    R UDx functions are parallelizable because they need to be run many times.  Each call is serial.  But we call the function many times simultaneously.

    There are multiple types of R functions.  R UDT ("User-Defined Transform") functions operate on large blocks of data at a time, rather than single rows.  So when you call a UDT, you must specify an OVER clause that tells us how to partition the data such that we can work on many independent partitions at once:

    https://my.vertica.com/docs/7.0.x/HTML/index.htm#Authoring/ProgrammersGuide/Analytics/HowAnalyticFun...

    This has been discussed in more detail in a previous post here:

    https://community.vertica.com/vertica/topics/using_rs_standard_functions_in_vertica

    If you run a UDT and provide an OVER clause that results in a single giant partition, then we will have to bring the data to a single node and you will not necessarily see better performance than over ODBC.

    Each call to an R UDx is limited to the memory of the node that it runs on.  (Possibly a fraction of that memory; we may run several instances of your function simultaneously on each node if you have multiple CPU cores.)  The hope is that, by splitting the problem up, each piece will need less memory; and by distributing the pieces across many different nodes, you get access to more total memory.

    Note that some algorithms use much less memory, and/or run faster, if they receive the data sorted in a particular order.  The UDT "OVER" clause allows you to specify the sort order of the incoming data for this reason.

    We have blogged about a technology that we've been working on that allows you to write more-general distributed algorithms:

    http://www.vertica.com/2013/02/21/presto-distributed-r-for-big-data/

    But I don't believe the technology has yet been released.  And to write a new algorithm against it, you will have to know a little more about writing distributed algorithms than is required by R-UDx.

    Adam
  • Thanks a lot Adam for the detailed reply. This was really helpful.
  • Hi Adam!

         I am trying to achieve a parallel UDTF through R but I'm finding some obstacles. Let me explain.

         My UDTF receives columns from a subquery and some parameters. Using the parameters I read a .RData file which contains a specific model for the columns I receive. Combining both through the UDTF I get back a matrix as a result.

         SELECT myFunc(C.IDNum...C.coln USING PARAMETERS x = 1, y = 2) OVER() FROM (subquery) AS C;

         The subquery can be modified to return a single IDNum or multiple IDNums in the table. I intend to use PARTITION BY to execute the UDTF over different IDNum's to be done in parallel. My UDTF processes each IDNum individually. It was built to work with a unique IDNum per table.

         However when I execute the above query with OVER(PARTITION BY C.IDNum) it returns an error. [cannot open the connection]

         It never reads the same file twice, it never writes to a file at any point. I suspected however that loading an RData file was causing some sort of error when scaling to parallel case. So I tested that with fixed values and no file loading and it turned out


          [Package X doesn't exist] <- it exists and is installed correctly. As I stated before, single case scenario with no partition by works fine.

          I'll run further tests but I wanted to post this in the community.



    Thanks




  • I still require specific order in the partitions, so AUTO is not an option. I tried it just now and it returns an error as well but on the description of data rather than reading error =S
  • Hm...  Could you provide some more detail about that error?  Also, is there any interesting logging output in the UDxLogs/ directory (inside your catalog directory)?

    At first glance, that query looks reasonable to me.

    In general, though, Vertica does not guarantee anything about the ordering of partitions.  Any given partition may be processed before, after, or concurrently with any other partition.  (That's the idea behind partitions -- each partition should be unrelated to every other partition.)  If you do want ordering, you should use an ORDER BY clause instead of (and/or in addition to) a PARTITION BY clause.
  • I found part of the error so far. Since we're working on a Vertica Cluster the model apparenly should be found on every server on the same address (untested theory, on development). As far as I understand when partitioning Vertica sends the data on queue to different machines to be executed, thus if the file is not on that specific machine that would cause the file connection to be non existant and thus return an error.


    Second, we're still looking into what happened about the error on library. It is supposedly installed on every machine, but  only 1 library failed to be loaded when we're calling another one as well.


    About the logs, where exactly are they stored? I am only a Vertica Pesudo-Superuser

Leave a Comment

BoldItalicStrikethroughOrdered listUnordered list
Emoji
Image
Align leftAlign centerAlign rightToggle HTML viewToggle full pageToggle lights
Drop image/file