UDAF aggregate method is called for each value

Hi,

I am creating a UDAF that does approximate distinct-value counting (HyperLogLog).
The algorithm builds a 4K memory block from a list of values (regardless of the list size). This block can be used to estimate the number of distinct values in the original list. In addition, it can be merged with other 4K blocks, so you can estimate the count of distinct elements across several groups.
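
Just to illustrate the merge property (this is a sketch of the idea only, not my actual code): merging two HyperLogLog register blocks is an element-wise max over the registers, which is what lets you combine, say, per-day blocks into a per-week estimate.

    // Illustration only: merging two HLL register blocks (sizes simplified).
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>

    void merge_hll_registers(uint8_t *dst, const uint8_t *src, std::size_t n /* e.g. 4096 */)
    {
        for (std::size_t i = 0; i < n; ++i)
            dst[i] = std::max(dst[i], src[i]);  // each register keeps the larger value
    }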

I have a C++ implementation of this algorithm and have successfully used it both as a MySQL plugin and as Hadoop Hive and Pig UDFs, with great performance.

I am trying to use the same code as a Vertica UDAF, and it runs very inefficiently.
What I am seeing is that the AggregateFunction.aggregate method is called for each single value. This creates a 4K memory block per value, and later the combine method merges all of these 4K blocks into a single block from which the estimate is produced.

This would be much more efficient if multiple values were passed to each call to the aggregate method (which, according to the documentation, should be the case). I am not sure why it behaves this way.
I am using the Community Edition on a single node in an Ubuntu VM.
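
For reference, my aggregate() follows the usual pattern from the SDK documentation and examples. Here is a simplified sketch (not my actual code; the intermediate is assumed to be a 4K VARBINARY synopsis, and hllAddValue is a placeholder for my HyperLogLog library):

    #include "Vertica.h"
    using namespace Vertica;

    class HllAggSketch : public AggregateFunction
    {
    public:
        virtual void aggregate(ServerInterface &srvInterface,
                               BlockReader &argReader,
                               IntermediateAggs &aggs)
        {
            VString &synopsis = aggs.getStringRef(0);            // the 4K serialized HLL block
            do {
                const VString &value = argReader.getStringRef(0);
                if (!value.isNull())
                    hllAddValue(synopsis, value);                // placeholder for the real HLL update
            } while (argReader.next());                          // written to loop over many rows per call
        }

        // initAggregate(), combine() and terminate() omitted for brevity.

    private:
        void hllAddValue(VString &synopsis, const VString &value);  // hypothetical helper
    };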

I would really appreciate any advice on how to resolve this.

Thanks
Amir

Comments

  • Hi Amir,

    Are you familiar with Vertica's "APPROXIMATE_COUNT_DISTINCT()" function?  We already provide the functionality that you are trying to implement on your own, with some additional features as well.

    APPROXIMATE_COUNT_DISTINCT() is implemented using the UDAggregate API (and not using any special internal APIs, etc).  I haven't personally checked how many rows it sees at a time, but we have generally seen good performance with it.

    Regarding the SDK -- I'm not sure why you are seeing a single value at a time.  However, this sort of thing tends to be quite sensitive to data size, layout, etc -- if you have a multi-node cluster and you have a small table that only has one row per node, for example, you're probably only going to get one row...  Could you post some example SQL that creates a table, loads some sample data, and runs your function?  (Where you have run this SQL yourself and verified that your code is only seeing one row at a time.)

    Thanks,
    Adam
  • Hi, Adam

    Thank you for the quick reply.

    I am aware of APPROXIMATE_COUNT_DISTINCT, but it does not meet my needs.
    I need to store the 4K blocks in Vertica, and later merge them to get an approximate count across several groups.
    For example, I might have a table where I store a 4K block for each day, and later need to get an approximate count for a week.
    In addition, I sometimes run the same algorithm from Hadoop, so I need exactly the same algorithm to be used both in Vertica and in Hadoop.

    Regarding the use case - I have a table of raw data with 50,000 records. I then create an aggregate table from it, using my UDAF to build the 4K blocks from those 50,000 records.
    Performance was not good, and when I started troubleshooting I saw that the data was coming in one row at a time.

    All the code is open source - you can see it at https://github.com/amirtuval/vertica-hyperloglog.
    There's an SQL script at https://github.com/amirtuval/vertica-hyperloglog/blob/master/sql/example.sql that shows how to load data and use the UDAF.
    To compile it, simply clone and run make.

    It is a little rough, but I hope it is enough :-)
    Thanks
    Amir





  • Hi Amir,

    Thanks for posting the code!  I'll take a look.

    Just out of curiosity -- I don't see any logging in your aggregate() function; how are you verifying that it's only seeing one row?  (Temporary logging statements?  Walking through in gdb or similar?)

    Regarding that use case -- if you need exactly the same algorithm and intermediate data structure, then that makes sense.  If you just need to persist intermediate results, though, you can do that with "APPROXIMATE_COUNT_DISTINCT_SYNOPSIS()".

    Adam
  • Thanks

    I was not aware of APPROXIMATE_COUNT_DISTINCT_SYNOPSIS.
    It would have saved me a lot of time if I did not need the same algorithm in Hadoop :-).
    Great job!

    I used temporary logging - it is not in the code that's pushed to github.

    Thanks
    Amir

  • Hi, Adam

    Was wondering if you had a chance to review the code - would really appreciate any feedback.

    Thanks
    Amir
  • Hi Amir,

    I haven't had a chance to look too closely.  But I tried to compile your code, and ran into the following errors:

    AggregateFunctions/Base.cpp:11:10: fatal error: 'SerializedHyperLogLog.hpp' file
          not found
    #include "SerializedHyperLogLog.hpp"
             ^
    1 error generated.
    In file included from AggregateFunctions/HllCompute.cpp:11:
    AggregateFunctions/Base.h:5:10: fatal error: 'constants.hpp' file not found
    #include "constants.hpp"
             ^
    1 error generated.


    Regardless, in playing around, I do see that sometimes we are in fact only providing a single row per call to aggregate().  I'm not yet entirely sure why, so I unfortunately don't know a workaround yet...

    Adam

  • Hi, Adam

    Sorry about not specifying this. This repo has several git submodules that you need to get as well before you can successfully compile it.

    Here's a full list of steps:

    git clone https://github.com/amirtuval/vertica-hyperloglog.git
    cd vertica-hyperloglog/
    cd mysql-hyperloglog/
    git submodule update --init
    cd libmysqlhll/cpp-hyperloglog/
    git submodule update --init
    cd ../../..
    make



    Thanks
    Amir


  • Hi, Adam

    Was wondering if you had a chance to take a look, and if you have more feedback on this issue.

    Anything would be much appreciated

    Thanks
    Amir
  • Hi Amir,

    Sorry for the delay...  I'm afraid I don't have a particularly useful answer:  You have correctly identified a performance issue; it seems to happen whenever you do an aggregate plus a group by.  The API should lead to correct results, but obviously with suboptimal performance.  There is no great workaround right now.

    You could implement the function against our UDT API, instead of our UDAggregate API.  This should give you better performance, but would require writing clunkier SQL.
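
    Very roughly, something along these lines (an untested sketch only; hllAdd is a placeholder for your HLL code, and the accompanying factory would declare a single VARBINARY output column for the 4K synopsis):

    #include "Vertica.h"
    #include <cstdint>
    #include <string>
    #include <vector>
    using namespace Vertica;

    class HllSynopsisUDT : public TransformFunction
    {
    public:
        virtual void processPartition(ServerInterface &srvInterface,
                                      PartitionReader &inputReader,
                                      PartitionWriter &outputWriter)
        {
            std::vector<uint8_t> synopsis(4096, 0);              // scratch 4K register block
            do {
                const VString &value = inputReader.getStringRef(0);
                if (!value.isNull())
                    hllAdd(synopsis, value);                     // placeholder for the real HLL update
            } while (inputReader.next());

            // Emit one row per partition containing the serialized synopsis.
            VString &out = outputWriter.getStringRef(0);
            out.copy(std::string(synopsis.begin(), synopsis.end()));
            outputWriter.next();
        }

    private:
        void hllAdd(std::vector<uint8_t> &synopsis, const VString &value);  // hypothetical helper
    };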

    We're looking at the issue; hopefully it will be fixed in a future release.  I can't make public commitments on our forums, though; those have to come through Support.  If you'd like more details, and if you have an Enterprise account, please open a Support case; they can elaborate.

    Adam
  • Hi Adam, and thanks for the answer.

    I do not have an Enterprise account yet, but should have one very soon. I'll contact Support when I do.

    By "UDT" - do you mean the "User Defined Transform function"? Will that work with a group by query, using other aggregate functions - SUM/AVG etc.?

    In addition, you said you implemented your APPROXIMATE_COUNT_DISTINCT function using the UDAF mechanism and are seeing good performance.
    Is the code published somewhere, so that I can take a look? It could help me in one of two ways: either I can see how to improve my implementation, or I can use your implementation from within Hadoop. I know it is a long shot, but it does not hurt to ask, right? :-)

    Thanks for all your help
    Amir
  • Hi Amir,

    "UDT" -- yep, I mean "User Defined Transform function".  It's awkward to combine UDT's with other aggregate functions.  But you can call them in a subquery; at which point you can do pretty much whatever you want.

    Unfortunately, as far as I know, the source of APPROXIMATE_COUNT_DISTINCT() has not been released...

    In any case, thanks for reporting the issue!  Hopefully Support will be able to help you out further.

    Adam
  • Hi, Adam

    I noticed that the aggregate method is actually invoked by the InlineAggregate macro, which implements the aggregateArrs method.

    The macro's implementation loops over the count variable.
    I printed count to the log, and I am seeing that aggregateArrs receives about 150 records at a time, and that this macro's loop is what causes the aggregate method to be invoked once per value.

    I can implement this method without using the macro, probably resulting in much better performance.

    Before I do, I was wondering if you see any downside to this approach. Would it break in future versions of Vertica? Are there nuances I should be aware of?

    Thanks
    Amir
  • Hi Amir,

    I hear you folks are trying to reach us from a bunch of directions at once :-)

    I unfortunately don't know the internals of this API in depth.

    I will say that our API stability policy is currently that we won't break binary compatibility except on major releases or the first service pack after a major release.  So you're safe on that timeframe; but after that, we may change the implementation.

    Adam
  • OK, thanks.

    I will try this approach and will update.

    Thanks for all your help
    Amir
  • Just posting an update here:  This is an issue in some sense, but it's complicated.

     

    There was a specific, narrow issue; I believe that has been addressed as a micro-optimization, and it helps a little in certain cases.

     

    There's a more general issue, though.  Say you have data where the grouping key looks like:

    A, A, B, C, B, D, B, C, C, B

    Vertica works on a streaming/iteration model; it iterates over rows as they are read from disk (unless this is impossible) rather than reading everything into memory up front.

     

    So, we get two A's in a row.  Great: we can aggregate on a batch of ... only two rows.  Then we get a B.  The next value is not a B, so we can't accumulate a run of rows; we just give you the single B.
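
    Conceptually, the row stream gets handled roughly like this (purely illustrative C++, not our actual executor code):

    #include <cstddef>
    #include <string>
    #include <utility>
    #include <vector>

    // Batch up consecutive rows that share a grouping key and hand each run to the
    // aggregate in one call.  On unsorted data most runs have length 1, which is why
    // aggregate() ends up seeing a single row at a time.
    template <typename Row, typename AggregateBatchFn>
    void aggregate_runs(const std::vector<std::pair<std::string, Row> > &stream,
                        AggregateBatchFn aggregateBatch)
    {
        std::vector<Row> run;
        std::string runKey;
        for (std::size_t i = 0; i < stream.size(); ++i) {
            if (!run.empty() && stream[i].first != runKey) {
                aggregateBatch(runKey, run);   // key changed: flush the current run
                run.clear();
            }
            runKey = stream[i].first;
            run.push_back(stream[i].second);
        }
        if (!run.empty())
            aggregateBatch(runKey, run);       // flush the final run
    }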

     

    We could, alternatively, cluster each batch:

    A, A, B, B, B, B, C, C, C, D

    Then you get all your A's, then all your B's, etc.  But if the data is not already sorted, this requires a sort.  (Or a hash -- either way, reading all of the data into memory.)

     

    For data sets that don't fit into memory, which is common with Vertica (and less common in many other systems), producing this list requires a disk-backed sort (or hash, etc), which is assuredly slower than the added per-row virtual function call in the current implementation.  Much more efficient to have the API provide one row at a time.

     

    If you have a function for which each call is expensive, but for which the per-row cost should be cheap, our recommendation is to re-think how you're structuring your function:  Think of the whole class as one big function call.  Put the expensive per-"function call" operations in setup().  Then have your process*() function act as your fast per-row inner loop.  Turn function-local variables into class-member variables.
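
    For example, a rough sketch with placeholder names (ExpensiveHelper stands in for whatever is costly to build, and this assumes the same setup()/destroy() hooks as the other UDx classes):

    #include "Vertica.h"
    using namespace Vertica;

    struct ExpensiveHelper { /* hypothetical state that is expensive to construct */ };

    class MyAgg : public AggregateFunction
    {
        ExpensiveHelper *helper;                                 // class member, not a function-local
    public:
        MyAgg() : helper(NULL) {}

        virtual void setup(ServerInterface &srvInterface, const SizedColumnTypes &argTypes)
        {
            helper = new ExpensiveHelper();                      // expensive, one-time work lives here
        }

        virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes)
        {
            delete helper;                                       // clean up whatever setup() allocated
        }

        virtual void aggregate(ServerInterface &srvInterface,
                               BlockReader &argReader,
                               IntermediateAggs &aggs)
        {
            do {
                // cheap per-row work only, reusing helper and the intermediate in aggs
            } while (argReader.next());
        }

        // initAggregate(), combine(), terminate() omitted.
    };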

     

    You can also have your function batch rows itself, if that really is best for your application -- std::unordered_map<>() is really pretty efficient for many common simple tuple structures.
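
    A trivial illustration of that batching idea, independent of our API (expensiveUpdate is a placeholder for whatever per-value work is costly):

    #include <cstddef>
    #include <string>
    #include <unordered_map>
    #include <vector>

    void expensiveUpdate(const std::string &value, std::size_t count);   // hypothetical costly call

    // Count duplicates within the batch first, then pay the expensive per-value cost
    // only once per distinct value rather than once per row.
    void processBatch(const std::vector<std::string> &rows)
    {
        std::unordered_map<std::string, std::size_t> counts;
        for (const auto &row : rows)
            ++counts[row];                                       // cheap per-row pass
        for (const auto &kv : counts)
            expensiveUpdate(kv.first, kv.second);
    }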

     

    It's possible to write a wrapper that does this automatically for you.  See the Coroutine* helper classes in our SDK examples for a rough idea of how you might go about doing so.  But that gets complicated in a hurry.

     

    There are cases where the data could usefully be clustered in memory, and where the virtual-function-call overhead really is significant as compared to the rest of the cost of the aggregation and as compared to the computational cost of grouping the data.  This requires a very simple group-by key, a very cheap aggregation function, and data with lots of duplicate values that you have not chosen to sort by on disk.  (If you can sort your projection by that column, great!; do that :-) )  If you're in that position, and you have an application that demonstrably depends on this performance difference, please get in touch with our Support and/or PM folks; we would like to hear from you.

     

    Adam

  • Hi,

    I was just revisiting this issue from a while back.

    I noticed in your documentation that there's an optimization for aggregate functions when used with GROUP BY - here's the link.

    I do not remember seeing it when I was troubleshooting this issue the first time.

    Any chance this could help?

    Thanks
    Amir
