Spark DataFrame save fails to insert

I'm following the instructions in the documentation with the latest Spark connector, and the save fails with the error below. I've confirmed that the target table is created and correct. I also get a bunch of temp tables associated with the partitions. Not much else to go on for troubleshooting. What's broken?

 

Caused by: java.lang.Exception: Partition[8]: FATAL ERROR for job S2V_job747153122453945571. Job status information is available in the Vertica table ericf.S2V_JOB_STATUS.  . Failed rows summary: FailedRowsPercent=0.0; failedRowsPercentTolerance=0.0: PASSED. OK to commit rows to database. . Unable to create/insert into target table ericf.testtable

Comments

  • Hi Eric,

     

Can you check the all_done and success values in the ericf.S2V_JOB_STATUS table for your job S2V_job747153122453945571? Are you using append mode or overwrite mode? The temp tables are normally deleted on exit; I'm not sure why they remain. Also, did you install the "S2VAvroParser" into Vertica?

     

    Thank you,

    Jeff

  • Hi Jeff,

     

=> select save_mode, all_done, success from S2V_JOB_STATUS;
     save_mode | all_done | success
    -----------+----------+---------
     Overwrite | t        | f
    (1 row)

     

    => \df S2VAvroParser

                             List of functions

     procedure_name | procedure_return_type | procedure_argument_types 

    ----------------+-----------------------+--------------------------

     S2VAvroParser  |                       | 

    (1 row)

     

  • Thanks Eric.  

     

Hmm, looks ok. Can you please look at the temp (staging) table to see if it has all of your data? It will be something like:

    S2V_TEMP_TABLE_testtable_S2V_job747153122453945571

     

If you are in Overwrite mode, it will first load into that table, then rename it to the real table once loading is done.
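 

    For reference, the Overwrite sequence is roughly the following (illustrative SQL only; the connector's actual statements, column lists, and job numbers will differ):

     

    CREATE TABLE S2V_TEMP_TABLE_testtable_S2V_job<N> (...);  -- staging table
    -- each Spark partition then COPYs its rows into the staging table
    DROP TABLE IF EXISTS testtable;
    ALTER TABLE S2V_TEMP_TABLE_testtable_S2V_job<N> RENAME TO testtable;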

  • => select * from S2V_TEMP_TABLE_testtable_S2V_job8598052515447324877;

     address | house_value | name | tags 

    ---------+-------------+------+------

    (0 rows)

     

     

  • (I have several iterations of this, all the same, hence the varying job numbers).

     

There is a single row in the DataFrame, as in the example. And across the 16 partitions, I have this:

     

    => select * from S2V_TEMP_TABLE_testtable_S2V_job2040156621985483963_commits_partition15;

     partition_number | committed_row_count | total_rows 

    ------------------+---------------------+------------

                   15 |                   1 |          1

    (1 row)

     

  • And yet (same job)...

     

    => select * from S2V_TEMP_TABLE_testtable_S2V_job4660568210694068963;

     address | house_value | name | tags 

    ---------+-------------+------+------

    (0 rows)

Ok, thank you. So it seems the job finished but failed to load any rows into the staging table. I am not sure what this error is; sorry for the inconvenience. One more request: can you please try with "numPartitions" -> "1", and upon error, see if there is any other earlier message before the failure message at the end.

  •  

    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

    import com.vertica.spark._

     

    val json_string: String = """{"tags": ["home", "green"], "name" : "Yin", "address" : {"city": "Columbus", "stage":"Ohio"},

    "house_value": 12345.6789}"""

     

    val jsonRDD = sc.parallelize(json_string :: Nil)

    val df = sqlContext.read.json(jsonRDD)

    val schema = df.schema

     

    println("df.schema" + df.schema.toString)

    println("df.first=" + df.first)

     

    val dbpass = "xxx"

     

    val options2: Map[String,String] = Map(

      "table" -> "testtable",

      "db" -> "ericf",

      "user" -> "ericf",

      "password" -> dbpass,

      "host" -> "<host>",

      "dbschema" -> "ericf",

      "failed_rows_percent_tolerance" -> "0.00", // default

      "numPartitions" -> "1" // default

    )

     

    val mode = SaveMode.Overwrite

     

    df.write.format("com.vertica.spark.datasource.DefaultSource").options(options2).mode(mode).save

  •  

    scala> df.write.format("com.vertica.spark.datasource.DefaultSource").options(options2).mode(mode).save

    16/04/29 18:06:08 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 210, <host>): java.lang.Exception: Partition[0]: FATAL ERROR for job S2V_job7127636353534916391. Job status information is available in the Vertica table ericf.S2V_JOB_STATUS.  . Failed rows summary: FailedRowsPercent=0.0; failedRowsPercentTolerance=0.0: PASSED. OK to commit rows to database. . Unable to create/insert into target table ericf.testtable

            at com.vertica.spark.s2v.S2V.tryTofinalizeSaveToVertica(S2V.scala:746)

            at com.vertica.spark.s2v.S2V$$anonfun$2.apply(S2V.scala:226)

            at com.vertica.spark.s2v.S2V$$anonfun$2.apply(S2V.scala:128)

            at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$22.apply(RDD.scala:745)

            at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$22.apply(RDD.scala:745)

            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)

            at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)

            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

            at org.apache.spark.scheduler.Task.run(Task.scala:89)

            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

            at java.lang.Thread.run(Thread.java:745)

  • Hi Eric,

     

     

Sorry for the lack of failure info; we will add further debugging options. One last request: please try the following and re-run:

     

    GRANT EXECUTE ON PARSER public.S2VAvroParser() TO PUBLIC;

    GRANT EXECUTE ON PARSER public.S2VAvroParser() TO ericf;  <= or your username here
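 

    To confirm the grant took effect, you can query the grants system table (assuming the standard v_catalog.grants view is available):

     

    SELECT grantee, privileges_description
    FROM v_catalog.grants
    WHERE object_name ILIKE '%S2VAvroParser%';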

     

    Thank you

  • Hi Jeff

     

    GRANT EXECUTE ON PARSER public.S2VAvroParser() TO ericf;

     

    cleared out all tables and restarted spark just to be sure:

     

     

    scala> df.write.format("com.vertica.spark.datasource.DefaultSource").options(options2).mode(mode).save

    16/05/01 08:31:31 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 210, <host>): java.lang.Exception: Partition[0]: FATAL ERROR for job S2V_job4311359837345699676. Job status information is available in the Vertica table ericf.S2V_JOB_STATUS.  . Failed rows summary: FailedRowsPercent=0.0; failedRowsPercentTolerance=0.0: PASSED. OK to commit rows to database. . Unable to create/insert into target table ericf.testtable

     

    [...]

     

    Caused by: java.lang.Exception: Partition[0]: FATAL ERROR for job S2V_job4311359837345699676. Job status information is available in the Vertica table ericf.S2V_JOB_STATUS.  . Failed rows summary: FailedRowsPercent=0.0; failedRowsPercentTolerance=0.0: PASSED. OK to commit rows to database. . Unable to create/insert into target table ericf.testtable

            at com.vertica.spark.s2v.S2V.tryTofinalizeSaveToVertica(S2V.scala:746)

     

    => \dt S2V*

                                                                                                                            List of tables

     Schema |                                  Name                                  | Kind  | Owner |                                                                          Comment                                                                           

    --------+------------------------------------------------------------------------+-------+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------

     ERICF  | S2V_JOB_STATUS                                                         | table | ERICF | Persistent job status table showing all jobs, serving as permanent record of data loaded from Spark to Vertica. Creation time:Sun May 01 08:31:27 PDT 2016

     ERICF  | S2V_TEMP_TABLE_testtable_S2V_job4311359837345699676_commits_partition0 | table | ERICF | Spark data loader temp table, can be dropped after load completes. Start Time:Sun May 01 08:31:27 PDT 2016

     ERICF  | S2V_TEMP_TABLE_testtable_S2V_job4311359837345699676_last_committer     | table | ERICF | Spark data loader temp table, can be dropped after load completes. Start Time:Sun May 01 08:31:26 PDT 2016

    (3 rows)

     

     

    => \dt testtable

                    List of tables

     Schema |   Name    | Kind  | Owner | Comment 

    --------+-----------+-------+-------+---------

     ERICF  | testtable | table | ERICF | 

    (1 row)

     

    => select * from S2V_TEMP_TABLE_testtable_S2V_job4311359837345699676_commits_partition0;

     partition_number | committed_row_count | total_rows 

    ------------------+---------------------+------------

                    0 |                   1 |          1

    (1 row)

     

     

     


  • Hi Eric,

     

    I did some tests in the lab, and also tried using the code that you had pasted. It worked fine here and I was able to get the data from Spark to Vertica.

     

    1. I then created a test user called sparky and test schema called sparky_schema, and tried it -- it failed. I had to do the following to make it work:

     

    GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO sparky;

     

Can you give that a shot for the "ericf" user?

     

    2. Can you also try the same test but this time with a dbadmin user?

     

3. In the background, the connector does an alter table in addition to the create table. Could you please confirm that you are able to do both CREATE TABLE and ALTER TABLE in the ericf schema, as the ericf user?
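 

    A quick way to check both privileges by hand (throwaway table name, illustrative only):

     

    CREATE TABLE ericf.s2v_priv_check (i INT);
    ALTER TABLE ericf.s2v_priv_check ADD COLUMN j INT;
    DROP TABLE ericf.s2v_priv_check;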

     

    Thanks,

    Harshad


