Issues while saving a dataframe using the Vertica Spark Connector V2
Hi,
I am trying out the Vertica Spark connector V2, but I am getting the error shown below. My environment:
Vertica version: 12.0.4
Spark 3.1.2
Hadoop: 3.2.1
Below are my connection details:
host="localhost"
port=5433
db="vmart"
user="dbadmin"
password=""
filepath="hdfs://hdfshost:8020/user/hadoop/planstream/",
prevent_cleanup=true
val tableName = "dftest"
// Define schema of a table with a single integer attribute
val schema = new StructType(Array(StructField("col1", IntegerType)))
// Create n rows with element '77'
val n = 20
val data = (0 until n).map(_ => Row(77))
// Create a dataframe corresponding to the schema and data specified above
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).coalesce(1)
val mode = SaveMode.Append
// Output the dataframe schema and a sample of rows
println("----------")
df.printSchema()
df.show(10, false)
// Print the connector options, then write the dataframe to Vertica
options.foreach { case (key, value) => println(s"$key $value") }
df.write.format(VERTICA_SOURCE)
.options(options + ("table" -> tableName))
.mode(mode)
.save()
commit: Failed to copy rows into target table
performCopy: JDBC error when trying to copy
Error in commit step of write to Vertica. There was a failure copying data from the intermediary into Vertica.
Error when sending query
Unexpected SQL Error.
A generic JDBC error occurred
Caused by:
java.sql.SQLException: [Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-159.cloud.operative.com:8020/user/hadoop/planstream/818d7193_ccea_49e3_be0f_7ece64d15c79/*.parquet] because of error: Seen exception:
URL: [http://ip-10-60-6-159.cloud.operative.com:8020/webhdfs/v1/user/hadoop/planstream/818d7193_ccea_49e3_be0f_7ece64d15c79/?op=LISTSTATUS&user.name=dbadmin]
HTTP response code: 404
HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.
Stack trace:
com.vertica.util.ServerErrorData.buildException(Unknown Source)
com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
com.vertica.dataengine.VResultSet.initialize(Unknown Source)
com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
com.vertica.jdbc.common.SPreparedStatement.executeWithParams(SPreparedStatement.java:4165)
com.vertica.jdbc.common.SPreparedStatement.executeQuery(SPreparedStatement.java:1295)
com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.$anonfun$query$1(VerticaJdbcLayer.scala:287)
scala.util.Try$.apply(Try.scala:213)
com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.query(VerticaJdbcLayer.scala:284)
com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.performCopy(VerticaDistributedFilesystemWritePipe.scala:426)
com.vertica.spark.datasource.core.VerticaDistributedFilesystemWritePipe.$anonfun$commitDataIntoVertica$10(VerticaDistributedFilesystemWritePipe.scala:473)
scala.util.Either.flatMap(Either.scala:341)
HDFS config check:
-- hdfs_cluster_config_check --
Hadoop Conf Path : [/opt/vertica]
[OK] HadoopConfDir verified on all nodes
Token Refresh Frequency (seconds) : [0]
HadoopFSBlockSizeBytes (MB) : [64]
[OK] Found [1] hadoop cluster configurations
------------- Cluster 1 -------------
Is DefaultFS : [true]
Nameservice : []
Namenodes : [hdfshost:8020]
High Availability : [false]
RPC Encryption : [false]
Kerberos Authentication : [false]
HTTPS Only : [false]
[INFO] Unable to get a Libhdfs++ handle, using Webhdfs
[INFO] Checking connections to [hdfs:///]
vertica : [OK]
dbadmin : [OK]
[INFO] Checking connections to [http://hdfshost:9870]
[OK] Can make authenticated external curl connection
[INFO] Checking webhdfs
vertica : [OK]
dbadmin : [OK]
[!] hdfs_cluster_config_check : [PASS]
Answers
Vertica wants to connect to the WebHDFS interface, which is listening on port 9870 as shown in the HDFS config check final test.
Try the following setting in your script:
filepath="hdfs://hdfshost:9870/user/hadoop/planstream/"
Hi Bryan,
Good to hear from you.
I tried that as well, but I am getting the error below:
23/11/25 11:52:28 INFO VerticaDistributedFilesystemWritePipe: Writing data to Parquet file.
23/11/25 11:52:30 ERROR VerticaBatchReader: Error in communication with filestore. Check the 'staging_fs_url' parameter.
Caused by:
org.apache.hadoop.ipc.RpcException: RPC response exceeds maximum data length
Stack trace:
org.apache.hadoop.ipc.Client$IpcStreams.readResponse(Client.java:1936)
org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1238)
org.apache.hadoop.ipc.Client$Connection.run(Client.java:1134)
23/11/25 11:52:30 INFO SparkUI: Stopped Spark web UI at http://127.0.0.1:4040
23/11/25 11:52:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/25 11:52:31 INFO MemoryStore: MemoryStore cleared
23/11/25 11:52:31 INFO BlockManager: BlockManager stopped
23/11/25 11:52:31 INFO BlockManagerMaster: BlockManagerMaster stopped
23/11/25 11:52:31 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/25 11:52:32 INFO SparkContext: Successfully stopped SparkContext
-
- FAILED: Unexpected error.
-
23/11/25 11:52:32 INFO SparkContext: SparkContext already stopped.
23/11/25 11:52:32 INFO ShutdownHookManager: Shutdown hook called
23/11/25 11:52:32 INFO ShutdownHookManager: Deleting directory C:\Users\mtiwari\AppData\Local\Temp\spark-0b2cc428-48f2-49d6-b852-598d1ab7f110
com.vertica.spark.util.error.ConnectorException: Error in communication with filestore. Check the 'staging_fs_url' parameter.
Caused by: RPC response exceeds maximum data length
at com.vertica.spark.util.error.ErrorHandling$.logAndThrowError(ErrorHandling.scala:78)
at com.vertica.spark.datasource.v2.VerticaBatchWrite.<init>(VerticaDatasourceV2Write.scala:71)
at com.vertica.spark.datasource.v2.VerticaWriteBuilder.buildForBatch(VerticaDatasourceV2Write.scala:51)
at org.apache.spark.sql.connector.write.WriteBuilder$1.toBatch(WriteBuilder.java:44)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:244)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
Below are the updated connection details:
host="verticahost"
port=5433
db="vmart"
user="dbadmin"
password=""
filepath="hdfs://hdfshost:9870/user/hadoop/planstream",
prevent_cleanup=true
is_local="true"
Which version of the Spark connector - that is, the full name of the JAR - are you running? We are up to V3.3.5, and this version may be necessary to support Hadoop/Spark 3. The current release is available from the GitHub link below.
It looks like I was in error about the parameter name and value: the file path where temporary files are stored is now called "staging_fs_url", so the parameter should be:
staging_fs_url="hdfs://hdfshost:9870/user/hadoop/planstream"
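For example, plugged into the write from your earlier snippet, it would look roughly like this (a minimal sketch; options, VERTICA_SOURCE, df, and tableName are the names from your code, and the host and path are placeholders):
// Sketch only: pass the staging location through "staging_fs_url" rather than "filepath".
val writeOptions = options +
  ("staging_fs_url" -> "hdfs://hdfshost:9870/user/hadoop/planstream") +
  ("table" -> tableName)
df.write.format(VERTICA_SOURCE)
  .options(writeOptions)
  .mode(SaveMode.Append)
  .save()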
Please see current connector documentation and examples at https://github.com/vertica/spark-connector
Hi Bryan,
This is what the final connection config looks like; I referred to the GitHub link to develop the shared example.
Vertica Spark connector version: "com.vertica.spark" % "vertica-spark" % "3.3.5-slim"
Jar name: vertica-spark-3.3.5-slim
/**
* Base options needed to connect to Vertica
* */
val options = Map(
"host" -> conf.getString("examples.host"),
"user" -> conf.getString("examples.user"),
"db" -> conf.getString("examples.db"),
"staging_fs_url" -> conf.getString("examples.filepath"),
"password" -> conf.getString("examples.password"),
"prevent_cleanup"->conf.getString("examples.prevent_cleanup")
)
Config file snippet:
# Base configurations used by all examples
examples {
host="verticahost"
port=5433
db="vmart"
user="dbadmin"
password=""
filepath="hdfs://hdfshost:9870/user/hadoop/planstream",
prevent_cleanup=true
is_local="true"
}
OK, it looks like I'm confusing Spark (client) settings and Vertica (server) settings. The Spark client side was probably correct at first, since the first error is a JDBC server-side error. However, the server-side Hadoop config check passed, so I wonder if the Spark client is sending the server a host:port that isn't working. Try setting the filepath/staging_fs_url back to port 8020, run the Spark job again until it errors, then go onto the cluster and check vertica.log for the error - try grepping for "planstream" in the HDFS URL - and share the complete server-side info, including the request from the Spark client and the error message.
After changing the staging_fs_url back to port 8020, below is the error from Spark (client):
23/11/26 10:29:47 INFO VerticaDistributedFilesystemWritePipe: Building default copy column list
23/11/26 10:29:47 INFO SchemaTools: Load by name. Column list: ("col1")
23/11/26 10:29:47 INFO VerticaDistributedFilesystemWritePipe: The copy statement is:
COPY "dftest" ("col1") FROM 'hdfs://hdfshost:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT
23/11/26 10:29:53 INFO VerticaDistributedFilesystemWritePipe: Performing copy from file store to Vertica
23/11/26 10:29:53 INFO VerticaDistributedFilesystemWritePipe: Committing data into Vertica.
23/11/26 10:29:53 INFO VerticaDistributedFilesystemWritePipe: Timed operation: Copy and commit data into Vertica -- took 6677 ms.
23/11/26 10:29:53 ERROR VerticaBatchReader: commit: Failed to copy rows into target table
performCopy: JDBC error when trying to copy
Error in commit step of write to Vertica. There was a failure copying data from the intermediary into Vertica.
Error when sending query
Unexpected SQL Error.
A generic JDBC error occurred
Caused by:
java.sql.SQLException: [Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-159.cloud.operative.com:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet] because of error: Seen exception:
URL: [http://ip-10-60-6-159.cloud.operative.com:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
HTTP response code: 404
HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.
Vertica log:
2023-11-26 04:59:37.773 Init Session:0x7f3c76ffd700-a0000000003748 [EE] VTRowLoader creates 1 memory block for projection vs_tables_view_p
2023-11-26 04:59:38.000 MetadataPoolMonitor:0x7f3c74ff9700 [ResourceManager] Update metadata resource pool memory with delta: Memory(KB): 1
2023-11-26 04:59:38.000 MetadataPoolMonitor:0x7f3c74ff9700 @v_vmart_node0001: 00000/7794: Updated metadata pool: Memory(KB): 76329
2023-11-26 04:59:46.001 StoredProcedureScheduler:0x7f3ca6ffd700 [Util] Task 'StoredProcedureScheduler' enabled
2023-11-26 04:59:47.075 Init Session:0x7f3c76ffd700-a0000000003748 [Session] [PQuery] TX:a0000000003748(v_vmart_node0001-52:0xca) SELECT version();
2023-11-26 04:59:47.079 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 00000/3316: Executing statement: 'SELECT version();'
2023-11-26 04:59:47.083 Init Session:0x7f3c76ffd700-a0000000003748 [EE] VTRowLoader creates 1 memory block for projection dual_p
2023-11-26 04:59:47.101 Init Session:0x7f3c76ffd700-a0000000003748 [Session] [PQuery] TX:a0000000003748(v_vmart_node0001-52:0xca) SELECT * FROM "dftest" WHERE 1=0
2023-11-26 04:59:47.106 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 00000/3316: Executing statement: 'SELECT * FROM "dftest" WHERE 1=0'
2023-11-26 04:59:47.130 Init Session:0x7f3c76ffd700-a0000000003748 [Session] [PQuery] TX:a0000000003748(v_vmart_node0001-52:0xca) select is_temp_table as t from v_catalog.tables where table_name=? and table_schema=?
2023-11-26 04:59:47.138 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 00000/3316: Executing statement: 'select is_temp_table as t from v_catalog.tables where table_name='dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac' and table_schema='public''
2023-11-26 04:59:47.167 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 58V01/3664: Invalid filename. Input filename is an empty string
LOCATION: parseSourceFiles, /data/jenkins/workspace/RE-ReleaseBuilds/RE-Knuckleboom_2/server/vertica/Optimizer/Path/BulkLoad.cpp:3593
2023-11-26 04:59:47.197 Init Session:0x7f3c76ffd700-a0000000003748 [Session] [PQuery] TX:a0000000003748(v_vmart_node0001-52:0xca) EXPLAIN COPY "dftest" ("col1") FROM 'hdfs://hdfshost:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT
2023-11-26 04:59:47.200 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 00000/3316: Executing statement: 'EXPLAIN COPY "dftest" ("col1") FROM 'hdfs://hdfshost:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT'
2023-11-26 04:59:47.203 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] Inserting cache entry for WebHdfsConnection associated with session "v_vmart_node0001-52:0xca", ticket "username dbadmin", username "dbadmin"
2023-11-26 04:59:47.205 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] cURL shared handle (DNS) initializing
2023-11-26 04:59:48.000 MetadataPoolMonitor:0x7f3c837fe700 [ResourceManager] Update metadata resource pool memory with delta: Memory(KB): 512
2023-11-26 04:59:48.000 MetadataPoolMonitor:0x7f3c837fe700 @v_vmart_node0001: 00000/7794: Updated metadata pool: Memory(KB): 76841
2023-11-26 04:59:49.581 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] Failed reading http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin: 22 - Seen exception:
URL: [http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
HTTP response code: 404
HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.
(/data/jenkins/workspace/RE-ReleaseBuilds/RE-Knuckleboom_2/server/vertica/SAL/HDFS/HDFSLib.cpp:2068). Sleeping for 3000
2023-11-26 04:59:53.123 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] Failed reading http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin: 22 - Seen exception:
URL: [http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
HTTP response code: 404
HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.
(/data/jenkins/workspace/RE-ReleaseBuilds/RE-Knuckleboom_2/server/vertica/SAL/HDFS/HDFSLib.cpp:2068). Sleeping for 83
2023-11-26 04:59:53.728 Init Session:0x7f3c76ffd700-a0000000003748 [SAL] Aborting with HTTP error reading http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin: 22 - Seen exception:
URL: [http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
HTTP response code: 404
HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.
(/data/jenkins/workspace/RE-ReleaseBuilds/RE-Knuckleboom_2/server/vertica/SAL/HDFS/HDFSLib.cpp:2068)
2023-11-26 04:59:53.729 Init Session:0x7f3c76ffd700-a0000000003748 @v_vmart_node0001: 58030/6776: Failed to glob [hdfs://hdfshost:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet] because of error: Seen exception:
URL: [http://hdfshost:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
HTTP response code: 404
HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.
Can you connect to Vertica and try running the COPY against the WebHDFS port 9870 instead? (Note this is an EXPLAIN and will not make any changes; it will simply verify that the plan is valid.)
EXPLAIN COPY "dftest" ("col1") FROM 'hdfs://hdfshost:9870/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT;
QUERY PLAN DESCRIPTION:
explain COPY "dftest" ("col1") FROM 'hdfs://ip-10-60-6-159.cloud.operative.com:9870/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_aead145d_5721_4f52_b14a_b1d4b3b262ac_COMMITS" NO COMMIT
Access Path:
+-DML INSERT [Cost: 0, Rows: 0]
| Target Projection: public.dftest_super (SORT BY PROJECTION SORT ORDER) (RESEGMENT)
| Target Prep:
------------------------------
PLAN: BASE BULKLOAD PLAN (GraphViz Format)
digraph G {
graph [rankdir=BT, label = " BASE BULKLOAD PLAN \nAll Nodes Vector: \n\n node[0]=v_vmart_node0001 (initiator) Up\n", labelloc=t, labeljust=l ordering=out]
0[label = "Root \nOutBlk=[UncTuple]", color = "green", shape = "house"];
1[label = "NewEENode \nOutBlk=[UncTuple(2)]", color = "green", shape = "box"];
2[label = "ExprEval: \n $1\n (-9223372036854775808)\nUnc: Integer(8)\nUnc: Integer(8)", color = "green", shape = "box"];
3[label = "LoadUnion", color = "purple", shape = "box"];
4[label = "ExprEval: \n col1(VAL(0))\nUnc: Integer(8)", color = "brown", shape = "box"];
5[label = "LoadStep: dftest\n (0 paths with unmatched partition have been pruned from PARQUET source list; 0% of PARQUET data matched with co-located Vertica nodes; 0% of PARQUET data including co-located data can be loaded with rack locality)\n\nUnc: Integer(8)", color = "brown", shape = "box"];
6[label = "DT(DIRECT)[dftest_super]\n+Sort (keys = A,N) \nOutBlk=[UncTuple]", color = "blue", shape = "ellipse"];
7[label = "Segment() \nOutBlk=[UncTuple(2)]", color = "green", shape = "ellipse"];
8[label = "ValExpNode \nOutBlk=[UncTuple(2)]", color = "green", shape = "ellipse"];
9[label = "DIST-Union (#cols = 2,#strms = 1) \nOutBlk=[UncTuple(2)]", color = "green", shape = "ellipse"];
10[label = "Basic-Union (#cols = 1,#strms = 1) \nOutBlk=[UncTuple]", color = "green", shape = "ellipse"];
11[label = "GroupBy(NOTHING) \nOutBlk=[UncTuple]", color = "green", shape = "box"];
1->7 [label = "V[0] C=2",color = "black",style="bold", arrowtail="inv"];
2->1 [label = "0",color = "blue"];
3->2 [label = "0",color = "blue"];
4->3 [label = "0",color = "blue"];
5->4 [label = "0",color = "blue"];
6->10 [label = "V[0]",color = "black"];
7->9 [label = "P+V[0] C=2",color = "black",style="bold", arrowtail="inv"];
8->6 [label = "P+V[0] C=2",color = "black",style="bold", arrowtail="inv"];
9->8 [label = "P+V[0] C=2",color = "black",style="bold", arrowtail="inv"];
10->11 [label = "V[0]",color = "black"];
11->0 [label = "V[0]",color = "black"];
}
Lock Map - Table (Mode)
dftest(I)
Strange: from SQL Workbench (client) it is able to load data.
With port 8020 I get the error below:
An error occurred when executing the SQL command:
explain COPY "dftest" ("col1") FROM 'hdfs://ip-10-60-6-159.cloud.operative.com:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parq...
[Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-159.cloud.operative.com:8020/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/*.parquet] because of error: Seen exception:
URL: [http://ip-10-60-6-159.cloud.operative.com:8020/webhdfs/v1/user/hadoop/planstream/aead145d_5721_4f52_b14a_b1d4b3b262ac/?op=LISTSTATUS&user.name=dbadmin]
HTTP response code: 404
HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon. [SQL State=58030, DB Errorcode=6776]
1 statement failed.
One last thing to try: since port 8020 is the default for the namenode and 9870 is the default for WebHDFS, please try staging_fs_url with no port, just the host, and let the HDFS client decide the port. If it still doesn't work, please post any new errors from the run with no port set, and I'll see if engineering can look into this during the week.
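In the Spark options that would be something like the following (a sketch only; "hdfshost" stands in for your namenode host):
// Sketch: omit the port entirely and let the HDFS client resolve it from its configuration.
val noPortOptions = options + ("staging_fs_url" -> "hdfs://hdfshost/user/hadoop/planstream")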
COPY "dftest" ("col1") FROM 'hdfs://ip-10-60-6-159.cloud.operative.com/user/hadoop/planstream/47ab6d73_583c_4b0b_9ba2_0742e9e499c2/*.parquet' ON ANY NODE parquet REJECTED DATA AS TABLE "dftest_47ab6d73_583c_4b0b_9ba2_0742e9e499c2_COMMITS" NO COMMIT
23/11/27 11:29:43 INFO VerticaDistributedFilesystemWritePipe: Performing copy from file store to Vertica
23/11/27 11:29:43 INFO VerticaDistributedFilesystemWritePipe: Committing data into Vertica.
23/11/27 11:29:43 INFO VerticaDistributedFilesystemWritePipe: Timed operation: Copy and commit data into Vertica -- took 23285 ms.
23/11/27 11:29:43 ERROR VerticaBatchReader: commit: Failed to copy rows into target table
performCopy: JDBC error when trying to copy
Error in commit step of write to Vertica. There was a failure copying data from the intermediary into Vertica.
Error when sending query
Unexpected SQL Error.
A generic JDBC error occurred
Caused by:
java.sql.SQLException: [Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-159.cloud.operative.com/user/hadoop/planstream/47ab6d73_583c_4b0b_9ba2_0742e9e499c2/*.parquet] because of error: Seen exception:
URL: [http://ip-10-60-6-159.cloud.operative.com/webhdfs/v1/user/hadoop/planstream/47ab6d73_583c_4b0b_9ba2_0742e9e499c2/?op=LISTSTATUS&user.name=dbadmin]
HTTP response code: 403
HTTP response message: connecting to ip-10-60-6-159.cloud.operative.com:80: connecting to 10.60.6.159:80: dial tcp 10.60.6.159:80: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
Stack trace:
com.vertica.util.ServerErrorData.buildException(Unknown Source)
com.vertica.dataengine.VResultSet.fetchChunk(Unknown Source)
com.vertica.dataengine.VResultSet.initialize(Unknown Source)
com.vertica.dataengine.VQueryExecutor.readExecuteResponse(Unknown Source)
com.vertica.dataengine.VQueryExecutor.handleExecuteResponse(Unknown Source)
com.vertica.dataengine.VQueryExecutor.execute(Unknown Source)
com.vertica.jdbc.common.SPreparedStatement.executeWithParams(SPreparedStatement.java:4165)
com.vertica.jdbc.common.SPreparedStatement.executeQuery(SPreparedStatement.java:1295)
com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.$anonfun$query$1(VerticaJdbcLayer.scala:287)
scala.util.Try$.apply(Try.scala:213)
com.vertica.spark.datasource.jdbc.VerticaJdbcLayer.query(VerticaJdbcLayer.scala:284)
Hi Bryan,
Did you hear anything from engineering? Is there any additional info they need?
@Bryan_H
Any update that you can share from engineering?
Following up again: any update on this? I have created a support ticket as well, but no luck there either.
Can you share the spark-shell command that you are launching? I am trying to run your code and it fails at the step below... I am using the following spark-shell command:
/usr/local/spark/bin/spark-shell --jars vertica-jdbc-12.0.4-14.jar,spark-vertica-connector-slim3.3.2.jar
scala> df.write.format("com.vertica.spark.datasource.VerticaSource").options(opts ).mode(mode).save()
org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'vertica': com.vertica.spark.datasource.v2.VerticaDatasourceV2Catalog
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:1487)
at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:77)
at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
at org.apache.spark.sql.connector.catalog.CatalogV2Util$.$anonfun$getTableProviderCatalog$1(CatalogV2Util.scala:364)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.connector.catalog.CatalogV2Util$.getTableProviderCatalog(CatalogV2Util.scala:364)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:292)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
... 47 elided
Caused by: java.lang.NoClassDefFoundError: cats/Semigroupal
at com.vertica.spark.datasource.v2.VerticaDatasourceV2Catalog.<init>(VerticaDatasourceV2Catalog.scala:37)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:59)
... 55 more
Caused by: java.lang.ClassNotFoundException: cats.Semigroupal
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 61 more
Hi @SruthiA,
I did not run it through spark-shell; I am first trying it from an IDE (IntelliJ) locally.
Here is my build.sbt file (this is the same as what is available on the connector GitHub page: https://github.com/vertica/spark-connector/blob/main/examples/scala/build.sbt).
// (c) Copyright [2020-2021] Micro Focus or one of its affiliates.
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import java.util.Properties
// Retrieving the common property config containing the connector version number.
val props = settingKey[Properties]("Connector version properties")
props := {
val prop = new Properties()
IO.load(prop, new File("../../version.properties"))
prop
}
scalaVersion := "2.12.12"
name := "spark-vertica-connector-scala-examples"
organization := "com.vertica"
version := props.value.getProperty("connector-version")
resolvers += "Artima Maven Repository" at "https://repo.artima.com/releases"
resolvers += "jitpack" at "https://jitpack.io"
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.4.1",
//"com.vertica.spark" % "vertica-spark" % s"${version.value}-slim",
"com.vertica.spark" % "vertica-spark" % "3.3.5-slim",
"org.apache.spark" %% "spark-core" % "3.3.0",
"org.apache.spark" %% "spark-sql" % "3.3.0",
"com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop3-2.2.6",
// This version needs to match the Hadoop version used by Spark
"org.apache.hadoop" % "hadoop-aws" % "3.3.2"
)
assembly / assemblyJarName := s"vertica-spark-scala-examples.jar"
assembly / assemblyMergeStrategy := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
assembly / assemblyShadeRules := Seq(
ShadeRule.rename("cats.**" -> "shadeCats.@1").inAll
)
After you mentioned it, I also tried it through spark-shell and it throws the same error for me (maybe because we did not specify all the transitive dependencies when launching spark-shell).
scala> .save()
org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'vertica': com.vertica.spark.datasource.v2.VerticaDatasourceV2Catalog.
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2132)
at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:82)
at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
at org.apache.spark.sql.connector.catalog.CatalogV2Util$.$anonfun$getTableProviderCatalog$1(CatalogV2Util.scala:433)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.connector.catalog.CatalogV2Util$.getTableProviderCatalog(CatalogV2Util.scala:433)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:327)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
... 49 elided
Caused by: java.lang.NoClassDefFoundError: cats/Semigroupal
at com.vertica.spark.datasource.v2.VerticaDatasourceV2Catalog.<init>(VerticaDatasourceV2Catalog.scala:37)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:64)
... 57 more
Caused by: java.lang.ClassNotFoundException: cats.Semigroupal
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 63 more
After specifying all the dependent jars, I get the same error as when running locally.
spark-shell command: spark-shell --jars file:////home/hadoop/lib/*
23/12/16 09:59:47 ERROR VerticaBatchReader: commit: Failed to copy rows into target table
performCopy: JDBC error when trying to copy
Error in commit step of write to Vertica. There was a failure copying data from the intermediary into Vertica.
Error when sending query
Unexpected SQL Error.
A generic JDBC error occurred
Caused by:
java.sql.SQLException: [Vertica]VJDBC ERROR: Failed to glob [hdfs://ip-10-60-6-81.cloud.operative.com:8020/user/hadoop/localtest/088e88c8_b7fb_46c1_bbf0_10fd09639eb6/*.parquet] because of error: Seen exception:
URL: [http://ip-10-60-6-81.cloud.operative.com:8020/webhdfs/v1/user/hadoop/localtest/088e88c8_b7fb_46c1_bbf0_10fd09639eb6/?op=LISTSTATUS&user.name=etluser]
HTTP response code: 404
HTTP response message: It looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.
Update for other users:
The connector works with the configs below:
host="localhost"
port=5433
db="vmart"
user="dbadmin"
password=""
filepath="webhdfs://hdfshost:9870/user/hadoop/planstream/",
prevent_cleanup=true
It works well as long as Spark is loading data into one table at a time, but when you try to save dataframes in parallel it starts failing with the error below:
[ERROR] com.vertica.spark.datasource.v2.VerticaBatchReader - JDBC Error when trying to rollback: rollback: JDBC Error while rolling back.
Connection to the JDBC source is down or invalid. Please ensure that the JDBC source is running properly.
commit: Failed to get column list
getColumnList: Error building default copy column list
Failed to create a valid column list for the write operation due to mismatch with the existing table.
JDBC failure when trying to retrieve schema
getPreparedStatement: Error while getting prepared statement
It seems the connector is checking the column definition of one table while trying to save the data of another (this is just my observation so far).
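If it helps anyone else, below is a minimal sketch of the pattern that works for me with the config above (assumptions: options is the map shown earlier with the webhdfs:// staging_fs_url, VERTICA_SOURCE is the connector source name from my first post, and df1/df2 with their table names are placeholders):
// Sketch only: save the dataframes one at a time so the connector works against a single
// target table per write; saving from parallel threads produced the column-list mismatch above.
Seq("dftest" -> df1, "dftest2" -> df2).foreach { case (table, df) =>
  df.write.format(VERTICA_SOURCE)
    .options(options + ("table" -> table))
    .mode(SaveMode.Append)
    .save()
}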