Using VerticaInputFormatter and VerticaOutputFormatter from a Spark Job
I read this code in the vertica documentation
and I was successsfully able to write a map reduce program which can push data to HP Vertica based on this code.
However my requirement is to be able to push data from "Spark" to Vertica.
So I tried to modify the sample so that I can push data from my spark application to Vertica.
This is what I wrote
object VerticaSpark extends App {
val scConf = new SparkConf
val sc = new SparkContext(scConf)
val conf = new Configuration()
val job = new Job(conf)
job.setInputFormatClass(classOf[VerticaInputFormat])
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[VerticaRecord])
job.setOutputFormatClass(classOf[VerticaOutputFormat])
VerticaInputFormat.setInput(job, "select * from Foo where key = ?", "1", "2", "3", "4")
VerticaOutputFormat.setOutput(job, "Bar", true, "name varchar", "total int")
val rddVR : RDD[VerticaRecord] = sc.newAPIHadoopRDD(job.getConfiguration, classOf[VerticaInputFormat], classOf[LongWritable], classOf[VerticaRecord]).map(_._2)
val rddTup = rddVR.map(x => (x.get(1).toString(), x.get(2).toString().toInt))
val rddGroup = rddTup.reduceByKey(_ + _)
val rddVROutput = rddGroup.map({
case(x, y) => (new Text("Bar"), getVerticaRecord(x, y, job.getConfiguration))
})
//rddVROutput.saveAsNewAPIHadoopFile("Bar", classOf[Text], classOf[VerticaRecord], classOf[VerticaOutputFormat], job.getConfiguration)
rddVROutput.saveAsNewAPIHadoopDataset(job.getConfiguration)
def getVerticaRecord(name : String, value : Int , conf: Configuration) : VerticaRecord = {
var retVal = new VerticaRecord(conf)
//println(s"going to build Vertica Record with ${name} and ${value}")
retVal.set(0, new Text(name))
retVal.set(1, new IntWritable(value))
retVal
}
}
My Full code is published at
https://github.com/abhitechdojo/VerticaSpark
However when I run my code and try to push data from Spark into Vertica. I get a NULL pointer exception
16/01/15 16:42:53 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 5, machine): java.lang.NullPointerException
at com.abhi.VerticaSpark$$anonfun$4.apply(VerticaSpark.scala:39)
at com.abhi.VerticaSpark$$anonfun$4.apply(VerticaSpark.scala:38)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:999)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 12, machine): java.lang.NullPointerException
at com.abhi.VerticaSpark$$anonfun$4.apply(VerticaSpark.scala:39)
at com.abhi.VerticaSpark$$anonfun$4.apply(VerticaSpark.scala:38)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:999)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/01/15 16:42:54 INFO TaskSetManager: Lost task 3.1 in stage 1.0 (TID 11) on executor machine: java.lang.NullPointerException (null) [duplicate 7]
0
Comments
Hi!
TL;DR
But:
1. may be a problem in your environment(for example some jar is missing on some node). Have you tried running it on single node environment?
WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 5, machine)
2. probably an entry in the dataset that does not follow the format (or its a consequence of [1])
???
PS
It is by chance?
"select * from Foo where key = ?", "1", "2", "3", "4"
BTW: can you post your environment(for any case)? type of hadoop distribution, version for hadoop, scala (whatever)?
I am using Vertica 7.2.1 and I am using Spark 1.3.1 on Cloudera 5.4.1
I checked
1. the machine names are alright (the same machine names work perfectly in map reduce code)
2. the format matches. (the same format and table works in map reduce code).