Please take this survey to help us learn more about how you use third party tools. Your input is greatly appreciated!

Spark Connector - filtering data from Vertica before loading

Hey,
I want to load 2 tables into Spark, join them and do some transformations but the tables are HUGE and no matter how I split them into amount of partitions, it uses too much storage and fails / create a huge query queue in Vertica and they I get many timeouts.
So I decided to query the data in batches by using modulo. For example, for 50 batches I'll create 50 dataframe where in each one I query by hash(column) % 50 = 1, hash(column) % 50 = 2, ...., hash(column) % 50 = 49.
Anyways, I've tried that but from the explain plan, seems like the filtering happens AFTER the data is being loaded. Query:
table1.filter(s"hash(userID) % 50 = 1").groupBy("userID"). agg(collect_list(someUDF(col("col1"), col("col2"))) as "colTemp", max("col3") as "col3", max("col4")).as("A").join( tabl2.filter(s"hash(userId) % $50 = 1").select("userId", "col4", "col5").as("B"), col("A.userID") === col("B.userId"), "left")

And when I look at the Vertica session I see the following:
user sparkExporter (select "userid","col2","col3" from "schema"."table1" where( ((0x00000000ffffffff & hash(table1.userId) ) >= 0 and (0x00000000ffffffff & hash(table1.userId) ) <= 4266334183 ) or ( (0x00000000ffffffff & hash(table1.userId) ) >= 4266334184 )) AND ("userId" is NOT NULL))

Same query for table2.

Spark explain plan:

: +- Aggregate [userID#0L], [userID#0L, collect_list(if (isnull(cast(segKey#1L as int))) null else UDF(cast(segKey#1L as int), cast(prob#2 as double)), 0, 0) AS segmentsProba#206, max(country#6) AS country#208, max(lastUpdatedTs#3) AS max(lastUpdatedTs)#210]
+- Filter ((hash(userID#0L, 42) % 30) = 4)
Relation[userID#0L,segKey#1L,prob#2,lastUpdatedTs#3,insertTs#4,partitionKey#5L,country#6] VerticaRelation

Do I have a way to ensure/tweak the query to make sure that I only loaded the needed data?

Answers

  • What version of Spark are you running, and what programming language and version, and how many executors/workers/memory do you configure for the job? I've been able to get some partitioning to work using very recent Spark (2.4.6 or 3.x) and latest Scala or Python3, but it's quite sensitive to SQL syntax as well as system resources on the workers and server side. The latest documentation shows additional parameters available to load DF's: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

Leave a Comment

BoldItalicStrikethroughOrdered listUnordered list
Emoji
Image
Align leftAlign centerAlign rightToggle HTML viewToggle full pageToggle lights
Drop image/file

Can't find what you're looking for? Search the Vertica Documentation, Knowledge Base, or Blog for more information.