
Spark Connector - filtering data from Vertica before loading

Hey,
I want to load 2 tables into Spark, join them, and do some transformations, but the tables are HUGE: no matter how many partitions I split them into, the job uses too much storage and fails, or creates a huge query queue in Vertica and then I get many timeouts.
So I decided to query the data in batches using modulo. For example, for 50 batches I'll create 50 DataFrames, where each one queries by hash(column) % 50 = 0, hash(column) % 50 = 1, ..., hash(column) % 50 = 49.
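Roughly, the batching idea looks like this (just a simplified sketch: loadFromVertica is a placeholder for however the table actually gets read through the connector, not the real code):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

val numBatches = 50

// Placeholder for the actual connector/JDBC read of a Vertica table.
def loadFromVertica(spark: SparkSession, table: String): DataFrame = ???

// One DataFrame per batch, selected by hash(column) % numBatches = i.
def batch(df: DataFrame, keyCol: String, i: Int): DataFrame =
  df.filter(hash(col(keyCol)) % numBatches === i)

// Intended use: join and transform each batch independently.
// val t1 = loadFromVertica(spark, "schema.table1")
// val t2 = loadFromVertica(spark, "schema.table2")
// (0 until numBatches).foreach { i =>
//   batch(t1, "userID", i)
//     .join(batch(t2, "userId", i), col("userID") === col("userId"), "left")
//   // ... aggregate, transform, and write this batch
// }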
Anyway, I've tried that, but from the explain plan it seems like the filtering happens AFTER the data is loaded. Query:
table1.filter(s"hash(userID) % 50 = 1")
  .groupBy("userID")
  .agg(
    collect_list(someUDF(col("col1"), col("col2"))) as "colTemp",
    max("col3") as "col3",
    max("col4"))
  .as("A")
  .join(
    table2.filter(s"hash(userId) % 50 = 1").select("userId", "col4", "col5").as("B"),
    col("A.userID") === col("B.userId"),
    "left")

And when I look at the Vertica session I see the following:
user sparkExporter (select "userid","col2","col3" from "schema"."table1" where( ((0x00000000ffffffff & hash(table1.userId) ) >= 0 and (0x00000000ffffffff & hash(table1.userId) ) <= 4266334183 ) or ( (0x00000000ffffffff & hash(table1.userId) ) >= 4266334184 )) AND ("userId" is NOT NULL))

Same query for table2.

Spark explain plan:

+- Aggregate [userID#0L], [userID#0L, collect_list(if (isnull(cast(segKey#1L as int))) null else UDF(cast(segKey#1L as int), cast(prob#2 as double)), 0, 0) AS segmentsProba#206, max(country#6) AS country#208, max(lastUpdatedTs#3) AS max(lastUpdatedTs)#210]
   +- Filter ((hash(userID#0L, 42) % 30) = 4)
      +- Relation[userID#0L,segKey#1L,prob#2,lastUpdatedTs#3,insertTs#4,partitionKey#5L,country#6] VerticaRelation

Is there a way to ensure/tweak the query so that only the needed data gets loaded?

Answers

  • Bryan_H (Vertica Employee, Administrator)

    What version of Spark are you running, which programming language and version, and how many executors/workers and how much memory do you configure for the job? I've been able to get some partitioning to work using very recent Spark (2.4.6 or 3.x) and the latest Scala or Python 3, but it's quite sensitive to SQL syntax as well as to system resources on the workers and the server side. The latest documentation shows additional parameters available for loading DataFrames: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
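
    For reference, the partitioning parameters on that page look roughly like this with the plain Spark JDBC source (a sketch with placeholder connection details and an example partition column; the Vertica Spark connector has its own option names, so treat this only as an illustration of partitionColumn/lowerBound/upperBound/numPartitions):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("vertica-load").getOrCreate()

    // Spark generates one query per partition with a WHERE range on partitionColumn,
    // so the split happens on the Vertica side rather than after loading.
    val table1 = spark.read.format("jdbc")
      .option("url", "jdbc:vertica://vertica-host:5433/mydb")   // placeholder host/database
      .option("driver", "com.vertica.jdbc.Driver")
      .option("user", "dbuser")                                 // placeholder credentials
      .option("password", "...")
      .option("dbtable", "schema.table1")
      .option("partitionColumn", "partitionKey")  // must be a numeric, date, or timestamp column
      .option("lowerBound", "0")
      .option("upperBound", "50")
      .option("numPartitions", "50")
      .load()

    A subquery can also be passed as dbtable (or via the query option in Spark 2.4+), e.g. "(select userid, col4, col5 from schema.table2 where hash(userid) % 50 = 0) t", if you need a filter like your modulo predicate to run inside Vertica before any rows are transferred.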
