Unable to write spark DF in Vertica using API

Prakhar84Prakhar84 Vertica Customer
edited July 2021 in General Discussion

Hi Team

I am getting the error below when trying to save a Spark DataFrame to Vertica. This is blocking my work, so any help will be really appreciated. A couple of things to note:
a) We have Kerberized Cloudera and Kerberized Vertica. All the necessary XML files have already been copied to the Vertica cluster as part of setup.
b) I can see entries in the run table that is created in Vertica when I try to write the DataFrame. Does that mean the Hadoop-to-Vertica connection is established?

Below is the error:

spark_DF.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)

19/12/04 14:52:00 ERROR s2v.S2V: Failed to save DataFrame to Vertica table: est_vertica
Traceback (most recent call last):
  File "", line 1, in
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/pyspark/sql/readwriter.py", line 703, in save
    self._jwrite.save()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o85.save.
java.lang.Exception: S2V: FATAL ERROR for job S2V_job2702546586973782764. Job status information is available in the Vertica table S2V_JOB_STATUS_USER. Unable to create/insert into target table FRR.test_vertica with SaveMode: Append. ERROR MESSAGE: ERROR: java.sql.SQLException: [Vertica][VJDBC](6776) ERROR: Failed to glob [hdfs:/x/y/tmp/vertica/S2V_job2702546586973782764/*.orc] because of error: Could not connect to [hdfs://x]

Answers

  • SruthiASruthiA Vertica Employee Administrator

    Based on the error, it looks like it could not connect to Vertica using the Spark connector. What is the output of the following?

    SELECT KERBEROS_CONFIG_CHECK();

    SELECT HDFS_CLUSTER_CONFIG_CHECK();

  • Prakhar84Prakhar84 Vertica Customer

    a)
    ok: krb5 exists at [/etc/krb5.conf]
    ok: Vertica Keytab file is set to [/opt/vertica/config/vertica2.kt]
    ok: Vertica Keytab file exists at [/opt/vertica/config/vertica2.kt]
    Kerberos configuration parameters set in the database
    KerberosServiceName : [vertica]
    KerberosHostname : [db.intlb.org.net]
    KerberosRealm : [QA.org.NET]
    KerberosKeytabFile : [/opt/vertica/config/vertica2.kt]
    Vertica Principal: [vertica/[email protected]]
    ok: Vertica can kinit using keytab file
    b) I don't have access to run this function.

  • SruthiASruthiA Vertica Employee Administrator

    We don't officially support Kerberos authentication with the Spark Connector yet. But I am trying to see if setting the Kerberos JDBC parameters will help. Did you already set them in your connection properties? The properties are listed at the link below:

    https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/Security/Kerberos/ConfigureJDBCClientsonallPlaforms.htm

  • Prakhar84Prakhar84 Vertica Customer
    edited December 2019

    We are trying to achieve this:
    https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingToVerticaFromSpark.htm?tocpath=Integrating with Apache Spark|Saving an Apache Spark DataFrame to a Vertica Table|_____0

    Is this what you are referring to as the Spark Connector?

    When I start pyspark, I use these jars:

    pyspark2 --jars /home/x/vertica-9.0.1_spark2.1_scala2.11.jar,/home/x/vertica-jdbc-9.2.0-0.jar

  • SruthiASruthiA Vertica Employee Administrator

    I am talking about the JDBC Kerberos properties in the link I shared earlier. You can set them like below in your props variable. I am not sure it will work; it is just a workaround that might help.

    val props = Seq(
    "user" -> "user",
    "JAASConfigName" -> "verticajdbc",
    "KerberosHostName" -> "<kerberoshostname>",
    "KerberosServiceName" -> "<kerberosservicename>"
    )

    https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingToVerticaFromSpark.htm?tocpath=Integrating with Apache Spark|Saving an Apache Spark DataFrame to a Vertica Table|_____0

    https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/Security/Kerberos/ConfigureJDBCClientsonallPlaforms.htm

  • Prakhar84Prakhar84 Vertica Customer
    edited December 2019

    Hi,
    Can you let me know how to run this on the Cloudera platform from the pyspark shell? I would like a working example, based on your suggestion, that saves a Spark DataFrame to Vertica using https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingtoVerticaUsingDefaultSource.htm
    A working example will help us do the same at our end. We are reading Parquet files from HDFS, loading them into a DataFrame, and then trying to write to Vertica.
    Also, we are on Vertica 9.1.

  • SruthiASruthiA Vertica Employee Administrator

    First, we don't officially support it. Regarding the example, it is already present in the link you shared. Add the Kerberos JDBC parameters to opts, where you set your HDFS URL:

    val opts: Map[String, String] = Map(
    "table" -> "VerticaTableName",
    "db" -> "VerticaDatabaseName",
    "user" -> "VerticaDatabaseUser",
    "password" -> "VerticaDatabasePassword",
    "host" -> "VerticaHostName",
    "hdfs_url" -> "hdfs://HDFSNameNode:9000/user/hduser/someDirectory",
    "web_hdfs_url" -> "webhdfs://HDFSNameNode:50070/user/hduser/someDirectory"
    // "failed_rows_percent_tolerance"-> "0.00" // OPTIONAL (default val shown)
    // "dbschema" -> "public" // OPTIONAL (default val shown)
    // "port" -> "5433" // OPTIONAL (default val shown)
    // "strlen" -> "1024" // OPTIONAL (default val shown)
    // "fileformat" -> "orc" // OPTIONAL (default val shown)
    )

    https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingtoVerticaUsingDefaultSource.htm
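
For pyspark users, the Scala Map above can be mirrored as a plain Python dict. The sketch below is only an illustration: the helper name `build_s2v_opts` is hypothetical, and treating the seven uncommented keys as required (and the commented ones as optional with the defaults shown) is an assumption read off the example, not something the connector documentation is quoted as saying here.

```python
# Keys that the Scala example lists without a default (assumed to be required).
REQUIRED_KEYS = ("table", "db", "user", "password", "host", "hdfs_url", "web_hdfs_url")

# Optional keys with the default values shown as comments in the Scala example.
OPTIONAL_DEFAULTS = {
    "failed_rows_percent_tolerance": "0.00",
    "dbschema": "public",
    "port": "5433",
    "strlen": "1024",
    "fileformat": "orc",
}


def build_s2v_opts(**settings):
    """Return an options dict for the connector, failing early on missing keys.

    Hypothetical helper: it only assembles a dict and never talks to Vertica.
    """
    missing = [key for key in REQUIRED_KEYS if key not in settings]
    if missing:
        raise ValueError("missing required options: " + ", ".join(missing))
    opts = dict(OPTIONAL_DEFAULTS)
    # Spark data source options are strings, so normalise every value.
    opts.update({key: str(value) for key, value in settings.items()})
    return opts


opts = build_s2v_opts(
    table="VerticaTableName",
    db="VerticaDatabaseName",
    user="VerticaDatabaseUser",
    password="VerticaDatabasePassword",
    host="VerticaHostName",
    hdfs_url="hdfs://HDFSNameNode:9000/user/hduser/someDirectory",
    web_hdfs_url="webhdfs://HDFSNameNode:50070/user/hduser/someDirectory",
)
```

In a pyspark shell the resulting dict would then be passed as `df.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)`.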

  • Prakhar84Prakhar84 Vertica Customer

    Sorry, but I am not following the sequence of steps.
    I run the steps below when I log in to the cluster Unix box:
    a) kinit -kt /home/x/creds/y.keytab y
    b) pyspark2 --jars /home/user/vertica-9.0.1_spark2.1_scala2.11.jar,/home/user/vertica-jdbc-9.2.0-0.jar
    Where should I set these properties, and how do I use them in the script to save the DataFrame?
    Below is my URL:
    url1 = "jdbc:vertica://Vertica-x-dev.bnymellon.net:5433/db"

    opts={}
    opts['dbschema'] = 'SCHEMA'
    opts['table'] = 'test_vertica'
    opts['db']='db'
    opts['user']='user'
    opts['password']='pwd'
    opts['host']='Vertica-x-dev.bnymellon.net'
    opts['hdfs_url']='hdfs://hdfs/idtd/user'
    opts['web_hdfs_url']='webhdfs://hdfs/idtd/user'

    How do I use this in the above? Apologies, but if you can help with a working example, we can try it.

  • SruthiASruthiA Vertica Employee Administrator

    For your JDBC URL, append the Kerberos properties as follows:

    url1 = "jdbc:vertica://Vertica-x-dev.bnymellon.net:5433/db?JAASConfigName=verticajdbc&KerberosHostName=<KerberosHostName>&KerberosServiceName=<KerberosServiceName>"

    Another method is to pass the same parameters in opts, like below:

    opts["KerberosServiceName"] = "<KerberosServiceName>"
    opts["KerberosHostName"] = "<KerberosHostName>"
    opts["JAASConfigName"] = "verticajdbc"
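
Both variants can be produced from one set of values. The following is a minimal sketch, assuming Python 3; the helper name `with_kerberos_params` is hypothetical, and the service name and host name are the ones reported by KERBEROS_CONFIG_CHECK earlier in this thread (confirm them with your DBA). As noted above, Kerberos with the Spark Connector is not officially supported, so this shows how to assemble the parameters, not that the connector will honor them.

```python
from urllib.parse import urlencode


def with_kerberos_params(base_url, service_name, host_name, jaas_config_name="verticajdbc"):
    """Return (jdbc_url, params): the URL with the Kerberos properties appended,
    plus the same properties as a dict that can be merged into opts."""
    params = {
        "JAASConfigName": jaas_config_name,
        "KerberosHostName": host_name,
        "KerberosServiceName": service_name,
    }
    # Use '&' if the URL already carries properties, '?' otherwise.
    separator = "&" if "?" in base_url else "?"
    return base_url + separator + urlencode(params), params


url1, kerberos_opts = with_kerberos_params(
    "jdbc:vertica://Vertica-x-dev.bnymellon.net:5433/db",
    service_name="vertica",        # KerberosServiceName from KERBEROS_CONFIG_CHECK
    host_name="db.intlb.org.net",  # KerberosHostname from KERBEROS_CONFIG_CHECK
)

# The "another method" variant: merge the same three keys into the opts dict.
opts = {"host": "Vertica-x-dev.bnymellon.net", "db": "db"}
opts.update(kerberos_opts)
```

The URL form only matters for code paths that actually receive `url1` (for example a plain JDBC write); the opts form is the one that reaches the connector's `save` call.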

  • Prakhar84Prakhar84 Vertica Customer

    Thanks, but what will be the values of these parameters?
    JAASConfigName=verticajdbc (is this the default name?)
    &KerberosHostName= (is this on the Vertica side or the Cloudera side?)
    &KerberosServiceName= (is this on the Vertica side or the Cloudera side?)

  • SruthiASruthiA Vertica Employee Administrator

    You can find it from the output you had shared earlier. You can use the values from the below output or your DBA can help you.

    ok: krb5 exists at [/etc/krb5.conf]
    ok: Vertica Keytab file is set to [/opt/vertica/config/vertica2.kt]
    ok: Vertica Keytab file exists at [/opt/vertica/config/vertica2.kt]
    Kerberos configuration parameters set in the database
    KerberosServiceName : [vertica]
    KerberosHostname : [db.intlb.org.net]
    KerberosRealm : [QA.org.NET]
    KerberosKeytabFile : [/opt/vertica/config/vertica2.kt]
    Vertica Principal: [vertica/[email protected]]
    ok: Vertica can kinit using keytab file

  • Prakhar84Prakhar84 Vertica Customer
    edited December 2019

    I did the same as below but still get the same error (I hope the format I used is correct):
    url1 = "jdbc:vertica://Vertica-x-dev.bnymellon.net:1111/db?JAASConfigName=verticajdbc&KerberosHostName=db.intlb.org.net&KerberosServiceName=vertica"
    How come the job status table always gets updated? (Does it mean the HDFS and Vertica connection is established?)
    S2V_JOB_STATUS_USER
    The other thing is that I don't even use this url1 in the write command.

    opts={}
    opts['dbschema'] = 'SCHEMA'
    opts['table'] = 'test_vertica'
    opts['db']='db'
    opts['user']='user'
    opts['password']='pwd'
    opts['host']='Vertica-x-dev.bnymellon.net'
    opts['hdfs_url']='hdfs://hdfs/idtd/user'
    opts['web_hdfs_url']='webhdfs://hdfs/idtd/user'
    Then comes the save:
    spark_DF.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)

  • SruthiASruthiA Vertica Employee Administrator

    The format is correct. I was just trying that option as a workaround, and it looks like it did not help. There is already a feature request open for this. Please open a support case regarding the table status; we may need logs to review it further.

  • Prakhar84Prakhar84 Vertica Customer
    edited December 2019

    No, my question is: I don't use this while writing the DF, so how will the changes you suggested be used?
    url1 = "jdbc:vertica://Vertica-x-dev.bnymellon.net:1111/db?JAASConfigName=verticajdbc&KerberosHostName=db.intlb.org.net&KerberosServiceName=vertica"
    Look at my opts parameters: url1 is not part of them when writing to Vertica.

  • SruthiASruthiA Vertica Employee Administrator

    I asked you to use these 3 parameters in the URL so that the JDBC connection gets Kerberized. The Spark connector uses JDBC internally. You don't need them if you don't want to use Kerberos authentication with the Spark-to-Vertica connector.

  • Prakhar84Prakhar84 Vertica Customer

    But url1 is not used or called anywhere; it is just a variable. Do we need to add this to the HDFS URL? How is it picked up when I call the code below, given that url1 is not part of opts?

    opts={}
    opts['dbschema'] = 'SCHEMA'
    opts['table'] = 'test_vertica'
    opts['db']='db'
    opts['user']='user'
    opts['password']='pwd'
    opts['host']='Vertica-x-dev.bnymellon.net'
    opts['hdfs_url']='hdfs://hdfs/idtd/user'
    opts['web_hdfs_url']='webhdfs://hdfs/idtd/user'
    Then comes the save:
    spark_DF.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)

  • SruthiASruthiA Vertica Employee Administrator

    I was not aware that your program does not use url1. If url1 is not used, then add the parameters to opts as mentioned earlier.

  • Prakhar84Prakhar84 Vertica Customer
    edited December 2019

    How do I add it? Even in https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingtoVerticaUsingDefaultSource.htm?tocpath=Integrating with Apache Spark|Saving an Apache Spark DataFrame to a Vertica Table|_____1
    there is no mention of giving a Vertica connection URL.
    Do you mean the below?

    opts={}
    opts['dbschema'] = 'SCHEMA'
    opts['table'] = 'test_vertica'
    opts['url']='jdbc:vertica://Vertica-x-dev.bnymellon.net:1111/db?JAASConfigName=verticajdbc&KerberosHostName=db.intlb.org.net&KerberosServiceName=vertica'
    opts['db']='db'
    opts['user']='user'
    opts['password']='pwd'
    opts['host']='Vertica-x-dev.bnymellon.net'
    opts['hdfs_url']='hdfs://hdfs/idtd/user'
    opts['web_hdfs_url']='webhdfs://hdfs/idtd/user'
    Then comes the save:
    spark_DF.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)

  • Prakhar84Prakhar84 Vertica Customer
    Please suggest
  • LenoyJLenoyJ - Select Field - Employee
    edited December 2019

    It looks to me that Vertica can't access HDFS. Could you confirm Vertica can access HDFS by following the steps in this comment? If you don't have access to the functions, check with your DBA. Or another way to confirm is to check around with your team if they're either using External Tables on HDFS or using COPY FROM hdfs://
     
    All that being said, there are two ways I know of that you can use to write to Vertica from Spark:

    1. You can use Vertica's Spark Connector.
      • This is generally the preferred way and faster.
      • But it has a dependency on HDFS as a temporary location to land the data before it is written to Vertica. Vertica & Spark must be able to access HDFS
      • No support for Kerberos Authentication (yet) without hacky workarounds
         
    2. You can use the Vertica JDBC Driver
      • A little slower but no dependency on HDFS
      • You potentially may be able to define Kerberos params in the jdbc connection string.
      • May require writing a custom SQL Dialect (depending on what you want to do)
         

    Here is example Scala code for both:

    ./bin/spark-shell --jars /opt/vertica/packages/SparkConnector/lib/vertica-spark2.1_scala2.11.jar,/opt/vertica/java/vertica-jdbc.jar --master spark://172.31.4.157:7077
    
    //#Begin Scala code
    
            import org.apache.spark.sql.types._
            import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
            import com.vertica.spark._
    
    //#Let's create a sample DataFrame for saving to Vertica
    
            val rows = sc.parallelize(Array(
              Row(1,"hello", true),
              Row(2,"goodbye", false)
            ))
    
            val schema = StructType(Array(
              StructField("id",IntegerType, false),
              StructField("message",StringType,true),
              StructField("still_here",BooleanType,true)
            ))
    
            val df = spark.createDataFrame(rows, schema)
    
    //#View the sample data
    
            df.show
    
    //#Option 1: Write data to Vertica using Spark Connector
    
            val host = "172.31.4.157"
            val db = "dbname"
            val user = "dbadmin"
            val password = ""
            val dbschema = "public"
            val table = "test_table"
            val hdfs_url = "hdfs://172.31.4.157:9000/vertica_output"
            val web_hdfs_url = "webhdfs://172.31.4.157:50070/vertica_output"
            val opt = Map("host" -> host, "dbschema" -> dbschema, "table" -> table, "db" -> db,  "user" -> user, "password" -> password, "hdfs_url" -> hdfs_url, "web_hdfs_url" -> web_hdfs_url)
            val mode = SaveMode.Append
    
            //#write using Spark Connector. This will only work if Vertica has access to HDFS.
            df.write.format("com.vertica.spark.datasource.DefaultSource").options(opt).mode(mode).save()
    
    //#Option 2: Write data to Vertica using JDBC
    
            //#Feel free to modify the connection string to include Kerberos Authentication and other params you require
            val url = "jdbc:vertica://172.31.4.157/dbname?username=dbadmin&password=&ConnectionLoadBalance=1&DirectBatchInsert=1"
    
            //#Only mode Append works without a custom Vertica sql dialect. Make sure table exists though...
            val mode = SaveMode.Append
    
            //# write using JDBC.
            //# Feel free to modify connection properties here too
            df.write.format("jdbc").option("driver", "com.vertica.jdbc.Driver").option("url", url).option("dbtable", "public.test_table").option("numPartitions", 12).option("batchsize", 200000).mode(mode).save()
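
Since the original question uses pyspark, Option 2 can be sketched in Python as well. This is a rough equivalent of the Scala JDBC write above, not a tested recipe against a Kerberized cluster: the helper name `write_df_via_jdbc` and its parameters are hypothetical, and it assumes the Vertica JDBC jar was passed with `--jars` and that the target table already exists.

```python
def write_df_via_jdbc(df, url, table, user, password, mode="append", extra_options=None):
    """Write a Spark DataFrame to Vertica through Spark's generic JDBC data source.

    df            -- a pyspark.sql.DataFrame
    url           -- JDBC URL, e.g. "jdbc:vertica://host:5433/dbname"
    table         -- target table, e.g. "public.test_table" (assumed to exist)
    extra_options -- optional dict of further JDBC writer options,
                     e.g. {"batchsize": 200000, "numPartitions": 12}
    """
    writer = (
        df.write.format("jdbc")
        .option("driver", "com.vertica.jdbc.Driver")
        .option("url", url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
    )
    for key, value in (extra_options or {}).items():
        writer = writer.option(key, value)
    # As noted above, only append mode is expected to work without a custom dialect.
    writer.mode(mode).save()
```

In a pyspark shell this would be called as, for example, `write_df_via_jdbc(df, "jdbc:vertica://172.31.4.157/dbname", "public.test_table", "dbadmin", "")`. Kerberos properties, if used, would go into the URL.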
    
  • Prakhar84Prakhar84 Vertica Customer

    Thanks, but is JDBC a good way to load? I heard it has issues with null values/null columns. Please suggest.

  • Bryan_HBryan_H Vertica Employee Administrator

    JDBC works with several BI and ETL tools and is officially supported. I'm not aware of any issues with NULL values. Please let us know if there are issues with NULL values so we can fix them or suggest a workaround.

  • Prakhar84Prakhar84 Vertica Customer

    Below is the null issue I am getting. Please suggest a solution.

    The query is below:

    "select col1,col2,col3 from ACCTIDT"

    Then I get this error when writing to Vertica using JDBC:

    ACCT_final.write.format("jdbc").option("driver", "com.vertica.jdbc.Driver").option("url", url1).option("dbtable", "test_vertica").option("savemode","append").option("user", "xxxxx").option("password", "xxxxxx").mode("append").save()

    java.sql.SQLFeatureNotSupportedException: [Vertica]JDBC Driver not capable.
    at com.vertica.exceptions.ExceptionConverter.toSQLException(Unknown Source)
    at com.vertica.jdbc.common.SPreparedStatement.checkTypeSupported(Unknown Source)
    at com.vertica.jdbc.common.SPreparedStatement.setNull(Unknown Source)

    If I change the query as below:
    "select col1,coalesce(col2,'') as col2,col3 from ACCTIDT"
    then it loads properly, but I don't want to do it this way, as I cannot add coalesce for every nullable column; we have hundreds of columns in thousands of tables.
    Table DDL:
    CREATE TABLE test_vertica
    (
    col1 varchar(1024),
    col2 varchar(1024),
    col3 varchar(1024)
    )
    Sample 2 rows when I do ACCT_final.take(2) in pyspark:
    [Row(col1=u'ABC', col2=u'', col3=u'123456789'), Row(col1=u'ABC', col2=u'', col3=u'12345678')]
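
If the coalesce workaround is used as a stopgap, the query does not have to be written by hand for every table. Below is a minimal sketch that generates it from a column list; the helper name `coalesce_query` is hypothetical, and it assumes every column is a string column and that storing an empty string instead of NULL is acceptable, which changes what ends up in Vertica.

```python
def coalesce_query(table, columns, replacement=""):
    """Build a SELECT that wraps every column in coalesce(col, '<replacement>').

    Assumes string columns; NULLs are replaced, so they are not preserved.
    """
    # Double any single quotes so the replacement is a valid SQL string literal.
    escaped = replacement.replace("'", "''")
    select_list = ", ".join(
        "coalesce({0}, '{1}') as {0}".format(col, escaped) for col in columns
    )
    return "select {0} from {1}".format(select_list, table)


query = coalesce_query("ACCTIDT", ["col1", "col2", "col3"])
print(query)
```

In pyspark the column list can come from the DataFrame itself (`ACCT_final.columns`), so the same call works for any table. `DataFrame.na.fill("")` is another way to replace nulls in string columns without generating SQL. Neither approach keeps real NULLs.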

  • Bryan_HBryan_H Vertica Employee Administrator

    Can you share more of the code, in particular the DF definition and any SQL queries? The following works in plain Java, but requires that I specify the NULL type as required by Oracle Java SE 8 docs:
    PreparedStatement pstmt = conn.prepareStatement("INSERT INTO test_vertica VALUES (?,?,?);");
    pstmt.setNull(1, Types.VARCHAR);
    pstmt.setString(2, "New");
    pstmt.setString(3, "York");
    int affectedRows = pstmt.executeUpdate();

  • Prakhar84Prakhar84 Vertica Customer

    Below is the same

    out_file="/x/y/z/bus_date=20190430/source=abc/"
    ACCT_df = spark.read.format("csv").option("header", "true").load(out_file)
    ACCT_df.createOrReplaceTempView("ACCT")
    query="select col1,col2,col3 from ACCT"
    ACCT_final=scSpark.sql(query)
    ACCT_final.printSchema()
    root
    |-- col1: string (nullable = true)
    |-- col2: string (nullable = false)
    |-- col3: string (nullable = true)
    Sample 2 rows when I do ACCT_final.take(2) in pyspark:
    [Row(col1=u'ABC', col2=u'', col3=u'123456789'), Row(col1=u'ABC', col2=u'', col3=u'12345678')]

  • Bryan_HBryan_H Vertica Employee Administrator

    I am confused now: ACCT_final schema says col2: string (nullable = false) so I would not expect NULL to work. However, the exception should be "[Vertica]JDBC Parameter 2 is not nullable." Is the table ACCT defined with NOT NULL on col2?
    Also, what version of Vertica JDBC driver do you have? I am running my test with vertica-jdbc-9.3.0-1.jar

  • Prakhar84Prakhar84 Vertica Customer

    I guess the col2 value is '' (an empty string), as you can see below:
    [Row(col1=u'ABC', col2=u'', col3=u'123456789'), Row(col1=u'ABC', col2=u'', col3=u'12345678')]
    Target Vertica table DDL (all columns are nullable in the Vertica DB):
    CREATE TABLE test_vertica
    (
    col1 varchar(1024),
    col2 varchar(1024),
    col3 varchar(1024)
    )
    The driver is vertica-jdbc-9.2.0-0.jar

  • Bryan_HBryan_H Vertica Employee Administrator

    As LenoyJ suggested, we'll try a custom dialect. Vertica is almost verbatim Postgres, but Spark doesn't know that. So try adding the attached VerticaDialect.scala class to your project (remove .txt at end!), and register the dialect as follows:
    import java.util.Properties
    import org.apache.spark.sql.jdbc.JdbcDialects

    JdbcDialects.registerDialect(VerticaDialect)
    val df = spark.read.jdbc(urlWithUserAndPass, "TESTDB", new Properties())
    // your code here
    // ...
    JdbcDialects.unregisterDialect(VerticaDialect)

  • Prakhar84Prakhar84 Vertica Customer
    edited December 2019

    I am just starting with pyspark, so a couple of questions:
    a) Where should I save this file?
    b) Will this work in pyspark as well? I have to use pyspark for coding this.
    c) How will the code below change for pyspark? (Also, I see that after "new" below there is no closing bracket.)
    val df = spark.read.jdbc(urlWithUserAndPass, "TESTDB", new
    Properties()
    Please suggest. My complete code is below:
    pyspark2 --jars /home/x/vertica-9.0.1_spark2.1_scala2.11.jar,/home/x/vertica-jdbc-9.2.0-0.jar
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField
    from pyspark.sql.types import DoubleType, IntegerType, StringType,DateType
    from pyspark.sql import functions
    from pyspark.sql import HiveContext
    scSpark = SparkSession.builder.appName("A").config("spark.sql.warehouse.dir","hdfs:///user/hive/warehouse").enableHiveSupport().getOrCreate()
    out_file="/x/y/z/bus_date=20190430/source=abc/"
    ACCT_df = spark.read.format("csv").option("header", "true").load(out_file)
    ACCT_df.createOrReplaceTempView("ACCT")
    query="select col1,col2,col3 from ACCT"
    ACCT_final=scSpark.sql(query)
    ACCT_final.printSchema()
    url1 = "jdbc:vertica://vertica-dw-dev.net/db"
    ACCT_final.write.format("jdbc").option("driver", "com.vertica.jdbc.Driver").option("url", url1).option("dbtable", "test_vertica").option("savemode","append").option("user", "xxxxx").option("password", "xxxxxxx").mode("append").save()

  • LenoyJLenoyJ - Select Field - Employee
    edited December 2019

    @Bryan_H's comment started me on a path to modify Postgres's Spark Dialect for Vertica. A few customers have requested it before, so now's as good a time as ever. For one, Vertica does not have a TEXT data type - it's all VARCHAR. And there are no ARRAY types in Vertica (yet). I haven't added Vertica-specific data types (like UUID) - probably scope for the future.
    @Prakhar84 , I did the below with Scala. You should be able to extrapolate to pyspark (google how to import Spark JDBC dialects)

    1. I have a CSV file as follows with nulls:

      $ cat out.csv
      col1,col2,col3
      ABC,,123
      DEF,,456
      
    2. Start Spark:

      ./bin/spark-shell --jars /opt/vertica/packages/SparkConnector/lib/vertica-spark2.1_scala2.11.jar,/opt/vertica/java/vertica-jdbc.jar,/home/dbadmin/VerticaDialect.scala --master spark://172.31.4.157:7077
      
    3. Import the following Vertica Dialect and Register it. Modified Postgres's dialect from here.

      //#Begin Scala code
      import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcDialect, JdbcType}
      import java.sql.{Connection, Types}
      import java.util.Locale
      
      import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
      import org.apache.spark.sql.types._
      
      
      val VerticaDialect = new JdbcDialect { 
      
        val MAX_PRECISION = 38
        val MAX_SCALE = 38
      
        import scala.math.min
      
        def bounded(precision: Int, scale: Int): DecimalType = {
            DecimalType(min(precision, MAX_PRECISION), min(scale, MAX_SCALE))
        }
      
        override def canHandle(url: String): Boolean =
          url.toLowerCase(Locale.ROOT).startsWith("jdbc:vertica")
      
        override def getCatalystType(
            sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
          if (sqlType == Types.REAL) {
            Some(FloatType)
          } else if (sqlType == Types.SMALLINT) {
            Some(ShortType)
          } else if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
            Some(BinaryType)
          } else if (sqlType == Types.OTHER) {
            Some(StringType)
          } 
          else None
        }
      
        private def toCatalystType(
            typeName: String,
            precision: Int,
            scale: Int): Option[DataType] = typeName match {
          case "bool" => Some(BooleanType)
          case "bit" => Some(BinaryType)
          case "int2" => Some(ShortType)
          case "int4" => Some(IntegerType)
          case "int8" | "oid" => Some(LongType)
          case "float4" => Some(FloatType)
          case "money" | "float8" => Some(DoubleType)
          case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
            Some(StringType)
          case "bytea" => Some(BinaryType)
          case "timestamp" | "timestamptz" | "time" | "timetz" | "datetime" | "smalldatetime" => Some(TimestampType)
          case "date" => Some(DateType)
          case "numeric" | "decimal" if precision > 0 => Some(bounded(precision, scale))
          case "numeric" | "decimal" => Some(DecimalType.SYSTEM_DEFAULT)
          case _ => None
        }
      
        override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
          case StringType => Some(JdbcType("VARCHAR", Types.CHAR))
          case BinaryType => Some(JdbcType("BYTEA", Types.BINARY))
          case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
          case FloatType => Some(JdbcType("FLOAT", Types.FLOAT))
          case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
          case ShortType | ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
          case t: DecimalType => Some(
            JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
          case _ => None
        }
      
        override def getTableExistsQuery(table: String): String = {
          s"SELECT 1 FROM $table LIMIT 1"
        }
      
        override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
      
        override def getTruncateQuery(
            table: String,
            cascade: Option[Boolean] = isCascadingTruncateTable): String = {
          cascade match {
            case Some(true) => s"TRUNCATE TABLE $table"
            case _ => s"TRUNCATE TABLE $table"
          }
        }
      
        override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
          super.beforeFetch(connection, properties)
      
          if (properties.getOrElse(JDBCOptions.JDBC_BATCH_FETCH_SIZE, "0").toInt > 0) {
            connection.setAutoCommit(false)
          }
        }
      
      }
      
      JdbcDialects.registerDialect(VerticaDialect)
      
    4. Read the CSV File & Show:

      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
      import com.vertica.spark._
      import org.apache.spark.sql.SQLContext
      
      val sqlContext = new SQLContext(sc)
      
      val out_file="/home/dbadmin/out.csv"
      val df = sqlContext.read.format("csv").option("header", "true").load(out_file)
      
      df.show
      //#+----+----+----+
      //#|col1|col2|col3|
      //#+----+----+----+
      //#| ABC|null| 123|
      //#| DEF|null| 456|
      //#+----+----+----+
      
    5. Create a JDBC connection string and write to Vertica

      val url = "jdbc:vertica://172.31.4.157/dbname?username=dbadmin&password=&ConnectionLoadBalance=1&DirectBatchInsert=1"
      
      //#Using Append Mode. Make sure table exists with the right schema in Vertica...
      val mode = SaveMode.Append
      
      //#Write using JDBC.
      df.write.format("jdbc").option("driver", "com.vertica.jdbc.Driver").option("url", url).option("dbtable", "public.test_vertica").mode(mode).save()
      
    6. Notice that you don't get the "Driver not capable." error anymore as we've used the above Vertica Dialect and Nulls are handled properly.

    7. Switch over to vsql and check the table if Nulls have transferred:

      dbadmin=> select * from test_vertica;
       col1 | col2 | col3
      ------+------+------
       ABC  |      | 123
       DEF  |      | 456
      (2 rows)
      
      dbadmin=> select col1, col2, coalesce(col2,'yes it is null') as Is_Col2_Null, col3 from test_vertica;
       col1 | col2 |  Is_Col2_Null  | col3
      ------+------+----------------+------
       ABC  |      | yes it is null | 123
       DEF  |      | yes it is null | 456
      (2 rows)
      
