Hi, I'm currently working with the Java UDx on Vertica. Is it possible to write parallelizable (map/reduce or something similar) Java UDF functions? Kind regards, Pierre-Aymeric MASSE
Comments
The short answer is "Definitely, yes."
I would recommend that you look specifically at the Java UDT (User-Defined Transform) API. UDT's are very parallelizable, and are in many ways conceptually similar to map/reduce jobs. (Though they are not exactly the same as map/reduce jobs and don't share the same API as Hadoop or other map/reduce implementations.) Some example UDT's are included in /opt/vertica/sdk/examples/JavaUDx/ as part of the Vertica Server RPM.
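To give a sense of the shape of that API, here is a minimal sketch of a "mapper"-style Java UDT, modeled loosely on the string-tokenizer example in that directory. Treat the class names, column names, and exact method signatures as assumptions to check against the shipped examples and the SDK javadoc:

package com.example.udx;

import com.vertica.sdk.*;

// "Mapper"-style transform: for each input row (a line of text), emit one
// output row per whitespace-separated token. Vertica calls processPartition()
// once per partition and runs many partitions in parallel across the cluster.
public class TokenizeFactory extends TransformFunctionFactory {

    @Override
    public void getPrototype(ServerInterface srvInterface,
                             ColumnTypes argTypes, ColumnTypes returnType) {
        argTypes.addVarchar();    // one VARCHAR argument
        returnType.addVarchar();  // one VARCHAR output column
    }

    @Override
    public void getReturnType(ServerInterface srvInterface,
                              SizedColumnTypes argTypes, SizedColumnTypes returnType) {
        returnType.addVarchar(65000, "token");
    }

    @Override
    public TransformFunction createTransformFunction(ServerInterface srvInterface) {
        return new Tokenize();
    }

    public static class Tokenize extends TransformFunction {
        @Override
        public void processPartition(ServerInterface srvInterface,
                                     PartitionReader inputReader,
                                     PartitionWriter outputWriter)
                throws UdfException, DestroyInvocation {
            try {
                do {
                    String line = inputReader.getString(0);
                    if (line != null) {
                        for (String token : line.split("\\s+")) {
                            outputWriter.getStringWriter(0).copy(token);
                            outputWriter.next();  // emit one output row
                        }
                    }
                } while (inputReader.next());     // advance to the next input row
            } catch (Exception e) {
                throw new UdfException(0, "tokenize failed: " + e.getMessage());
            }
        }
    }
}

You would package that into a JAR, load it with CREATE LIBRARY, declare it with CREATE TRANSFORM FUNCTION, and from then on it is just a SQL function you can call with an OVER() clause.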
When you call a UDT from SQL, as part of the function invocation (the "OVER ()" clause), you must specify how to partition the data. Vertica parallelizes by operating on multiple partitions at the same time.
Note that you can write SQL expressions that call multiple UDT functions, possibly with different partition clauses. You might emulate simple map/reduce by having two Java UDT's; one called "mapper" and one called "reducer". Then run a query like:
SELECT reducer(...) OVER () FROM (
SELECT mapper(...) OVER (PARTITION AUTO) FROM table
) t;
Then the reducer runs on one node and gets all of the parallel mappers' outputs as a single stream.
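To make the "reducer" half concrete: it is just another UDT whose processPartition() happens to aggregate everything it reads before writing anything out. Here is a rough sketch of that body, assuming the tokenizer sketch above feeds it tokens and eliding the factory boilerplate (which looks just like the factory above, only with two output columns):

    // "Reducer"-style processPartition(): consume every row in the partition,
    // aggregate in memory, then emit the results. Invoked with OVER () and no
    // PARTITION BY, a single instance sees the combined output of all of the
    // parallel mappers, much like a single Hadoop reducer would.
    @Override
    public void processPartition(ServerInterface srvInterface,
                                 PartitionReader inputReader,
                                 PartitionWriter outputWriter)
            throws UdfException, DestroyInvocation {
        java.util.Map<String, Long> counts = new java.util.HashMap<String, Long>();
        try {
            do {
                String token = inputReader.getString(0);  // column 0: a token from the mapper
                if (token != null) {
                    Long seen = counts.get(token);
                    counts.put(token, seen == null ? 1L : seen + 1);
                }
            } while (inputReader.next());

            for (java.util.Map.Entry<String, Long> entry : counts.entrySet()) {
                outputWriter.getStringWriter(0).copy(entry.getKey());
                // The count is written as a VARCHAR here only to reuse the same
                // writer call as above; a real UDT would declare an INTEGER
                // output column and use the corresponding numeric writer.
                outputWriter.getStringWriter(1).copy(entry.getValue().toString());
                outputWriter.next();
            }
        } catch (Exception e) {
            throw new UdfException(0, "reduce failed: " + e.getMessage());
        }
    }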
That's just a simple example, though. You can easily stack these sorts of expressions to build an arbitrary multi-stage pipeline. For example, say you had a table of URLs and you wanted to download all of the listed pages, parse the HTML to build a graph of links between pages, then compute PageRank and list the titles of the ten highest-ranked pages. (PageRank probably wants more arguments, and there are likely more efficient ways to do this, but hopefully you get the idea.) It might look something like:
SELECT html_fetch_title(url) FROM (
SELECT page_rank(page_url, link_url) OVER (PARTITION BY link_url) FROM (
SELECT page_url, html_to_links(page_html_as_text) OVER (PARTITION BY page_url) FROM (
SELECT recursive_download(url) OVER (PARTITION AUTO) FROM sites_to_search
) pages ) link_graph ) page_ranks
ORDER BY page_rank DESC LIMIT 10;
(If you don't like writing SQL, there's a program out there that tries to run scripts written in Apache Pig Latin instead: <https://github.com/vertica/Vertica-Hadoop-Connector/tree/master/squeal/>)
Then you would have to implement the UDT's page_rank(), html_to_links(), and recursive_download(). recursive_download() would emit a column "page_html_as_text", one row per page recursively downloaded; html_to_links() would parse each row of HTML that it receives and emit the URL of every link on that page (note that it doesn't need to know the URL of the current page); and page_rank() would take collections of {page URL; all pages that link to that URL} and compute a page rank.
You would also have to implement html_fetch_title(), which takes a URL, downloads it, and returns the page title. This is actually not a UDT, but rather a UDSF -- it takes a single value and returns a (different) single value. We parallelize these too; completely automatically, no need for an OVER clause. (It's admittedly kinda silly to re-download the page here; we already downloaded it before. But it's only ten Web requests because of the "LIMIT 10", and I wanted to work a UDSF into the example somehow...)
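For contrast with the UDT skeleton above, here is roughly what html_fetch_title() could look like as a Java UDSF. The fetching and parsing is ordinary Java; only the factory and processBlock() skeleton is Vertica-specific, and the exact writer call for VARCHAR results (setString() below) is an assumption to verify against the SDK javadoc and shipped examples:

package com.example.udx;

import com.vertica.sdk.*;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// A UDSF (scalar function): one input value in, one output value out.
// Vertica parallelizes these automatically; no OVER clause is needed.
public class HtmlFetchTitleFactory extends ScalarFunctionFactory {

    @Override
    public void getPrototype(ServerInterface srvInterface,
                             ColumnTypes argTypes, ColumnTypes returnType) {
        argTypes.addVarchar();     // input: the URL
        returnType.addVarchar();   // output: the page title
    }

    @Override
    public void getReturnType(ServerInterface srvInterface,
                              SizedColumnTypes argTypes, SizedColumnTypes returnType) {
        returnType.addVarchar(65000, "title");
    }

    @Override
    public ScalarFunction createScalarFunction(ServerInterface srvInterface) {
        return new HtmlFetchTitle();
    }

    public static class HtmlFetchTitle extends ScalarFunction {
        private static final Pattern TITLE =
                Pattern.compile("<title>(.*?)</title>", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

        @Override
        public void processBlock(ServerInterface srvInterface,
                                 BlockReader argReader, BlockWriter resWriter)
                throws UdfException, DestroyInvocation {
            do {
                String title = "";
                try {
                    String url = argReader.getString(0);
                    StringBuilder html = new StringBuilder();
                    BufferedReader in = new BufferedReader(
                            new InputStreamReader(new URL(url).openStream()));
                    String line;
                    while ((line = in.readLine()) != null) {
                        html.append(line).append('\n');
                    }
                    in.close();
                    Matcher m = TITLE.matcher(html);
                    if (m.find()) {
                        title = m.group(1).trim();
                    }
                } catch (Exception e) {
                    // Leave the title empty if the page can't be fetched or parsed.
                }
                // Assumption: BlockWriter exposes setString() for VARCHAR results.
                resWriter.setString(title);
                resWriter.next();
            } while (argReader.next());
        }
    }
}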
Vertica will automatically orchestrate this whole process: parallelize it, distribute it across the cluster, and so on. It also performs various optimizations based on available system resources, minimizing disk I/O, etc., so we tend to achieve dramatically better performance. Just give us the query and the individual functions; we take it from there.
Adam
However, in your explanation you speak mainly of the SQL query to perform the parallelization (with OVER), but then in Java we only define the jobs, with no interaction between them (which one is the map, which one is the reduce, etc.).
I am trying to implement PageRank specifically.
So, if I want to parallelize my function, can I do it with a simple query like:
"SELECT pageRank(page_url, link_url, nb_link) FROM T" ?
It would be like threads ^^', where I split the data inside the Java function.
Or, if I want to parallelize it, do I have to split it into several Java functions?
Thx !
I'm not quite sure I understand...
I assume you are referring to Hadoop in particular, rather than map/reduce in general, since you're talking about Java? (Map/reduce is an idea, not a program or a strict specification; there are many implementations of map/reduce, all with different APIs. Google, the publishers of the original map/reduce paper, has stated that their implementation is/was in C++.)
In Hadoop, you must specify which of your functions is a mapper and which is a reducer. This is not automatic. And mappers and reducers are separate functions; they can't talk directly with one another.
All map/reduce implementations use partitioning, just like Vertica in this example, to split up the data so that each mapper gets only part of it. In Hadoop you do this by implementing the "Partitioner" interface; a default Partitioner is provided, and the data is partitioned by hash unless you specify otherwise. The same is true for Vertica; see my second example above. You can also implement a Vertica partitioner in Java, but in my experience most people don't bother; a simple SQL expression is good enough and is much quicker than writing a custom Java class.
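For comparison, a custom Hadoop Partitioner is itself just a small Java class that you register on the job, whereas the equivalent decision in Vertica usually lives in the PARTITION BY expression. (The key/value types here are hypothetical, and Hadoop's default HashPartitioner already does exactly this.)

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// A custom Hadoop Partitioner: decides which reducer each (key, value) pair
// is routed to. This one hashes the key, which is what the default
// HashPartitioner does anyway.
public class UrlPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Registered on the job with: job.setPartitionerClass(UrlPartitioner.class);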
Think of SQL like your configuration file: You always have to tell the system which jobs to run somehow, and how to get data between them. We just happen to use SQL as our configuration language.
Adam