Vertica Spark Connector usage with Python in pyspark shell

Jeff-L Employee, Registered User

The HPE Vertica Connector for Apache Spark can be used with Spark Scala, as described in the user guide, and also with Spark's Python interface, pyspark. The example below (Vertica 7.1 or 7.2 and Spark 1.4) shows how to save a Spark DataFrame to Vertica and how to load a Spark DataFrame from a Vertica table. There are 3 steps:

 

1) To append to an existing Vertica table, first create the table in Vertica using vsql, as below. If you are not appending to an existing table, skip to step 2.

 

DROP TABLE IF EXISTS people;
CREATE TABLE people (age int, name varchar(1024));
-- view the people table schema
\d people

 

2) From the Spark directory, start the pyspark shell. Pass the Vertica JDBC and spark_connector jars in the --jars argument, and also list them explicitly in the driver and executor classpaths:

 

./bin/pyspark  --master local[2] --jars  /tmp/vertica-jdbc-7.x.jar,/tmp/spark_connector-0.x.jar  --driver-class-path '/tmp/vertica-jdbc-7.x.jar:/tmp/spark_connector-0.x.jar' --conf spark.executor.extraClassPath='/tmp/vertica-jdbc-7.x.jar:/tmp/spark_connector-0.x.jar'
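
Note that the command above uses two separators for the same jar paths: --jars takes a comma-separated list, while the two classpath settings take a colon-separated one. As a minimal sketch, the command can be assembled from a single list of jars. The helper name build_pyspark_command is hypothetical (it is not part of Spark or the connector), and the jar paths are the placeholders from the command above.

```python
import shlex

def build_pyspark_command(jars, master="local[2]", pyspark="./bin/pyspark"):
    """Return the pyspark launch command for the given jar paths.

    Hypothetical helper for illustration; not part of Spark or the connector.
    """
    jars_csv = ",".join(jars)     # --jars expects a comma-separated list
    classpath = ":".join(jars)    # classpath settings expect ':' separators
    args = [
        pyspark,
        "--master", master,
        "--jars", jars_csv,
        "--driver-class-path", classpath,
        "--conf", "spark.executor.extraClassPath=" + classpath,
    ]
    # shlex.quote protects characters such as '[' and ']' from the shell.
    return " ".join(shlex.quote(a) for a in args)

# Placeholder jar names, copied from the command above.
jars = ["/tmp/vertica-jdbc-7.x.jar", "/tmp/spark_connector-0.x.jar"]
command = build_pyspark_command(jars)
print(command)
```

Keeping the jar list in one place avoids the common mistake of updating --jars but not the classpath entries when a jar version changes.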

 

3) Edit the code below with your file paths and database options (table, db, user, password, host), then paste it directly into the pyspark shell (the people.json file is included in the Spark download):

 

from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
sqlContext = SQLContext(sc)
df = sqlContext.read.json("file:///usr/local/hadoop/spark-1.4.1-bin-hadoop2.4/examples/src/main/resources/people.json")

# display the DataFrame contents
df.show()
# Verify that the columns of your Spark DataFrame are in the same order as in the CREATE TABLE statement above;
# otherwise, change the mode option below from 'append' to 'overwrite'.

# view the current DataFrame
df.take(10)

# setup the user options
opts={}
opts['table']='people'
opts['db']='db00'
opts['user']='xyz'
opts['password']='xyz'
opts['host']='machine-00.labs.hpe.com'

# save the DataFrame to Vertica (alternatively use mode="overwrite")
df.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts)

# load the Vertica table into a new Spark DataFrame
df_new=sqlContext.read.load(format="com.vertica.spark.datasource.DefaultSource", **opts)

# view the new DataFrame
df_new.take(10)
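
The column-order check mentioned in the comments above can be done on plain lists of column names before saving. The following is a minimal sketch, assuming a hypothetical helper named columns_in_table_order and a DataFrame whose columns happen to arrive as name, age; it only works with the name lists, so it runs without Spark.

```python
def columns_in_table_order(df_columns, table_columns):
    """Return df_columns reordered to match table_columns.

    Names are compared case-insensitively. Raises ValueError when the
    two sets of names differ. Hypothetical helper for illustration only.
    """
    by_lower = {c.lower(): c for c in df_columns}
    wanted = [c.lower() for c in table_columns]
    if sorted(by_lower) != sorted(wanted):
        raise ValueError(
            "column mismatch: %r vs %r" % (list(df_columns), list(table_columns))
        )
    return [by_lower[c] for c in wanted]

# Table order taken from: CREATE TABLE people (age int, name varchar(1024))
table_columns = ["age", "name"]
# Example DataFrame column order (an assumption; use df.columns in the shell)
ordered = columns_in_table_order(["name", "age"], table_columns)
print(ordered)
```

In the pyspark shell, the result could be applied with df = df.select(*ordered) before calling df.write.save(...) in 'append' mode.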

 

NOTE: additional options can be added to .save() and .load() as defined in the user guide document.
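
Because the options are passed as keyword arguments (**opts), any additional option is just another key in the dictionary. Here is a minimal sketch of merging extra options into the base set, assuming a hypothetical helper named make_opts. Treating the five options from the example as required is an assumption of this sketch, and example_option is a made-up placeholder, not a real connector option; take real option names from the user guide.

```python
# The five options used in the example above; treated as required here
# (an assumption of this sketch, not a statement about the connector).
REQUIRED = ("table", "db", "user", "password", "host")

def make_opts(base, **extra):
    """Return a copy of base merged with extra, checking the required keys.

    Hypothetical helper for illustration only.
    """
    opts = dict(base)      # copy so the caller's dictionary is not modified
    opts.update(extra)
    missing = [k for k in REQUIRED if not opts.get(k)]
    if missing:
        raise ValueError("missing connector options: " + ", ".join(missing))
    return opts

base = {
    "table": "people",
    "db": "db00",
    "user": "xyz",
    "password": "xyz",
    "host": "machine-00.labs.hpe.com",
}
# 'example_option' is a placeholder name, not a documented option.
opts = make_opts(base, example_option="value")
print(sorted(opts))
```

The merged dictionary is then passed exactly as before, for example df.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="append", **opts).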

 

 

 
