HPE Vertica Connector for Apache Spark: NoSuchMethodError reading a DataFrame on Spark 1.5.0
Scenario:
Read data from a Vertica VMart table into a Spark DataFrame using "com.vertica.spark.datasource.DefaultSource".
Env:
HPE Vertica Community Edition VM
select version();
Vertica Analytic Database v7.2.0-0
Spark version: 1.5.0
Downloaded "HPE Vertica Connector For Apache Spark" from
https://saas.hpe.com/marketplace/haven/hpe-vertica-connector-apache-spark
Repro:
spark-shell --jars /Users/jackg/projects/BigDataInfra/hpe_vertica_spark_connector/vertica-jdbc-7.1.2-0.jar,/Users/jackg/projects/BigDataInfra/hpe_vertica_spark_connector/vertica-spark-connector.jar --master local[1]
scala> val opt: Map[String, String] = Map("table" -> "customer_dimension", "db" -> "VMart", "user" -> "dbadmin", "password" -> "redacted", "host" -> "192.168.1.90", "dbschema" -> "public", "port" -> "5433", "numPartitions"-> "4", "tmpdir" -> "/tmp")
opt: Map[String,String] = Map(host -> 192.168.1.90, tmpdir -> /tmp, db -> VMart, dbschema -> public, port -> 5433, user -> dbadmin, numPartitions -> 4, table -> customer_dimension, password -> redacted)
scala> val df = sqlContext.read.format("com.vertica.spark.datasource.DefaultSource").options(opt).load()
df: org.apache.spark.sql.DataFrame = [customer_key: bigint, customer_type: string, customer_name: string, customer_gender: string, title: string, household_id: bigint, customer_address: string, customer_city: string, customer_state: string, customer_region: string, marital_status: string, customer_age: bigint, number_of_children: bigint, annual_income: bigint, occupation: string, largest_bill_amount: bigint, store_membership_card: bigint, customer_since: date, deal_stage: string, deal_size: bigint, last_deal_update: date]
// we have a DataFrame; the connector resolved the schema successfully
scala> df.rdd.partitions.size
res0: Int = 4
scala> df.printSchema
root
|-- customer_key: long (nullable = false)
|-- customer_type: string (nullable = true)
...
Test:
// Sanity check: plain RDD-to-DataFrame operations work in this Spark 1.5.0 shell
scala> val df1 = sc.makeRDD(1 to 12, 4).toDF("IntCol") // 4-partition RDD converted to a DataFrame
scala> df1.take(4).foreach(println)
[1]
[2]
[3]
[4]
// now fetch a row from the Vertica connector DataFrame; this is where it fails
scala> df.take(1).foreach(println)
15/11/21 15:10:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(ILjava/lang/String;)V
at com.vertica.spark.datasource.VerticaDataSourceRDD$$anon$1.getNext(VerticaRDD.scala:368)
at com.vertica.spark.datasource.VerticaDataSourceRDD$$anon$1.hasNext(VerticaRDD.scala:421)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/11/21 15:10:06 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(ILjava/lang/String;)V
(stack trace identical to the one above)
15/11/21 15:10:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(ILjava/lang/String;)V
(stack trace identical to the one above)
15/11/21 15:10:06 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Comments:
I tested the same repro on Spark 1.4.1 and it works:
scala> df.take(2).foreach(println)
[5,Individual,Craig S. Robinson,Male,Sir,10858,138 Alden Ave,Fayetteville,NC,East,Unknown,19,0,79772,Detective,153,0,1986-12-20,null,null,null]
[18,Individual,Mark M. Kramer,Male,Dr.,28093,311 Green St,Joliet,IL,MidWest,Separated,58,1,363387,Chef,844,0,2006-05-18,null,null,null]
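This points to a binary incompatibility rather than a configuration problem: the connector calls SpecificMutableRow.setString(Int, String), an internal Spark SQL (catalyst) API, and the NoSuchMethodError shows that this method no longer exists in Spark 1.5.0, so the connector jar was presumably built against Spark 1.4.x. A quick reflection check you could run in each shell to confirm (not part of the original repro):

classOf[org.apache.spark.sql.catalyst.expressions.SpecificMutableRow]
  .getMethods.map(_.getName).filter(_.startsWith("set")).distinct.sorted.foreach(println)

On 1.5.0 the printed list should not include setString; on 1.4.1 it should.

Until a connector build for Spark 1.5 is available, one possible workaround is Spark's built-in JDBC data source. The sketch below is untested against this setup: it reuses the host and credentials from the repro, assumes the Vertica JDBC jar is on the classpath (as with --jars above), and uses the standard Spark 1.5 JDBC source options; the val names are illustrative. Note it won't parallelize the read the way the connector's numPartitions does unless you also supply the JDBC source's partitionColumn/lowerBound/upperBound options.

// Workaround sketch (untested): plain JDBC read on Spark 1.5
val jdbcOpts = Map(
  "url"      -> "jdbc:vertica://192.168.1.90:5433/VMart", // standard Vertica JDBC URL
  "driver"   -> "com.vertica.jdbc.Driver",
  "dbtable"  -> "public.customer_dimension",
  "user"     -> "dbadmin",
  "password" -> "redacted"
)
val jdbcDF = sqlContext.read.format("jdbc").options(jdbcOpts).load()
jdbcDF.take(1).foreach(println)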