Facing an issue using Spark + HDFS to load data into Vertica tables.

NishantD Community Edition User

Hello,
We use Spark + HDFS to load data into Vertica tables. The tables are auto-generated by the HDFS connector provided by Vertica. We have seen that the auto-generated tables are not optimized: they use the FLOAT data type, a fixed VARCHAR length (we provide a maximum of 5000), and an un-optimized projection created using Vertica's default rules.
Our needs are:
  • Define data types smartly via the HDFS connector (i.e., do not use FLOAT and do not give string columns the default width).
  • Control the data types dynamically through the same connector.

Please help me ...

Answers

  • Bryan_H Vertica Employee, Administrator

    Are you able to integrate the Vertica driver into your Spark job, using the JDBC driver for Java/Scala or vertica-python for PySpark?
    If so, you can establish a SQL connection and create the table with the expected data types, field lengths, and an optimized projection by sending a CREATE TABLE statement before loading, then use the SaveMode.Append option to load the data.
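    That approach could look roughly like this (a minimal sketch only: the host, port, credentials, and DDL below are placeholders, and it assumes the Vertica JDBC driver is on the classpath):

    ```scala
    import java.sql.DriverManager

    // Placeholder connection details -- replace with your own.
    val jdbcUrl = "jdbc:vertica://your-vertica-host:5433/your_db"

    // Create the table yourself, with the exact types and widths you want,
    // so the connector does not auto-generate FLOAT / wide VARCHAR columns.
    val ddl =
      """CREATE TABLE IF NOT EXISTS employee (
        |  col1 VARCHAR(20),
        |  col2 BIGINT NOT NULL,
        |  col7 NUMERIC(18,4))""".stripMargin

    val conn = DriverManager.getConnection(jdbcUrl, "dbuser", "dbpassword")
    try {
      val stmt = conn.createStatement()
      stmt.execute(ddl) // table now exists with the intended schema
      stmt.close()
    } finally {
      conn.close()
    }

    // Then append into the pre-created table from Spark:
    // df.write.format("com.vertica.spark.datasource.DefaultSource")
    //   .options(verticaProperties)
    //   .mode(SaveMode.Append)
    //   .save()
    ```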

  • LenoyJ Employee
    edited May 2020

    Look into the target_table_ddl parameter on this doc page.
    You should be able to define tables and projections before you write to Vertica.
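    Per that doc page, the DDL goes through the connector options; a minimal sketch (all connection values below are placeholders):

    ```scala
    // Pass target_table_ddl so the connector runs this DDL instead of
    // auto-generating a table. Connection values are placeholders.
    val ddl = "CREATE TABLE employee (col1 VARCHAR(20), col2 BIGINT NOT NULL)"

    val opts: Map[String, String] = Map(
      "db"               -> "your_db",
      "user"             -> "dbuser",
      "password"         -> "dbpassword",
      "host"             -> "your-vertica-host",
      "dbschema"         -> "public",
      "hdfs_url"         -> "hdfs://namenode:8020/tmp/vertica-staging",
      "web_hdfs_url"     -> "webhdfs://namenode:50070/tmp/vertica-staging",
      "table"            -> "employee",
      "target_table_ddl" -> ddl
    )

    // df.write.format("com.vertica.spark.datasource.DefaultSource")
    //   .options(opts).mode(SaveMode.Append).save()
    ```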

  • NishantD Community Edition User

    Thanks Bryan_H and LenoyJ,
    Yes, I am using this for reference:
    https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/SparkConnector/WritingtoVerticaUsingDefaultSource.htm#Intermed
    But the problem is that the target_table_ddl property is not working: the connector automatically creates the schema and does not use the schema I provide. I have tried all save modes, but it is still not working.

  • LenoyJ Employee

    Could you put up what all you have tried (with code, if possible)?
    Also, go through this comment of mine (and the thread) for examples of connecting & writing to Vertica. I didn't use the target_table_ddl property but you can see if everything else looks similar to what you have.

  • NishantD Community Edition User

    @LenoyJ said:
    Could you put up what all you have tried (with code, if possible)?
    Also, go through this comment of mine (and the thread) for examples of connecting & writing to Vertica. I didn't use the target_table_ddl property but you can see if everything else looks similar to what you have.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("Spark").config(conf).getOrCreate()
    val df = spark.read
      .format("csv")
      .option("delimiter", ",") // was "delimeter"; Spark silently ignores misspelled options
      .option("quote", "\"")
      .option("escape", "\\")
      .option("charToEscapeQuoteEscaping", "\u0000")
      .option("comment", "")
      .option("header", true)
      .option("inferSchema", true)
      .option("ignoreLeadingWhiteSpace", true)
      .option("ignoreTrailingWhiteSpace", true)
      .option("nullValue", ",")
      .option("nanValue", "NaN")
      .option("positiveInf", "Inf")
      .option("negativeInf", "-Inf")
      .option("dateFormat", "yyyy-MM-dd")
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
      .option("maxColumns", "20480")
      .option("maxCharsPerColumn", "-1")
      .option("mode", "PERMISSIVE")
      .option("multiLine", true)
      .load("----File--------")

    val query =
      """CREATE TABLE employee (
        |  col1 VARCHAR(20),
        |  col2 BIGINT NOT NULL,
        |  col3 VARCHAR(10),
        |  col4 BIGINT NOT NULL,
        |  col5 INTEGER NOT NULL,
        |  col6 VARCHAR(20),
        |  col7 NUMERIC(18,4))""".stripMargin

    println(query)

    val verticaProperties: Map[String, String] = Map(
      "db" -> "cc_dev_qa_vertica01", // database name
      "user" -> "xxxxxxxx", // database username
      "password" -> "xxxxxx", // password
      "dbschema" -> "xxxxxxxx", // Vertica schema in which the table will reside
      "host" -> "xxxxxx", // host on which Vertica is currently running
      "hdfs_url" -> "hdfs://10.90.21.55:xxxx", // HDFS directory URL where the intermediate ORC file persists before being sent to Vertica
      "web_hdfs_url" -> "webhdfs://10.90.21.55:50070/karam/",
      "table" -> "employee",
      "target_table_ddl" -> query
    )

    val mode = SaveMode.Append
    df.write.format("com.vertica.spark.datasource.DefaultSource").options(verticaProperties).mode(mode).save()
    
