Unable to write Spark DF to Vertica using API
Prakhar84
Vertica Customer ✭
Hi Team
Getting the below error when trying to save a Spark DF in Vertica. This is blocking my work, so any help will be really appreciated. A couple of things here:
a) We have Kerberized Cloudera and Kerberized Vertica; all the necessary XML files were already copied to the Vertica cluster as part of the setup.
b) I can see entries in the job status table that gets created in Vertica when I try to write the DF to Vertica - does that mean the Hadoop-to-Vertica connection is established?
-
Below is the error
spark_DF.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)
19/12/04 14:52:00 ERROR s2v.S2V: Failed to save DataFrame to Vertica table: est_vertica
Traceback (most recent call last):
File "", line 1, in
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/pyspark/sql/readwriter.py", line 703, in save
self._jwrite.save()
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o85.save.
- java.lang.Exception: S2V: FATAL ERROR for job S2V_job2702546586973782764. Job status information is available in the Vertica table S2V_JOB_STATUS_USER. Unable to create/insert into target table FRR.test_vertica with SaveMode: Append. ERROR MESSAGE: ERROR: java.sql.SQLException: [Vertica][VJDBC](6776) ERROR: Failed to glob [hdfs:/x/y/tmp/vertica/S2V_job2702546586973782764/*.orc] because of error: Could not connect to [hdfs://x]
Answers
Based on the error, it looks like it could not connect to Vertica using the Spark connector. What is the output of the following?
SELECT KERBEROS_CONFIG_CHECK();
SELECT HDFS_CLUSTER_CONFIG_CHECK();
a)
ok: krb5 exists at [/etc/krb5.conf]
ok: Vertica Keytab file is set to [/opt/vertica/config/vertica2.kt]
ok: Vertica Keytab file exists at [/opt/vertica/config/vertica2.kt]
Kerberos configuration parameters set in the database
KerberosServiceName : [vertica]
KerberosHostname : [db.intlb.org.net]
KerberosRealm : [QA.org.NET]
KerberosKeytabFile : [/opt/vertica/config/vertica2.kt]
Vertica Principal: [vertica/db.intlb.org.net@QA.org.NET]
ok: Vertica can kinit using keytab file
b) I don't have access to that one.
We don't officially support Kerberos authentication with the Spark Connector yet, but I am trying to see if setting the Kerberos JDBC parameters will help. Did you already set them in your connection properties? The properties are described in the link below:
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/Security/Kerberos/ConfigureJDBCClientsonallPlaforms.htm
We are trying to achieve this:
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingToVerticaFromSpark.htm?tocpath=Integrating with Apache Spark|Saving an Apache Spark DataFrame to a Vertica Table|_____0
Is this what you are referring to as the Spark Connector?
When I start pyspark I use these jars:
pyspark2 --jars /home/x/vertica-9.0.1_spark2.1_scala2.11.jar,/home/x/vertica-jdbc-9.2.0-0.jar
I am talking about the JDBC Kerberos properties in the link I shared earlier. You can set them like below in your props variable. I am not sure it will work; it is just a workaround that might help.
val props = Seq(
  "user" -> "user",
  "JAASConfigName" -> "verticajdbc",
  "KerberosHostName" -> "<kerberos host name>",
  "KerberosServiceName" -> "<kerberos service name>"
)
Hi,
Can you let me know how we run this on the Cloudera platform in the pyspark shell, etc.? I want a working example based on your suggestion that saves a Spark DF in Vertica using https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingtoVerticaUsingDefaultSource.htm
A working example will help us do the same at our end. We are reading Parquet files from HDFS, saving them in a DF, and then trying to write to Vertica.
One more thing: we are on Vertica 9.1.
First of all, we don't officially support it. Regarding the example, it is already present in the link you shared. Add the Kerberos JDBC parameters to the opts map where you add your HDFS URL (a sketch with the Kerberos entries added follows the doc example below).
val opts: Map[String, String] = Map(
"table" -> "VerticaTableName",
"db" -> "VerticaDatabaseName",
"user" -> "VerticaDatabaseUser",
"password" -> "VerticaDatabasePassword",
"host" -> "VerticaHostName",
"hdfs_url" -> "hdfs://HDFSNameNode:9000/user/hduser/someDirectory",
"web_hdfs_url" -> "webhdfs://HDFSNameNode:50070/user/hduser/someDirectory"
// "failed_rows_percent_tolerance"-> "0.00" // OPTIONAL (default val shown)
// "dbschema" -> "public" // OPTIONAL (default val shown)
// "port" -> "5433" // OPTIONAL (default val shown)
// "strlen" -> "1024" // OPTIONAL (default val shown)
// "fileformat" -> "orc" // OPTIONAL (default val shown)
)
https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingtoVerticaUsingDefaultSource.htm
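For the Kerberos attempt, the same map with the three JDBC Kerberos properties added would look roughly like this; the values are placeholders to fill in for your environment, and this remains an unsupported workaround:
val opts: Map[String, String] = Map(
  "table" -> "VerticaTableName",
  "db" -> "VerticaDatabaseName",
  "user" -> "VerticaDatabaseUser",
  "password" -> "VerticaDatabasePassword",
  "host" -> "VerticaHostName",
  "hdfs_url" -> "hdfs://HDFSNameNode:9000/user/hduser/someDirectory",
  "web_hdfs_url" -> "webhdfs://HDFSNameNode:50070/user/hduser/someDirectory",
  // Kerberos JDBC properties (assumption: the connector forwards extra options to its JDBC connection)
  "JAASConfigName" -> "verticajdbc",
  "KerberosHostName" -> "<KerberosHostname from KERBEROS_CONFIG_CHECK>",
  "KerberosServiceName" -> "<KerberosServiceName from KERBEROS_CONFIG_CHECK>"
)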
Sorry, but I am not getting the sequence of steps.
I do the below steps when I log in to the cluster Unix box:
a) kinit -kt /home/x/creds/y.keytab y
b) pyspark2 --jars /home/user/vertica-9.0.1_spark2.1_scala2.11.jar,/home/user/vertica-jdbc-9.2.0-0.jar
Where shall I call this part, and how do I use it in the script that saves the DF?
Below is my URL:
url1 = "jdbc:vertica://Vertica-x-dev.bnymellon.net:5433/db"
opts={}
opts['dbschema'] = 'SCHEMA'
opts['table'] = 'test_vertica'
opts['db']='db'
opts['user']='user'
opts['password']='pwd'
opts['host']='Vertica-x-dev.bnymellon.net'
opts['hdfs_url']='hdfs://hdfs/idtd/user'
opts['web_hdfs_url']='webhdfs://hdfs/idtd/user'
How do I use this in the above? Apologies, but if you can help with working detail, we can try that.
For your JDBC URL, append the Kerberos properties as follows:
url1 = "jdbc:vertica://Vertica-x-dev.bnymellon.net:5433/db?JAASConfigName=verticajdbc&KerberosHostName=<kerberos host name>&KerberosServiceName=<kerberos service name>"
Another method is to pass the same parameters to opts like below:
opts["KerberosServiceName"] =
opts['kerberosHostName"] =
opts["JAASConfigName"] =
Thanks, but what will be the values of these parameters?
JAASConfigName=verticajdbc (is this a default name?)
KerberosHostName= (is this the Vertica-side hostname or the Cloudera side?)
KerberosServiceName= (is this the Vertica-side value or the Cloudera side?)
You can find them in the output you shared earlier. You can use the values from the output below, or your DBA can help you:
ok: krb5 exists at [/etc/krb5.conf]
ok: Vertica Keytab file is set to [/opt/vertica/config/vertica2.kt]
ok: Vertica Keytab file exists at [/opt/vertica/config/vertica2.kt]
Kerberos configuration parameters set in the database
KerberosServiceName : [vertica]
KerberosHostname : [db.intlb.org.net]
KerberosRealm : [QA.org.NET]
KerberosKeytabFile : [/opt/vertica/config/vertica2.kt]
Vertica Principal: [vertica/db.intlb.org.net@QA.org.NET]
ok: Vertica can kinit using keytab file
I did the same as below but still get the same error (I hope the format I used below is correct):
url1 = "jdbc:vertica://Vertica-x-dev.bnymellon.net:1111/db?JAASConfigName=verticajdbc&KerberosHostName=db.intlb.org.net&KerberosServiceName=vertica"
How come the job status table always gets updated (does it mean the HDFS and Vertica connection is established)?
S2V_JOB_STATUS_USER
The other thing is that I don't even use this url1 in the write command.
opts={}
opts['dbschema'] = 'SCHEMA'
opts['table'] = 'test_vertica'
opts['db']='db'
opts['user']='user'
opts['password']='pwd'
opts['host']='Vertica-x-dev.bnymellon.net'
opts['hdfs_url']='hdfs://hdfs/idtd/user'
opts['web_hdfs_url']='webhdfs://hdfs/idtd/user'
Then comes the save:
spark_DF.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)
The format is correct. I was just trying that option as a workaround; it looks like it did not help. There is already a feature request for this. Regarding the status table, please open a support case; we may need logs to review it further.
No, my question is that I don't use this when writing the DF, so how will the changes you suggested be used?
url1 = "jdbc:vertica://Vertica-x-dev.bnymellon.net:1111/db?JAASConfigName=verticajdbc&KerberosHostName=db.intlb.org.net&KerberosServiceName=vertica"
Look at my opts parameters: url1 is not part of them when writing to Vertica.
I asked you to use these 3 parameters in the URL so that the JDBC connection gets Kerberized; the Spark connector internally uses JDBC. You don't need them if you don't want to use Kerberos authentication with the Spark-to-Vertica connector.
But url1 is not used/called anywhere; it is just a variable. Do we need to add this to the HDFS URL? How is it picked up when I call the code below, since url1 is not part of opts?
opts={}
opts['dbschema'] = 'SCHEMA'
opts['table'] = 'test_vertica'
opts['db']='db'
opts['user']='user'
opts['password']='pwd'
opts['host']='Vertica-x-dev.bnymellon.net'
opts['hdfs_url']='hdfs://hdfs/idtd/user'
opts['web_hdfs_url']='webhdfs://hdfs/idtd/user'
Then comes the save:
spark_DF.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)
I was not aware that your program does not use url1. If url1 is not used, then add the Kerberos parameters to opts as mentioned earlier.
How do I add them? Even in https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingtoVerticaUsingDefaultSource.htm?tocpath=Integrating with Apache Spark|Saving an Apache Spark DataFrame to a Vertica Table|_____1
it is not mentioned to give a Vertica connection URL.
Do you mean the below?
opts={}
opts['dbschema'] = 'SCHEMA'
opts['table'] = 'test_vertica'
opts['url']='jdbc:vertica://Vertica-x-dev.bnymellon.net:1111/db?JAASConfigName=verticajdbc&KerberosHostName=db.intlb.org.net&KerberosServiceName=vertica'
opts['db']='db'
opts['user']='user'
opts['password']='pwd'
opts['host']='Vertica-x-dev.bnymellon.net'
opts['hdfs_url']='hdfs://hdfs/idtd/user'
opts['web_hdfs_url']='webhdfs://hdfs/idtd/user'
Then comes the save:
spark_DF.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)
It looks to me that Vertica can't access HDFS. Could you confirm Vertica can access HDFS by following the steps in this comment? If you don't have access to the functions, check with your DBA. Another way to confirm is to check with your team whether they're already using external tables on HDFS or COPY FROM hdfs://.
All that being said, there are two ways I know of that you can use to write to Vertica from Spark:
Here is example Scala code for both:
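Roughly, assuming df is the DataFrame to write, and with placeholder hostnames, credentials, and HDFS paths (adjust these for your environment):
// 1) Vertica Spark connector (DefaultSource): stages intermediate files on HDFS, then loads them into Vertica
val opts = Map(
  "table" -> "test_vertica",
  "db" -> "db",
  "user" -> "VerticaUser",
  "password" -> "VerticaPassword",
  "host" -> "VerticaHostName",
  "hdfs_url" -> "hdfs://HDFSNameNode:8020/tmp/vertica",
  "web_hdfs_url" -> "webhdfs://HDFSNameNode:50070/tmp/vertica")
df.write.format("com.vertica.spark.datasource.DefaultSource")
  .options(opts).mode("append").save()

// 2) Plain Spark JDBC with the Vertica JDBC driver: rows are inserted over the JDBC connection
val props = new java.util.Properties()
props.setProperty("user", "VerticaUser")
props.setProperty("password", "VerticaPassword")
props.setProperty("driver", "com.vertica.jdbc.Driver")
df.write.mode("append").jdbc("jdbc:vertica://VerticaHostName:5433/db", "test_vertica", props)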
Thanks, but is JDBC a good way to load? I heard it has issues with NULL values/columns. Please suggest.
JDBC works with several BI and ETL tools and is officially supported. I'm not aware of any issues with NULL values. Please let us know if there are issues with NULL values so we can fix them or suggest a workaround.
Below is the NULL issue I am getting; please provide a solution for it.
The query is:
"select col1,col2,col3 from ACCTIDT"
Then I get this error when writing to Vertica using JDBC:
ACCT_final.write.format("jdbc").option("driver", "com.vertica.jdbc.Driver").option("url", url1).option("dbtable", "test_vertica").option("savemode","append").option("user", "xxxxx").option("password", "xxxxxx").mode("append").save()
java.sql.SQLFeatureNotSupportedException: [Vertica]JDBC Driver not capable.
at com.vertica.exceptions.ExceptionConverter.toSQLException(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown Source)
at com.vertica.jdbc.common.SPreparedStatement.setNull(Unknown Source)
If I change the query to the below:
"select col1,coalesce(col2,'') as col2,col3 from ACCTIDT"
then it loads properly, but I don't want it to be like this, as I cannot add coalesce for every nullable column; we have hundreds of columns across thousands of tables.
Table DDL:
CREATE TABLE test_vertica
(
col1 varchar(1024),
col2 varchar(1024),
col3 varchar(1024)
)
Sample of 2 rows when I do ACCT_final.take(2) in pyspark:
[Row(col1=u'ABC', col2=u'', col3=u'123456789'), Row(col1=u'ABC', col2=u'', col3=u'12345678')]
Can you share more of the code, in particular the DF definition and any SQL queries? The following works in plain Java, but requires that I specify the NULL type as required by Oracle Java SE 8 docs:
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO test_vertica VALUES (?,?,?);");
pstmt.setNull(1, Types.VARCHAR);
pstmt.setString(2, "New");
pstmt.setString(3, "York");
int affectedRows = pstmt.executeUpdate();
Below is the code:
out_file="/x/y/z/bus_date=20190430/source=abc/"
ACCT_df = spark.read.format("csv").option("header", "true").load(out_file)
ACCT_df.createOrReplaceTempView("ACCT")
query="select col1,col2,col3 from ACCT"
ACCT_final=scSpark.sql(query)
ACCT_final.printSchema()
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = false)
|-- col3: string (nullable = true)
Sample of 2 rows when I do ACCT_final.take(2) in pyspark:
[Row(col1=u'ABC', col2=u'', col3=u'123456789'), Row(col1=u'ABC', col2=u'', col3=u'12345678')]
I am confused now: ACCT_final schema says col2: string (nullable = false) so I would not expect NULL to work. However, the exception should be "[Vertica]JDBC Parameter 2 is not nullable." Is the table ACCT defined with NOT NULL on col2?
Also, what version of Vertica JDBC driver do you have? I am running my test with vertica-jdbc-9.3.0-1.jar
I guess the col2 value is '' (empty), if you see below:
[Row(col1=u'ABC', col2=u'', col3=u'123456789'), Row(col1=u'ABC', col2=u'', col3=u'12345678')]
Target Vertica table DDL (all columns are nullable = true in the Vertica DB):
CREATE TABLE test_vertica
(
col1 varchar(1024),
col2 varchar(1024),
col3 varchar(1024)
)
The driver is vertica-jdbc-9.2.0-0.jar.
As LenoyJ suggested, we'll try a custom dialect. Vertica is almost verbatim Postgres, but Spark doesn't know that. So try adding the attached VerticaDialect.scala class to your project (remove .txt at end!), and register the dialect as follows:
import java.util.Properties
import org.apache.spark.sql.jdbc.JdbcDialects

JdbcDialects.registerDialect(VerticaDialect)
val df = spark.read.jdbc(urlWithUserAndPass, "TESTDB", new Properties())
// your code here
......
JdbcDialects.unregisterDialect(VerticaDialect)
I am just starting with pyspark, so a couple of questions:
a) Where should I save this file?
b) Will this work in pyspark as well? I have to use pyspark for coding this.
c) How will the below code change for pyspark?
val df = spark.read.jdbc(urlWithUserAndPass, "TESTDB", new Properties())
Please suggest; my complete code is below:
pyspark2 --jars /home/x/vertica-9.0.1_spark2.1_scala2.11.jar,/home/x/vertica-jdbc-9.2.0-0.jar
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType,DateType
from pyspark.sql import functions
from pyspark.sql import HiveContext
scSpark = SparkSession.builder.appName("A").config("spark.sql.warehouse.dir","hdfs:///user/hive/warehouse").enableHiveSupport().getOrCreate()
out_file="/x/y/z/bus_date=20190430/source=abc/"
ACCT_df = spark.read.format("csv").option("header", "true").load(out_file)
ACCT_df.createOrReplaceTempView("ACCT")
query="select col1,col2,col3 from ACCT"
ACCT_final=scSpark.sql(query)
ACCT_final.printSchema()
url1 = "jdbc:vertica://vertica-dw-dev.net/db"
ACCT_final.write.format("jdbc").option("driver", "com.vertica.jdbc.Driver").option("url", url1).option("dbtable", "test_vertica").option("savemode","append").option("user", "xxxxx").option("password", "xxxxxxx").mode("append").save()
@Bryan_H's comment started me on a path to modify Postgres's Spark dialect for Vertica. A few customers have requested it before, so now's as good a time as ever. For one, Vertica does not have a TEXT data type - it's all VARCHAR. And there are no ARRAY types in Vertica (yet). I haven't added Vertica-specific data types (like UUID) - probably scope for the future.
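A minimal sketch of what such a dialect looks like; the attached VerticaDialect.scala covers more mappings, and this only shows the String-to-VARCHAR override that avoids Spark's default TEXT/CLOB mapping (the likely cause of the "Driver not capable." error on setNull):
import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types._

object VerticaDialect extends JdbcDialect {
  // Apply this dialect to any Vertica JDBC URL
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:vertica")

  // Map Spark strings to VARCHAR instead of the default TEXT/CLOB,
  // so setNull on string columns uses a type the Vertica driver supports
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(1024)", Types.VARCHAR))
    case _          => None // fall back to Spark's defaults for other types
  }
}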
@Prakhar84, I did the below with Scala; you should be able to extrapolate to pyspark (google how to register Spark JDBC dialects from pyspark). A rough sketch of the whole flow follows at the end of this post.
I have a CSV file as follows with nulls:
Start Spark:
Import the following Vertica Dialect and Register it. Modified Postgres's dialect from here.
Read the CSV File & Show:
Create a JDBC connection string and write to Vertica
Notice that you don't get the "Driver not capable." error anymore as we've used the above Vertica Dialect and Nulls are handled properly.
Switch over to vsql and check the table to confirm the NULLs have transferred:
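Putting those steps together, a rough Scala sketch of the flow (the CSV path, host, database, table, and credentials are placeholders; VerticaDialect is the object registered above):
import java.util.Properties
import org.apache.spark.sql.jdbc.JdbcDialects

// Register the custom dialect before writing
JdbcDialects.registerDialect(VerticaDialect)

// Read the CSV file (the one containing NULLs) and inspect it
val df = spark.read.option("header", "true").csv("/path/to/csv_with_nulls/")
df.show()

// JDBC connection properties (placeholders)
val props = new Properties()
props.setProperty("user", "dbadmin")
props.setProperty("password", "password")
props.setProperty("driver", "com.vertica.jdbc.Driver")

// Append into the existing Vertica table; string NULLs should now bind as VARCHAR rather than CLOB
df.write.mode("append").jdbc("jdbc:vertica://VerticaHostName:5433/db", "test_vertica", props)

JdbcDialects.unregisterDialect(VerticaDialect)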