Vertica Write Operation from Spark not working
Hi Team,
I want to write a DataFrame to a Vertica table through Spark. I have the following options:
- Spark Vertica connector ( https://github.com/vertica/spark-connector/releases )
- Using JDBC ( https://www.vertica.com/download/vertica/client-drivers/ )
- vertica_python module ( sequential )
1: Using the Spark Vertica connector I get the error:
py4j.protocol.Py4JJavaError: An error occurred while calling o132.save.
java.lang.NoSuchMethodError: java.lang.String.isBlank()Z
This may be due to the versions I'm using. My versions are:
Spark: 3.3.1
Vertica: Vertica Analytic Database v9.2.1-28
Java: openjdk version "1.8.0_372"
The write call I'm running is sketched below.
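A minimal sketch of the connector write; the option names follow the spark-connector README, and the host, credentials, and staging path are placeholders, not my real values:
Code:
# Placeholder connection details; option names follow the vertica/spark-connector docs.
opts = {
    "host": "vertica-host.example.com",   # Vertica node to connect to
    "user": "dbadmin",
    "password": "password",
    "db": "mydb",
    "table": "my_table",
    "staging_fs_url": "hdfs://hdfs-host:8020/tmp/vertica-staging",  # intermediate staging area
}
df.write.format("com.vertica.spark.datasource.VerticaSource") \
    .options(**opts) \
    .mode("append") \
    .save()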
2: Using JDBC: I tried with many JDBC drivers. I'm always able to read the table, but when I perform the write operation it gives me this error:
java.sql.SQLFeatureNotSupportedException: [Vertica]JDBC Driver does not support this optional feature.
Code:
df.write \
    .jdbc(jdbc_url, table_name, mode="append", properties=properties)
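For completeness, here is roughly the full setup around that call; the URL, user, and password are placeholders, while com.vertica.jdbc.Driver is the standard class name of the Vertica JDBC driver:
Code:
# Placeholder connection details; only the driver class name is Vertica's real one.
jdbc_url = "jdbc:vertica://vertica-host.example.com:5433/mydb"
table_name = "my_table"
properties = {
    "user": "dbadmin",
    "password": "password",
    "driver": "com.vertica.jdbc.Driver",  # load the Vertica driver explicitly
}
df.write \
    .jdbc(jdbc_url, table_name, mode="append", properties=properties)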
3: Using vertica_python I'm able to write the data into the table with INSERT INTO statements, but I don't want to do this because I have a huge DataFrame and want to load the data in batches (a COPY-based sketch is below).
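One way to avoid row-by-row INSERTs with vertica_python is to stream the data through Vertica's COPY statement via cursor.copy(). A minimal sketch, assuming the DataFrame is small enough to collect on the driver and using placeholder connection details and table name:
Code:
import io
import vertica_python

# Placeholder connection info.
conn_info = {"host": "vertica-host.example.com", "port": 5433,
             "user": "dbadmin", "password": "password", "database": "mydb"}

# Render the DataFrame as CSV in memory; for truly huge data you would
# write partitions to files/HDFS instead of collecting to the driver.
buf = io.StringIO()
for row in df.collect():
    buf.write(",".join(str(v) for v in row) + "\n")
buf.seek(0)

with vertica_python.connect(**conn_info) as conn:
    cur = conn.cursor()
    # COPY bulk-loads the whole stream in one operation instead of per-row INSERTs.
    cur.copy("COPY my_table FROM STDIN DELIMITER ','", buf)
    conn.commit()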
Please let me know if anyone has an idea how to connect Vertica with Spark.
Answers
Hi haidernaveed_100,
Vertica 9.2 is a five-year-old version.
The Spark connector requires at least Vertica 10.1.1 (which is itself a two-and-a-half-year-old version):
I would start there first!
The java.lang.String.isBlank() method was added in Java 11, so you'll also need a newer runtime. (edit) This is fixed in source (issue #534) but not in a release build.
@VValdar & @Bryan_H
In order to use this connector I'm now on:
Vertica 24.2
Spark 3.3.1
HDFS cluster
openjdk version "1.8.0_302"
OpenJDK Runtime Environment (build 1.8.0_302-b08)
but I'm still hitting the same error. Stack trace:
at com.vertica.spark.util.schema.SchemaTools.findEmptyColumnName$1(SchemaTools.scala:683)
at com.vertica.spark.util.schema.SchemaTools.checkBlankColumnNames(SchemaTools.scala:693)
at com.vertica.spark.util.schema.SchemaTools.checkValidTableSchema(SchemaTools.scala:562)
at com.vertica.spark.util.table.TableUtils.buildCreateTableStmt(TableUtils.scala:119)
at com.vertica.spark.util.table.TableUtils.createTable(TableUtils.scala:247)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$12(VerticaDistributedFilesystemWritePipe.scala:136)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$11(VerticaDistributedFilesystemWritePipe.scala:133)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$11$adapted(VerticaDistributedFilesystemWritePipe.scala:132)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$10(VerticaDistributedFilesystemWritePipe.scala:132)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$9(VerticaDistributedFilesystemWritePipe.scala:131)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$9$adapted(VerticaDistributedFilesystemWritePipe.scala:130)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$8(VerticaDistributedFilesystemWritePipe.scala:130)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$7(VerticaDistributedFilesystemWritePipe.scala:127)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$6(VerticaDistributedFilesystemWritePipe.scala:124)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$6$adapted(VerticaDistributedFilesystemWritePipe.scala:121)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$5(VerticaDistributedFilesystemWritePipe.scala:121)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$4(VerticaDistributedFilesystemWritePipe.scala:118)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$3(VerticaDistributedFilesystemWritePipe.scala:115)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$doPreWriteSteps$1(VerticaDistributedFilesystemWritePipe.scala:109)
at scala.util.Either.flatMap(Either.scala:341)
at com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.doPreWriteSteps(VerticaDistributedFilesystemWritePipe.scala:106)
at com.vertica.spark.datasource.core.DSWriteConfigSetup.performInitialSetup(DSConfigSetup.scala:748)
at com.vertica.spark.datasource.core.DSWriteConfigSetup.performInitialSetup(DSConfigSetup.scala:702)
at com.vertica.spark.datasource.v2.VerticaBatchWrite.&lt;init&gt;(VerticaDatasourceV2Write.scala:70)
at com.vertica.spark.datasource.v2.VerticaWriteBuilder.buildForBatch(VerticaDatasourceV2Write.scala:51)
at org.apache.spark.sql.connector.write.WriteBuilder$1.toBatch(WriteBuilder.java:44)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:244)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
Which version of the Spark connector are you running? I suspect it's a very new version, which is why the isBlank() method is not there. String.isBlank() was added in Java 11, so you will need to update your JVM to JRE 11 or newer.
Alternate option: a Java 8 backport has been added on GitHub but is not in the official release. If you build the spark-connector from GitHub source, that build will run on Java 8.
@Bryan_H I'm using https://repo1.maven.org/maven2/com/vertica/spark/vertica-spark/3.3.1/
As I'm on Java 8, I tried to build the jar from https://github.com/vertica/spark-connector/tree/main
but I'm getting the same error.
Maybe there is some issue with how I'm building the connector.
Can we get the latest release with this fix? Or where can I get a build with this fix?
I ran a build with "sbt assembly" using OpenJDK 8 and Scala 2.12, and it completed with passing tests. This build should work for you as long as you build on JDK 8 with the same Scala version as your Spark cluster. Did you copy the build artifact target/scala-2.12/spark-vertica-connector-assembly-3.3.5.jar into your Spark job in place of the Maven package? They have the same version number, so there may be a conflict if both packages are referenced. A sketch of pointing the job at the local jar is below.
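To make sure the locally built assembly jar is picked up instead of the Maven artifact, one option is to pass it explicitly when building the session; the jar path below is the build output named above, and the app name is a placeholder:
Code:
from pyspark.sql import SparkSession

# Point Spark at the locally built assembly jar (the sbt build output above)
# and do NOT also list the Maven coordinate, to avoid a version clash.
spark = (
    SparkSession.builder
    .appName("vertica-write")
    .config("spark.jars", "/path/to/target/scala-2.12/spark-vertica-connector-assembly-3.3.5.jar")
    .getOrCreate()
)
Equivalently, pass the jar to spark-submit with --jars and drop any --packages reference to the Maven artifact.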