Issues while saving dataframe using Vertica Spark Connector V2

mt25 ✭✭
edited November 2023 in General Discussion

Hi,

I am trying out the Vertica Spark connector V2, but I am getting the error shown below.

Vertica version: 12.0.4
Spark: 3.1.2
Hadoop: 3.2.1

Below are my connection details:
host="localhost"
port=5433
db="vmart"
user="dbadmin"
password=""
filepath="hdfs://hdfshost:8020/user/hadoop/planstream/",
prevent_cleanup=true
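
The code below also uses an options map (built from the connection details above; the exact map appears later in this thread) and a VERTICA_SOURCE constant that is not shown. A minimal sketch of the missing definitions, assuming an existing SparkSession named spark, the standard Spark imports, and the data source class name quoted in a later reply:

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Data source name for the Vertica Spark connector V2 (as quoted later in this thread)
val VERTICA_SOURCE = "com.vertica.spark.datasource.VerticaSource"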

val tableName = "dftest"
// Define schema of a table with a single integer attribute
val schema = new StructType(Array(StructField("col1", IntegerType)))
// Create n rows with element '77'
val n = 20
val data = (0 until n).map(_ => Row(77))
// Create a dataframe corresponding to the schema and data specified above
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).coalesce(1)
// Outputs dataframe schema
val mode = SaveMode.Append
println("----------")
df.printSchema()
df.show(10,false)
// Write dataframe to Vertica
options.foreach(i=>println(i._1 +" "+ i._2))
df.write.format(VERTICA_SOURCE)
.options(options + ("table" -> tableName))
.mode(mode)
.save()

commit: Failed to copy rows into target table
performCopy: JDBC error when trying to copy
Error in commit step of write to Vertica. There was a failure copying data from the intermediary into Vertica.
Error when sending query
Unexpected SQL Error.
A generic JDBC error occurred

Caused by:
java.sql.SQLException: [Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-159.cloud.operative.com:8020/user/hadoop/planstream/818d7193_ccea_49e3_be0f_7ece64d15c79/*.parquet] because of error: Seen exception:
URL: [http://ip-10-60-6-159.cloud.operative.com:8020/webhdfs/v1/user/hadoop/planstream/818d7193_ccea_49e3_be0f_7ece64d15c79/?op=LISTSTATUS&user.name=dbadmin]
HTTP response code: 404
HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.

Stack trace:
com.vertica.util.ServerErrorData.buildException(Unknown Source)
com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
com.vertica.dataengine.VResultSet.initialize(Unknown Source)
com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
com.vertica.jdbc.common.SPreparedStatement.executeWithParams(SPreparedStatement.java:4165)
com.vertica.jdbc.common.SPreparedStatement.executeQuery(SPreparedStatement.java:1295)
com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.$anonfun$query$1(VerticaJdbcLayer.scala:287)
scala.util.Try$.apply(Try.scala:213)
com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.query(VerticaJdbcLayer.scala:284)
com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.performCopy(VerticaDistributedFilesystemWritePipe.scala:426)
com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$commitDataIntoVertica$10(VerticaDistributedFilesystemWritePipe.scala:473)
scala.util.Either.flatMap(Either.scala:341)

Hdfs config check:

-- hdfs_cluster_config_check --

Hadoop Conf Path : [/opt/vertica]

[OK] HadoopConfDir verified on all nodes
Token Refresh Frequency (seconds) : [0]
HadoopFSBlockSizeBytes (MB) : [64]

[OK] Found [1] hadoop cluster configurations

------------- Cluster 1 -------------
Is DefaultFS : [true]
Nameservice : []
Namenodes : [hdfshost:8020]
High Availability : [false]
RPC Encryption : [false]
Kerberos Authentication : [false]
HTTPS Only : [false]
[INFO] Unable to get a Libhdfs++ handle, using Webhdfs
[INFO] Checking connections to [hdfs:///]
vertica : [OK]
dbadmin : [OK]

[INFO] Checking connections to [http://hdfshost:9870]
[OK] Can make authenticated external curl connection
[INFO] Checking webhdfs
vertica : [OK]
dbadmin : [OK]

[!] hdfs_cluster_config_check : [PASS]

Answers

  • Bryan_H Vertica Employee Administrator

    Vertica wants to connect to the WebHDFS interface, which is listening on port 9870 as shown in the HDFS config check final test.
    Try the following setting in your script:
    filepath="hdfs://hdfshost:9870/user/hadoop/planstream/"

  • Hi Bryan,

    Good to hear from you.

    I tried that as well, but I am getting the error below:
    23/11/25 11:52:28 INFO VerticaDistributedFilesystemWritePipe: Writing data to Parquet file.
    23/11/25 11:52:30 ERROR VerticaBatchReader: Error in communication with filestore. Check the 'staging_fs_url' parameter.

    Caused by:
    org.apache.hadoop.ipc.RpcException: RPC response exceeds maximum data length
    Stack trace:
    org.apache.hadoop.ipc.Client$IpcStreams.readResponse(Client.java:1936)
    org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1238)
    org.apache.hadoop.ipc.Client$Connection.run(Client.java:1134)
    23/11/25 11:52:30 INFO SparkUI: Stopped Spark web UI at http://127.0.0.1:4040
    23/11/25 11:52:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    23/11/25 11:52:31 INFO MemoryStore: MemoryStore cleared
    23/11/25 11:52:31 INFO BlockManager: BlockManager stopped
    23/11/25 11:52:31 INFO BlockManagerMaster: BlockManagerMaster stopped
    23/11/25 11:52:31 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

    23/11/25 11:52:32 INFO SparkContext: Successfully stopped SparkContext

    FAILED: Unexpected error.


    23/11/25 11:52:32 INFO SparkContext: SparkContext already stopped.
    23/11/25 11:52:32 INFO ShutdownHookManager: Shutdown hook called
    23/11/25 11:52:32 INFO ShutdownHookManager: Deleting directory C:\Users\mtiwari\AppData\Local\Temp\spark-0b2cc428-48f2-49d6-b852-598d1ab7f110
    com.vertica.spark.util.error.ConnectorException: Error in communication with filestore. Check the 'staging_fs_url' parameter.
    Caused by: RPC response exceeds maximum data length
    at com.vertica.spark.util.error.ErrorHandling$.logAndThrowError(ErrorHandling.scala:78)
    at com.vertica.spark.datasource.v2.VerticaBatchWrite.<init>(VerticaDatasourceV2Write.scala:71)
    at com.vertica.spark.datasource.v2.VerticaWriteBuilder.buildForBatch(VerticaDatasourceV2Write.scala:51)
    at org.apache.spark.sql.connector.write.WriteBuilder$1.toBatch(WriteBuilder.java:44)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:244)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)

    Below are the updated connection settings:
    host="verticahost"
    port=5433
    db="vmart"
    user="dbadmin"
    password=""
    filepath="hdfs://hdfshost:9870/user/hadoop/planstream",
    prevent_cleanup=true
    is_local="true"

  • Bryan_H Vertica Employee Administrator

    Which version of Spark connector - that is, full name of the JAR - are you running? We are up to V3.3.5, and this version may be necessary to support Hadoop/Spark v3. The current release is available from the GitHub link below.
    It looks like I was in error about the parameter name and value: the file path where temporary files are stored is now called "staging_fs_url" so the parameter should be: staging_fs_url="hdfs://hdfshost:9870/user/hadoop/planstream"
    Please see current connector documentation and examples at https://github.com/vertica/spark-connector
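
    As an illustration, a rough sketch of an options map using the renamed parameter (host, database, and path values taken from the connection details shared earlier in this thread):

    val options = Map(
      "host" -> "verticahost",
      "db" -> "vmart",
      "user" -> "dbadmin",
      "password" -> "",
      // location for the connector's intermediary files
      "staging_fs_url" -> "hdfs://hdfshost:9870/user/hadoop/planstream",
      "prevent_cleanup" -> "true"
    )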

  • Hi Bryan,
    This is what the final connection config looks like; I referred to the GitHub link to develop the shared example.

    Vertica Spark connector version: "com.vertica.spark" % "vertica-spark" % "3.3.5-slim"
    Jar name: vertica-spark-3.3.5-slim
    /**
     * Base options needed to connect to Vertica
     */
    val options = Map(
      "host" -> conf.getString("examples.host"),
      "user" -> conf.getString("examples.user"),
      "db" -> conf.getString("examples.db"),
      "staging_fs_url" -> conf.getString("examples.filepath"),
      "password" -> conf.getString("examples.password"),
      "prevent_cleanup" -> conf.getString("examples.prevent_cleanup")
    )

    Config file snippet:
    # Base configurations used by all examples
    examples {
    host="verticahost"
    port=5433
    db="vmart"
    user="dbadmin"
    password=""
    filepath="hdfs://hdfshost:9870/user/hadoop/planstream",
    prevent_cleanup=true
    is_local="true"
    }

  • Bryan_H Vertica Employee Administrator

    OK, it looks like I'm confusing Spark (client) settings and Vertica (server) settings. The Spark client side was probably correct at first, since the first error is a server-side JDBC error. However, the server-side Hadoop config check passed, so I wonder if the Spark client is sending a host:port to the server that isn't working. Try setting filepath/staging_fs_url back to port 8020, run the Spark job again until it errors, then go onto the cluster and check vertica.log for the error (try grepping for "planstream" in the HDFS URL), and share the complete server-side info, including the request from the Spark client and the error message.

  • After changing the staging_fs_url back to 8020:

    Below is the error from Spark (client):
    23/11/26 10:29:47 INFO VerticaDistributedFilesystemWritePipe: Building default copy column list
    23/11/26 10:29:47 INFO SchemaTools: Load by name. Column list: ("col1")
    23/11/26 10:29:47 INFO VerticaDistributedFilesystemWritePipe: The copy statement is:
    COPY "dftest" ("col1") FROM 'hdfs://hdfshost:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT
    23/11/26 10:29:53 INFO VerticaDistributedFilesystemWritePipe: Performing copy from file store to Vertica
    23/11/26 10:29:53 INFO VerticaDistributedFilesystemWritePipe: Committing data into Vertica.
    23/11/26 10:29:53 INFO VerticaDistributedFilesystemWritePipe: Timed operation: Copy and commit data into Vertica -- took 6677 ms.
    23/11/26 10:29:53 ERROR VerticaBatchReader: commit: Failed to copy rows into target table
    performCopy: JDBC error when trying to copy
    Error in commit step of write to Vertica. There was a failure copying data from the intermediary into Vertica.
    Error when sending query
    Unexpected SQL Error.
    A generic JDBC error occurred

    Caused by:
    java.sql.SQLException: [Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-159.cloud.operative.com:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet] because of error: Seen exception:
    URL: [http://ip-10-60-6-159.cloud.operative.com:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
    HTTP response code: 404
    HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.

    vertica log:

    2023-11-26 04:59:37.773 Init Session:0x7f3c76ffd700-a0000000003748 [EE] VTRowLoader creates 1 memory block for projection vs_tables_view_p
    2023-11-26 04:59:38.000 MetadataPoolMonitor:0x7f3c74ff9700 [ResourceManager] Update metadata resource pool memory with delta: Memory(KB): 1
    2023-11-26 04:59:38.000 MetadataPoolMonitor:0x7f3c74ff9700 @v_vmart_node0001: 00000/7794: Updated metadata pool: Memory(KB): 76329
    2023-11-26 04:59:46.001 StoredProcedureScheduler:0x7f3ca6ffd700 [Util] Task 'StoredProcedureScheduler' enabled
    2023-11-26 04:59:47.075 Init Session:0x7f3c76ffd700-a0000000003748 [Session] [PQuery] TX:a0000000003748(v_vmart_node0001-52:0xca) SELECT version();
    2023-11-26 04:59:47.079 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 00000/3316: Executing statement: 'SELECT version();'
    2023-11-26 04:59:47.083 Init Session:0x7f3c76ffd700-a0000000003748 [EE] VTRowLoader creates 1 memory block for projection dual_p
    2023-11-26 04:59:47.101 Init Session:0x7f3c76ffd700-a0000000003748 [Session] [PQuery] TX:a0000000003748(v_vmart_node0001-52:0xca) SELECT * FROM "dftest" WHERE 1=0
    2023-11-26 04:59:47.106 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 00000/3316: Executing statement: 'SELECT * FROM "dftest" WHERE 1=0'
    2023-11-26 04:59:47.130 Init Session:0x7f3c76ffd700-a0000000003748 [Session] [PQuery] TX:a0000000003748(v_vmart_node0001-52:0xca) select is_temp_table as t from v_catalog.tables where table_name=? and table_schema=?
    2023-11-26 04:59:47.138 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 00000/3316: Executing statement: 'select is_temp_table as t from v_catalog.tables where table_name='dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac' and table_schema='public''

    2023-11-26 04:59:47.167 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 58V01/3664: Invalid filename. Input filename is an empty string
    LOCATION: parseSourceFiles, /data/jenkins/workspace/RE-ReleaseBuilds/RE-Knuckleboom_2/server/vertica/Optimizer/Path/BulkLoad.cpp:3593
    2023-11-26 04:59:47.197 Init Session:0x7f3c76ffd700-a0000000003748 [Session] [PQuery] TX:a0000000003748(v_vmart_node0001-52:0xca) EXPLAIN COPY "dftest" ("col1") FROM 'hdfs://hdfshost:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT
    2023-11-26 04:59:47.200 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 00000/3316: Executing statement: 'EXPLAIN COPY "dftest" ("col1") FROM 'hdfs://hdfshost:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT'
    2023-11-26 04:59:47.203 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] Inserting cache entry for WebHdfsConnection associated with session "v_vmart_node0001-52:0xca", ticket "username dbadmin", username "dbadmin"
    2023-11-26 04:59:47.205 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] cURL shared handle (DNS) initializing
    2023-11-26 04:59:48.000 MetadataPoolMonitor:0x7f3c837fe700 [ResourceManager] Update metadata resource pool memory with delta: Memory(KB): 512
    2023-11-26 04:59:48.000 MetadataPoolMonitor:0x7f3c837fe700 @v_vmart_node0001: 00000/7794: Updated metadata pool: Memory(KB): 76841
    2023-11-26 04:59:49.581 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] Failed reading http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin: 22 - Seen exception:
    URL: [http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
    HTTP response code: 404
    HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.

    (/data/jenkins/workspace/RE-ReleaseBuilds/RE-Knuckleboom_2/server/vertica/SAL/HDFS/HDFSLib.cpp:2068). Sleeping for 3000
    2023-11-26 04:59:53.123 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] Failed reading http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin: 22 - Seen exception:
    URL: [http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
    HTTP response code: 404
    HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.

    (/data/jenkins/workspace/RE-ReleaseBuilds/RE-Knuckleboom_2/server/vertica/SAL/HDFS/HDFSLib.cpp:2068). Sleeping for 83
    2023-11-26 04:59:53.728 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] Aborting with HTTP error reading http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin: 22 - Seen exception:
    URL: [http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
    HTTP response code: 404
    HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.

    (/data/jenkins/workspace/RE-ReleaseBuilds/RE-Knuckleboom_2/server/vertica/SAL/HDFS/HDFSLib.cpp:2068)
    2023-11-26 04:59:53.729 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 58030/6776: Failed to glob [hdfs://hdfshost:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet] because of error: Seen exception:
    URL: [http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
    HTTP response code: 404
    HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.

  • Bryan_H Vertica Employee Administrator

    Can you connect to Vertica and try running the COPY on the WebHDFS port 9870 instead? (Note: this is an EXPLAIN and will not make any changes; it will simply verify that the plan is valid.)
    EXPLAIN COPY "dftest" ("col1") FROM 'hdfs://hdfshost:9870/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT;

  • mt25 ✭✭
    edited November 2023

    QUERY PLAN DESCRIPTION:

    explain COPY "dftest" ("col1") FROM 'hdfs://ip-10-60-6-159.cloud.operative.com:9870/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT

    Access Path:
    +-DML INSERT [Cost: 0, Rows: 0]
    | Target Projection: public.dftest_super (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
    | Target Prep:

    ------------------------------

    PLAN: BASE BULKLOAD PLAN (GraphViz Format)

    digraph G {
    graph [rankdir=BT, label = " BASE BULKLOAD PLAN \nAll Nodes Vector: \n\n node[0]=v_vmart_node0001 (initiator) Up\n", labelloc=t, labeljust=l ordering=out]
    0[label = "Root \nOutBlk=[UncTuple]", color = "green", shape = "house"];
    1[label = "NewEENode \nOutBlk=[UncTuple(2)]", color = "green", shape = "box"];
    2[label = "ExprEval: \n $1\n (-9223372036854775808)\nUnc: Integer(8)\nUnc: Integer(8)", color = "green", shape = "box"];
    3[label = "LoadUnion", color = "purple", shape = "box"];
    4[label = "ExprEval: \n col1(VAL(0))\nUnc: Integer(8)", color = "brown", shape = "box"];
    5[label = "LoadStep: dftest\n (0 paths with unmatched partition have been pruned from PARQUET source list; 0% of PARQUET data matched with co-located Vertica nodes; 0% of PARQUET data including co-located data can be loaded with rack locality)\n\nUnc: Integer(8)", color = "brown", shape = "box"];
    6[label = "DT(DIRECT)[dftest_super]\n+Sort (keys = A,N) \nOutBlk=[UncTuple]", color = "blue", shape = "ellipse"];
    7[label = "Segment() \nOutBlk=[UncTuple(2)]", color = "green", shape = "ellipse"];
    8[label = "ValExpNode \nOutBlk=[UncTuple(2)]", color = "green", shape = "ellipse"];
    9[label = "DIST-Union (#cols = 2,#strms = 1) \nOutBlk=[UncTuple(2)]", color = "green", shape = "ellipse"];
    10[label = "Basic-Union (#cols = 1,#strms = 1) \nOutBlk=[UncTuple]", color = "green", shape = "ellipse"];
    11[label = "GroupBy(NOTHING) \nOutBlk=[UncTuple]", color = "green", shape = "box"];
    1->7 [label = "V[0] C=2",color = "black",style="bold", arrowtail="inv"];
    2->1 [label = "0",color = "blue"];
    3->2 [label = "0",color = "blue"];
    4->3 [label = "0",color = "blue"];
    5->4 [label = "0",color = "blue"];
    6->10 [label = "V[0]",color = "black"];
    7->9 [label = "P+V[0] C=2",color = "black",style="bold", arrowtail="inv"];
    8->6 [label = "P+V[0] C=2",color = "black",style="bold", arrowtail="inv"];
    9->8 [label = "P+V[0] C=2",color = "black",style="bold", arrowtail="inv"];
    10->11 [label = "V[0]",color = "black"];
    11->0 [label = "V[0]",color = "black"];
    }

    Lock Map - Table (Mode)

    dftest(I)

  • Strange, from SQL Workbench (client) it is able to load the data.

  • With port 8020 I get the error below:
    An error occurred when executing the SQL command:
    explain COPY "dftest" ("col1") FROM 'hdfs://ip-10-60-6-159.cloud.operative.com:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parq...

    [Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-159.cloud.operative.com:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet] because of error: Seen exception:
    URL: [http://ip-10-60-6-159.cloud.operative.com:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
    HTTP response code: 404
    HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon. [SQL State=58030, DB Errorcode=6776]
    1 statement failed.

  • Bryan_H Vertica Employee Administrator

    One last thing to try: since port 8020 is the default for the namenode and 9870 is the default for WebHDFS, please try staging_fs_url with no port, just the host, and let the HDFS client decide the port. If it still doesn't work, please post any new errors with no port set, and I'll see if engineering can look into this during the week.

  • COPY "dftest" ("col1") FROM 'hdfs://ip-10-60-6-159.cloud.operative.com/user/hadoop/planstream/47ab6d73_583c_4b0b_9ba2_0742e9e499c2/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_47ab6d73_583c_4b0b_9ba2_0742e9e499c2_COMMITS" NO COMMIT
    23/11/27 11:29:43 INFO VerticaDistributedFilesystemWritePipe: Performing copy from file store to Vertica
    23/11/27 11:29:43 INFO VerticaDistributedFilesystemWritePipe: Committing data into Vertica.
    23/11/27 11:29:43 INFO VerticaDistributedFilesystemWritePipe: Timed operation: Copy and commit data into Vertica -- took 23285 ms.
    23/11/27 11:29:43 ERROR VerticaBatchReader: commit: Failed to copy rows into target table
    performCopy: JDBC error when trying to copy
    Error in commit step of write to Vertica. There was a failure copying data from the intermediary into Vertica.
    Error when sending query
    Unexpected SQL Error.
    A generic JDBC error occurred

    Caused by:
    java.sql.SQLException: [Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-159.cloud.operative.com/user/hadoop/planstream/47ab6d73_583c_4b0b_9ba2_0742e9e499c2/*.parquet] because of error: Seen exception:
    URL: [http://ip-10-60-6-159.cloud.operative.com/webhdfs/v1/user/hadoop/planstream/47ab6d73_583c_4b0b_9ba2_0742e9e499c2/?op=LISTSTATUS&user.name=dbadmin]
    HTTP response code: 403
    HTTP response message: connecting to ip-10-60-6-159.cloud.operative.com:80: connecting to 10.60.6.159:80: dial tcp 10.60.6.159:80: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.

    Stack trace:
    com.vertica.util.ServerErrorData.buildException(Unknown Source)
    com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
    com.vertica.dataengine.VResultSet.initialize(Unknown Source)
    com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
    com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
    com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
    com.vertica.jdbc.common.SPreparedStatement.executeWithParams(SPreparedStatement.java:4165)
    com.vertica.jdbc.common.SPreparedStatement.executeQuery(SPreparedStatement.java:1295)
    com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.$anonfun$query$1(VerticaJdbcLayer.scala:287)
    scala.util.Try$.apply(Try.scala:213)
    com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.query(VerticaJdbcLayer.scala:284)

  • Hi Bryan,

    Did you hear anything back from engineering? Is there any additional info they need?

  • @Bryan_H

    Any update that you can share from engineering?

  • Following up again: any update on this? I have created a support ticket as well, but no luck there either.

  • SruthiA Administrator

    Can you share the spark-shell command you are launching? I am trying to run your code and it fails at the step below; I am using the following spark-shell command:

    /usr/local/spark/bin/spark-shell --jars vertica-jdbc-12.0.4-14.jar,spark-vertica-connector-slim3.3.2.jar

    scala> df.write.format("com.vertica.spark.datasource.VerticaSource").options(opts ).mode(mode).save()
    org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'vertica': com.vertica.spark.datasource.v2.VerticaDatasourceV2Catalog
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:1487)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:77)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
    at org.apache.spark.sql.connector.catalog.CatalogV2Util$.$anonfun$getTableProviderCatalog$1(CatalogV2Util.scala:364)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.connector.catalog.CatalogV2Util$.getTableProviderCatalog(CatalogV2Util.scala:364)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:292)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    ... 47 elided
    Caused by: java.lang.NoClassDefFoundError: cats/Semigroupal
    at com.vertica.spark.datasource.v2.VerticaDatasourceV2Catalog.<init>(VerticaDatasourceV2Catalog.scala:37)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:59)
    ... 55 more
    Caused by: java.lang.ClassNotFoundException: cats.Semigroupal
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
    ... 61 more

  • Hi @SruthiA,

    I did not run it through spark-shell; I am first trying it from the IDE (IntelliJ) locally.
    Here is my build.sbt file (this is the same as what is available on the connector GitHub page: https://github.com/vertica/spark-connector/blob/main/examples/scala/build.sbt).

    // (c) Copyright [2020-2021] Micro Focus or one of its affiliates.
    // Licensed under the Apache License, Version 2.0 (the "License");
    // You may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    //
    // http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    import java.util.Properties

    // Retrieving the common property config containing the connector version number.
    val props = settingKey[Properties]("Connector version properties")
    props := {
      val prop = new Properties()
      IO.load(prop, new File("../../version.properties"))
      prop
    }

    scalaVersion := "2.12.12"
    name := "spark-vertica-connector-scala-examples"
    organization := "com.vertica"
    version := props.value.getProperty("connector-version")

    resolvers += "Artima Maven Repository" at "https://repo.artima.com/releases"
    resolvers += "jitpack" at "https://jitpack.io"

    libraryDependencies ++= Seq(
      "com.typesafe" % "config" % "1.4.1",
      //"com.vertica.spark" % "vertica-spark" % s"${version.value}-slim",
      "com.vertica.spark" % "vertica-spark" % "3.3.5-slim",
      "org.apache.spark" %% "spark-core" % "3.3.0",
      "org.apache.spark" %% "spark-sql" % "3.3.0",
      "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop3-2.2.6",
      // This version needs to match the Hadoop version used by Spark
      "org.apache.hadoop" % "hadoop-aws" % "3.3.2"
    )

    assembly / assemblyJarName := s"vertica-spark-scala-examples.jar"

    assembly / assemblyMergeStrategy := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case x => MergeStrategy.first
    }

    assembly / assemblyShadeRules := Seq(
      ShadeRule.rename("cats.**" -> "shadeCats.@1").inAll
    )

    After you mentioned it, I also tried it through spark-shell, and it throws the same error for me (maybe because we did not specify all the transitive dependencies when opening spark-shell); see the note after the stack trace below.

    scala> .save()
    org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'vertica': com.vertica.spark.datasource.v2.VerticaDatasourceV2Catalog.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2132)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:82)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
    at org.apache.spark.sql.connector.catalog.CatalogV2Util$.$anonfun$getTableProviderCatalog$1(CatalogV2Util.scala:433)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.connector.catalog.CatalogV2Util$.getTableProviderCatalog(CatalogV2Util.scala:433)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:327)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    ... 49 elided
    Caused by: java.lang.NoClassDefFoundError: cats/Semigroupal
    at com.vertica.spark.datasource.v2.VerticaDatasourceV2Catalog.<init>(VerticaDatasourceV2Catalog.scala:37)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:64)
    ... 57 more
    Caused by: java.lang.ClassNotFoundException: cats.Semigroupal
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 63 more
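
    Note: the cats/Semigroupal error above appears to come from the "slim" connector jar, which does not bundle dependencies such as cats. One possible way around it when launching a bare spark-shell, sketched only and not verified: let Spark resolve the connector and its transitive dependencies from Maven Central (this assumes the non-slim 3.3.5 artifact is published there), for example:

    spark-shell --jars vertica-jdbc-12.0.4-14.jar --packages com.vertica.spark:vertica-spark:3.3.5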

  • After specifying all the dependent jars, I get the same error as from local.
    spark-shell command: spark-shell --jars file:////home/hadoop/lib/*
    23/12/16 09:59:47 ERROR VerticaBatchReader: commit: Failed to copy rows into target table
    performCopy: JDBC error when trying to copy
    Error in commit step of write to Vertica. There was a failure copying data from the intermediary into Vertica.
    Error when sending query
    Unexpected SQL Error.
    A generic JDBC error occurred

    Caused by:
    java.sql.SQLException: [Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-81.cloud.operative.com:8020/user/hadoop/localtest/088e88c8_b7fb_46c1_bbf0_10fd09639eb6/*.parquet] because of error: Seen exception:
    URL: [http://ip-10-60-6-81.cloud.operative.com:8020/webhdfs/v1/user/hadoop/localtest/088e88c8_b7fb_46c1_bbf0_10fd09639eb6/?op=LISTSTATUS&user.name=etluser]
    HTTP response code: 404
    HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.

  • Update for other users:
    The connector will work with the configs below (a Scala sketch of the same setup follows at the end of this post):
    host="localhost"
    port=5433
    db="vmart"
    user="dbadmin"
    password=""
    filepath="webhdfs://hdfshost:9870/user/hadoop/planstream/",
    prevent_cleanup=true

    It works well as long as Spark is loading data into one table at a time, but when you try to save dataframes in parallel it starts failing with the error below:
    [ERROR] com.vertica.spark.datasource.v2.VerticaBatchReader - JDBC Error when trying to rollback: rollback: JDBC Error while rolling back.
    Connection to the JDBC source is down or invalid. Please ensure that the JDBC source is running properly.
    commit: Failed to get column list
    getColumnList: Error building default copy column list
    Failed to create a valid column list for the write operation due to mismatch with the existing table.
    JDBC failure when trying to retrieve schema
    getPreparedStatement: Error while getting prepared statement

    It seems it is checking the column definition of one table while trying to save the data of another (this is just my observation so far).
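
    For completeness, a sketch of the full write call with the webhdfs:// staging URL written out literally (same code as in the original post, options taken from the working config above):

    val options = Map(
      "host" -> "localhost",
      "db" -> "vmart",
      "user" -> "dbadmin",
      "password" -> "",
      "staging_fs_url" -> "webhdfs://hdfshost:9870/user/hadoop/planstream/",
      "prevent_cleanup" -> "true"
    )
    df.write.format("com.vertica.spark.datasource.VerticaSource")
      .options(options + ("table" -> "dftest"))
      .mode(SaveMode.Append)
      .save()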
