Bulk loading slow when source file has about 3 million rows
I am using:
vertica-10.0.0-0.x86_64
CentOS Linux release 7.8.2003 (Core)
4x cpu
16gb memory
I have checked; CPU and memory do not seem to be the issue (more than half the memory is always free).
Vertica runs on 1 node and storage is on ext4 filesystem.
Source csv file is on localhost.
-- This loads 2 million rows into the table and takes 40 seconds:
vsql> COPY STG_SCHEMA.tab1 (col1, col2, col3, ..., colN) FROM '/home/dbadmin/test_2m.csv' WITH DELIMITER AS E'\t' SKIP 1;

-- Bulk loading 3 million rows takes ~11 minutes:
vsql> COPY STG_SCHEMA.tab1 (col1, col2, col3, ..., colN) FROM '/home/dbadmin/test_3m.csv' WITH DELIMITER AS E'\t' SKIP 1;

-- 5 million rows takes ~20 minutes.
Any suggestions as to why it takes so much time?
Answers
Can you share the DDL of the tables? The load time does not scale with the row count here. Row count, column count, column types, and column lengths all influence performance. The number of projections on each table could also play a role.
Can you run the following?
SELECT EXPORT_OBJECTS('','STG_SCHEMA.tab1',FALSE);
These csv files sizes are as follows:
Table has 41 columns.
db=> \d STG_SCHEMA.tab1;
List of Fields by Tables
   Schema   | Table |     Column     |      Type      | Size  | Default | Not Null | Primary Key | Foreign Key
------------+-------+----------------+----------------+-------+---------+----------+-------------+-------------
 STG_SCHEMA | tab1  | id             | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column2        | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column3        | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column4        | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column5        | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column6        | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column7        | timestamptz    |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column8        | timestamptz    |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column9        | timestamptz    |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column10       | timestamptz    |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column11       | timestamptz    |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column12       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column13       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column14       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column15       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column16       | numeric(37,15) |    16 |         | f        | f           |
 STG_SCHEMA | tab1  | column17       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column18       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column19       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column20       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column21       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column22       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column23       | timestamptz    |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column24       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column25       | timestamptz    |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column26       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column27       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column28       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column29       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column30       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column31       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column32       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column33       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column34       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column35       | timestamptz    |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | column36       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column37       | boolean        |     1 |         | f        | f           |
 STG_SCHEMA | tab1  | column38       | varchar(65000) | 65000 |         | f        | f           |
 STG_SCHEMA | tab1  | column39       | boolean        |     1 |         | f        | f           |
 STG_SCHEMA | tab1  | column40       | int            |     8 |         | f        | f           |
 STG_SCHEMA | tab1  | Rec_Insert_DTm | timestamptz    |     8 | now()   | f        | f           |
(41 rows)

As Marco mentioned above, I think your column lengths (varchar 65k) probably have the biggest influence.
Please consider the following:
1. If you are loading a table with many columns, including long varchars, change the LoadMergeChunkSizeK config parameter.
It turns out to have impact on the copy/load performance during the sorting phase.
Change the LoadMergeChunkSizeK parameter as an exception for specific wide tables load.
Default value (2048) may be too small for tables with a large # of columns and lots of varchar columns.
Do one test with 20480 --> SELECT SET_CONFIG_PARAMETER('LoadMergeChunkSizeK',20480);
And another with 204800 --> SELECT SET_CONFIG_PARAMETER('LoadMergeChunkSizeK',204800);
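For reference, a small sketch (not from the thread) for inspecting the current value before testing and reverting afterwards, using the standard CONFIGURATION_PARAMETERS system view:

```sql
-- Check the current and default values of the parameter:
SELECT parameter_name, current_value, default_value
FROM configuration_parameters
WHERE parameter_name = 'LoadMergeChunkSizeK';

-- After the tests, revert to the documented default:
SELECT SET_CONFIG_PARAMETER('LoadMergeChunkSizeK', 2048);
```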
One of the reasons for a slow load is the number of columns in the projection ORDER BY clause.
Recreate your schema with NO “SEGMENTED BY” clause and with only one INT field in the ORDER BY clause.
By default, Vertica creates a projection with many fields in the sort order.
Split the 3M-row file and measure the time it takes to run two separate COPY commands in parallel, each loading a different file.
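A sketch of the split-and-load-in-parallel idea. The table name and files are placeholders standing in for the thread's examples, a tiny generated sample replaces the real 3M-row file, and the vsql calls are guarded so the snippet is a no-op where no Vertica database is available:

```shell
# Stand-in for the real 3M-row file: a small tab-separated sample.
printf 'a\t1\nb\t2\nc\t3\nd\t4\n' > /tmp/sample_load.tsv

# Split into two roughly equal halves along line boundaries (GNU split).
split -n l/2 -d /tmp/sample_load.tsv /tmp/sample_load_part_

# Load both halves concurrently, each COPY in its own session
# (hypothetical table from the thread; skipped when vsql is absent).
if command -v vsql >/dev/null 2>&1; then
  vsql -c "COPY STG_SCHEMA.tab1 FROM '/tmp/sample_load_part_00' DELIMITER E'\t';" &
  vsql -c "COPY STG_SCHEMA.tab1 FROM '/tmp/sample_load_part_01' DELIMITER E'\t';" &
  wait
fi
```

Splitting on line boundaries (`-n l/2`) matters here: a byte-based split would cut a row in half and break both COPY statements.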
Do you load many empty values with the wide varchars?
If yes, consider removing all whitespace (denoted by [:space:] in tr):
tr -d '[:space:]' < file_name
Please verify that EnableCooperativeParse configuration parameter is set to 1:
select * from vs_configuration_parameters where parameter_name = 'EnableCooperativeParse';
More info how to distribute the load can be found here:
https://www.vertica.com/docs/latest/HTML/Content/Authoring/DataLoad/UsingParallelLoadStreams.htm
#!/bin/bash
vsql -X -A -q -c "profile COPY STG_SCHEMA.tab1 (col1, col2, col3, ..., colN) FROM '/home/dbadmin/test_2m.csv' DELIMITER E'\t' NULL '' STREAM NAME 'MY_COPY_01' SKIP 1 ABORT ON ERROR;" > log_file 2>&1
TRAN_ID=$(grep HINT log_file | cut -d'=' -f2 | cut -d' ' -f1)
STAT_ID=$(grep HINT log_file | cut -d'=' -f3 | cut -d';' -f1)

vsql -Xec "SELECT * FROM load_streams WHERE transaction_id = $TRAN_ID AND statement_id = $STAT_ID;"

echo 'Find out which execution engine operator took the longest to execute:'
vsql -Xec "SELECT operator_name, AVG(counter_value) AS Average_Time_Per_Thread, MAX(counter_value) AS Max_Time_Per_Thread FROM dc_execution_engine_profiles WHERE counter_name ILIKE '%execution time%' AND transaction_id = $TRAN_ID AND statement_id = $STAT_ID GROUP BY 1 ORDER BY Average_Time_Per_Thread DESC;"

echo 'Exec_time and I/O by node:'
vsql -Xec "SELECT node_name, path_id, activity, TIMESTAMPDIFF(us, start_time, end_time) AS elaps_us, execution_time_us AS exec_us, CASE WHEN associated_oid IS NOT NULL THEN description ELSE NULL END AS input FROM v_internal.dc_plan_activities WHERE transaction_id = $TRAN_ID AND statement_id = $STAT_ID ORDER BY elaps_us DESC;"

echo 'Threads by node:'
vsql -Xec "SELECT node_name, path_id, operator_name, activity_id::VARCHAR || ',' || baseplan_id::VARCHAR || ',' || localplan_id::VARCHAR AS abl_id, COUNT(DISTINCT(operator_id)) AS '#Threads' FROM v_monitor.execution_engine_profiles WHERE transaction_id = $TRAN_ID AND statement_id = $STAT_ID GROUP BY 1,2,3,4 ORDER BY 1,2,3,4;"

Thanks, I will try it out later on.
Even if my varchar data is at most 1000 characters long, does the declared column size still matter? (Regardless of whether the actual data is 1000 chars vs 50k chars?)
But I tried this first parameter - LoadMergeChunkSizeK
And already I can see major improvements..
What is your recommendation regarding the LoadMergeChunkSizeK parameter: should I increase it even more, and if so, what would be the upper limit that I should not exceed?
And what would be the downside of increasing this parameter? (I mean, will some other things perform poorly due to this parameter having a bigger value?)
Also, have I understood correctly, that WOS is removed from 10.0 and all data is being written to ROS instead..?
So the DIRECT keyword does not have any impact any more..? (while testing, I got the same results with or without the DIRECT keyword)
Thanks
Raul
For best performance, use data types that reflect your real data lengths.
For the most common use cases, default settings provide a good balance between the resources usage and the load elapsed time.
In rare cases, when many very wide fields are required, measure the time it takes and select the setting that best fits your needs.
More info about load tuning can be found here:
https://www.vertica.com/kb/Data-Loading-in-Vertica-Using-COPY/Content/BestPractices/Data-Loading-in-Vertica-Using-COPY.htm
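One way to act on the data-type advice above, sketched with hypothetical names (the thread elides the real column list): create a right-sized copy of the staging table and repopulate it, rather than editing the 65K-wide columns in place.

```sql
-- Hypothetical right-sized staging table; widths chosen to match the
-- real data (VARCHAR(1000) per the question above), not the old 65000.
CREATE TABLE STG_SCHEMA.tab1_sized (
    id      INT,
    column4 VARCHAR(1000),   -- was VARCHAR(65000)
    ...                      -- remaining columns, right-sized the same way
) ORDER BY id;

-- Repopulate from the existing table, writing directly to ROS.
INSERT /*+ DIRECT */ INTO STG_SCHEMA.tab1_sized
SELECT id, column4, ... FROM STG_SCHEMA.tab1;
```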
Yes, in Vertica 10.0, WOS (Write Optimized Store) was deprecated, because many of the original system limitations that led to its creation no longer exist. Prior to Vertica 9.3, Vertica loaded data into WOS first by default.
For databases created in version 9.3 and later, Vertica now uses a default load type of DIRECT.
This setting loads data directly to ROS, bypassing WOS.
So should I increase LoadMergeChunkSizeK value only for large table loading..?
And while loading smaller tables, I should set it back to default, which is 2048 ?
Or what is the downside of leaving this parameter at 20480 or 204800 (even for smaller tables)?
This "SEGMENTED BY" comes from the "CREATE PROJECTION" statement, correct?
I actually did not create projection manually, I just created staging table, and by that the projection was created by default.
SELECT EXPORT_OBJECTS('','STG_SCHEMA.tab1',FALSE);

And the output for the projection is as follows (I have done some replacing to improve readability):

CREATE PROJECTION STG_SCHEMA.tab1_super /*+basename(tab1),createtype(L)*/
(
  ....
)
AS
SELECT ......
FROM STG_SCHEMA.tab1
ORDER BY tab1.id,        --> BIGINT column
         tab1.column2,   --> BIGINT column
         tab1.column3,   --> BIGINT column
         tab1.column4,   --> VARCHAR(65000) column
         tab1.column5,   --> VARCHAR(65000) column
         tab1.column6,   --> VARCHAR(65000) column
         tab1.column7,   --> TIMESTAMP WITH TIME ZONE column
         tab1.column8    --> TIMESTAMP WITH TIME ZONE column
SEGMENTED BY hash(tab1.id,        --> BIGINT column
                  tab1.column2,   --> BIGINT column
                  tab1.column3,   --> BIGINT column
                  tab1.column7,   --> TIMESTAMP WITH TIME ZONE column
                  tab1.column8,   --> TIMESTAMP WITH TIME ZONE column
                  tab1.column9,   --> TIMESTAMP WITH TIME ZONE column
                  tab1.column10,  --> TIMESTAMP WITH TIME ZONE column
                  tab1.column11   --> TIMESTAMP WITH TIME ZONE column
                 ) ALL NODES OFFSET 0;

I mean, how can I create the desired projection, and what would it look like?
(at the moment create table statement created this projection automatically)
Thanks
Raul
The hash segmentation clause specifies how to segment and distribute the data evenly across cluster nodes.
It doesn't have much influence in your case when there is just one node in the cluster.
For more info on DBD see: https://www.vertica.com/docs/latest/HTML/Content/Authoring/GettingStartedGuide/UsingDatabaseDeisgner/UsingDatabaseDesignerto.htm
Before, I created the table like this:

CREATE TABLE STG_SCHEMA.tab1 (.....);

Well, now I created the table like this:

CREATE TABLE STG_SCHEMA.tab1 (.....) ORDER BY id;

By adding "ORDER BY id", I got rid of the long projection sort order, but I still got segmentation by hash.
The projection looks like this now (it was created automatically when I created the table). How can I get rid of that "SEGMENTED BY hash("?
Note that the projection type is now P (it looks like the ORDER BY clause changed that).

CREATE PROJECTION STG_SCHEMA.tab1_super /*+basename(tab1),createtype(P)*/
(
  ....
)
AS
SELECT ......
FROM STG_SCHEMA.tab1
ORDER BY tab1.id  --> BIGINT column
SEGMENTED BY hash(tab1.id,        --> BIGINT column
                  tab1.column2,   --> BIGINT column
                  tab1.column3,   --> BIGINT column
                  tab1.column7,   --> TIMESTAMP WITH TIME ZONE column
                  tab1.column8,   --> TIMESTAMP WITH TIME ZONE column
                  tab1.column9,   --> TIMESTAMP WITH TIME ZONE column
                  tab1.column10,  --> TIMESTAMP WITH TIME ZONE column
                  tab1.column11   --> TIMESTAMP WITH TIME ZONE column
                 ) ALL NODES OFFSET 0;

Thanks
Raul
@raulk89 -
Fyi...
P - CREATE TABLE WITH PROJ CLAUSE
https://www.vertica.com/blog/vertica-quick-tip-projection-create-types/
A projection on a columnar table is either segmented or unsegmented (replicated).
https://www.vertica.com/docs/10.0.x/HTML/Content/Authoring/ConceptsGuide/Components/ProjectionSegmentation.htm
Is just ID a good candidate key to segment by?
https://www.vertica.com/blog/determining-candidate-segmentation-keys-quick-tip/
I never said I need to segment by ID field.
I understand that I probably need to have my tables UNSEGMENTED, since I have single node. Am I right here..?
I do not see any way to control whether the schema is created with a "SEGMENTED BY" clause.
How can I create table without segmentation..?
Thanks
Raul
The projection definition would need to specify "UNSEGMENTED ALL NODES KSAFE 1" instead of the current "SEGMENTED BY ..." syntax.
I don't know if it will change the performance on a single node cluster, however.
So do I need to manually create a projection to get this UNSEGMENTED projection?
So first:
create table statement:
CREATE TABLE STG_SCHEMA.tab1 (.....);

and then a CREATE PROJECTION statement.
Am I correct..?
Thanks
Raul
You can specify those directives in your CREATE TABLE statement.
CREATE TABLE STG_SCHEMA.tab1 (...)
ORDER BY tab1.id
UNSEGMENTED ALL NODES;
The K-Safety of a single node cluster is 0.
Per @Vertica_Curtis 's suggestion, here is an example:
dbadmin=> SELECT node_count, designed_fault_tolerance, current_fault_tolerance FROM system;
 node_count | designed_fault_tolerance | current_fault_tolerance
------------+--------------------------+-------------------------
          1 |                        0 |                       0
(1 row)

dbadmin=> CREATE TABLE zero (c INT) ORDER BY c UNSEGMENTED ALL NODES KSAFE 0;
CREATE TABLE
dbadmin=> SELECT export_objects('', 'zero');
 export_objects
----------------------------------------------------------------------
CREATE TABLE public.zero
(
    c int
);

CREATE PROJECTION public.zero
(
 c
)
AS
 SELECT zero.c
 FROM public.zero
 ORDER BY zero.c
UNSEGMENTED ALL NODES;

SELECT MARK_DESIGN_KSAFE(0);
(1 row)

Ok, thanks.
SELECT EXPORT_OBJECTS('','STG_SCHEMA.tab1',FALSE);

CREATE TABLE STG_SCHEMA.tab1 (...);

CREATE PROJECTION STG_SCHEMA.tab1  --> Note: now I have a projection named tab1 (same as the table)
(.....)
AS
SELECT ...
FROM STG_SCHEMA.tab1
ORDER BY id
UNSEGMENTED ALL NODES;

But before it was named tab1_super, and when I run EXPLAIN, it shows that queries still access tab1_super.
Does it mean that there I have actually 2 projections created (named: tab1 and tab1_super when specified ("order by id unsegmented all nodes ksafe 0")..?
dbadmin=> explain select id from STG_SCHEMA.tab1;
 ------------------------------
 QUERY PLAN DESCRIPTION:
 ------------------------------
 explain select id from STG_SCHEMA.tab1;

 Access Path:
 +-STORAGE ACCESS for tab1 [Cost: 22K, Rows: 5M] (PATH ID: 1)
 |  Projection: STG_SCHEMA.tab1_super
 |  Materialize: tab1.id

dbadmin=> select * from v_monitor.projection_storage where anchor_table_name = 'tab1' order by projection_schema, projection_name;
-[ RECORD 1 ]-----------+---------------------
node_name               | v_jaakdb_node0001
projection_id           | 45035996273715528
projection_name         | tab1_super
projection_schema       | STG_SCHEMA
projection_column_count | 42
row_count               | 4725998
used_bytes              | 250577513
wos_row_count           | 0
wos_used_bytes          | 0
ros_row_count           | 4725998
ros_used_bytes          | 250577513
ros_count               | 1
anchor_table_name       | tab1
anchor_table_schema     | STG_SCHEMA
anchor_table_id         | 45035996273715526

Raul
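To check directly whether there are now two projections on the table, a sketch against the standard v_catalog.projections system view should list everything anchored on it:

```sql
-- List all projections anchored on tab1, with their segmentation state.
SELECT projection_schema, projection_name, is_super_projection, is_segmented
FROM v_catalog.projections
WHERE anchor_table_name = 'tab1';
```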
Hi Raul,
Did you run vioperf on your node? It would be good to understand the speed of your disks.
to sum up,
1. check your disks to see what can be expected from them
2. check how many projections you have per table
3. create unsegmented projection with just a few cols in order by
4. reduce the length of 65K varchar cols to the real ones
5. Split your file into several parts and run concurrent COPYs - here you should experiment and find the best concurrency.
hope this helps.
But what about my last question regarding projections?
I formatted it as code. It is not very readable here, but if you copy it into Notepad++, for example, it is much easier to understand.
[dbadmin@vhost ~]$ vioperf --duration="1 minutes" /var/lib/vertica/
The minimum required I/O is 20 MB/s read and write per physical processor core on each node, in full duplex i.e. reading and writing at this rate simultaneously, concurrently on all nodes of the cluster. The recommended I/O is 40 MB/s per physical core on each node. For example, the I/O rate for a server node with 2 hyper-threaded six-core CPUs is 240 MB/s required minimum, 480 MB/s recommended.
Using direct io (buffer size=1048576, alignment=512) for directory "/var/lib/vertica"

test     | directory        | counter name         | counter value | counter value (10 sec avg) | counter value/core | counter value/core (10 sec avg) | thread count | %CPU | %IO Wait | elapsed time (s) | remaining time (s)
---------+------------------+----------------------+---------------+----------------------------+--------------------+---------------------------------+--------------+------+----------+------------------+-------------------
Write    | /var/lib/vertica | MB/s                 | 700           | 700                        | 175                | 175                             | 4            | 52   | 40       | 10               | 5
Write    | /var/lib/vertica | MB/s                 | 653           | 557                        | 163.25             | 139.25                          | 4            | 45   | 45       | 15               | 0
ReWrite  | /var/lib/vertica | (MB-read+MB-write)/s | 476+476       | 476+476                    | 119+119            | 119+119                         | 4            | 41   | 57       | 10               | 5
ReWrite  | /var/lib/vertica | (MB-read+MB-write)/s | 462+462       | 434+434                    | 115.5+115.5        | 108.5+108.5                     | 4            | 40   | 58       | 15               | 0
Read     | /var/lib/vertica | MB/s                 | 906           | 906                        | 226.5              | 226.5                           | 4            | 62   | 37       | 10               | 5
Read     | /var/lib/vertica | MB/s                 | 916           | 938                        | 229                | 234.5                           | 4            | 63   | 35       | 15               | 0
SkipRead | /var/lib/vertica | seeks/s              | 21672         | 21672                      | 5418               | 5418                            | 4            | 10   | 88       | 10               | 5
SkipRead | /var/lib/vertica | seeks/s              | 21739         | 21877                      | 5434.75            | 5469.25                         | 4            | 11   | 87       | 15               | 0

[dbadmin@vhost ~]$ vcpuperf
Compiled with: 7.3.1 20180303 (Red Hat 7.3.1-5)
Expected time on Core 2, 2.53GHz: ~9.5s
Expected time on Nehalem, 2.67GHz: ~9.0s
Expected time on Xeon 5670, 2.93GHz: ~8.0s

This machine's time:
  CPU Time: 7.580000s
  Real Time: 7.590000s

Some machines automatically throttle the CPU to save power. This test can be done in <100 microseconds (60-70 on Xeon 5670, 2.93GHz). Low load times much larger than 100-200us or much larger than the corresponding high load time indicate low-load throttling, which can adversely affect small query / concurrent performance.

This machine's high load time: 122 microseconds.
This machine's low load time: 582 microseconds.

Thanks!
Here is your limitation: 476 MB/s overall when reading and writing in parallel. Just keep it in mind.
You have CPU scaling on:
*This machine's high load time: 122 microseconds.
*This machine's low load time: 582 microseconds.
The low load time should not be much higher than the high load time. If you want to achieve better performance, work on it.
Re projections, please send the output of this: SELECT EXPORT_OBJECTS('/home/dbadmin/<outfile.sql>','',FALSE);
After you create a new projection, run SELECT REFRESH('<your_table_name>'); to populate it with the data.
After that, run DROP PROJECTION ...; for the one you no longer need.
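The whole replace-a-projection sequence, sketched end to end with placeholder names (tab1_new and the ORDER BY column are hypothetical, following the thread's examples):

```sql
-- 1. Create the new unsegmented projection.
CREATE PROJECTION STG_SCHEMA.tab1_new AS
SELECT * FROM STG_SCHEMA.tab1
ORDER BY id
UNSEGMENTED ALL NODES;

-- 2. Populate it from the existing super projection.
SELECT REFRESH('STG_SCHEMA.tab1');

-- 3. Drop the old projection once the new one is up to date.
DROP PROJECTION STG_SCHEMA.tab1_super;

-- 4. Optionally advance the ancient history mark so the dropped
--    projection's storage can be reclaimed sooner.
SELECT MAKE_AHM_NOW();
```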