Modifying the Custom FileSource UDX Example

I modified the custom UDX example that ships with Vertica so that, instead of reading from a file, it reads from MySQL.

 

The code reads from a table with two columns, created as: create table foo(id int primary key, name varchar(100)). It contains two rows: (1, 'test1') and (2, 'test2').
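For context, the rs and resultSetColumnCount fields used in process() below are created during the source's setup. Roughly, my setup code looks like the following simplified sketch (the field and parameter names are illustrative; this is a sketch, not the exact code):

    // Simplified sketch of the setup that creates rs and resultSetColumnCount.
    // Uses java.sql.Connection, DriverManager, Statement, ResultSet, SQLException.
    private Connection connection;
    private ResultSet rs;
    private int resultSetColumnCount;

    public void setup(ServerInterface srvInterface) throws UdfException {
        try {
            // connectionString, userName, password and tableName come from the COPY
            // parameters handed over by the factory; the names are illustrative.
            connection = DriverManager.getConnection(connectionString, userName, password);
            Statement stmt = connection.createStatement();
            srvInterface.log("going to execute query: select * from " + tableName);
            rs = stmt.executeQuery("select * from " + tableName);
            resultSetColumnCount = rs.getMetaData().getColumnCount();
            srvInterface.log("number of columns in the results: " + resultSetColumnCount);
        } catch (SQLException sqlEx) {
            throw new UdfException(0, sqlEx.getMessage(), sqlEx);
        }
    }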

 

    public StreamState process(ServerInterface srvInterface, DataBuffer output) throws UdfException {
        long offset;
        srvInterface.log("total size of buffer " + output.buf.length);
        StringBuilder builder = new StringBuilder();
        try {
            if (rs.next()) {
                for (int i = 1; i <= resultSetColumnCount; i++) {
                    builder.append(rs.getString(i));
                    if (i < resultSetColumnCount) {
                        builder.append("|");
                    }
                }

                String row = builder.toString();
                srvInterface.log("got this row from db: " + row);
                byte[] bytes = row.getBytes();
                System.arraycopy(bytes, 0, output.buf, 0, bytes.length);
                output.offset = bytes.length;
                srvInterface.log("current value of offset " + output.offset);
                srvInterface.log("current value of buffer " + output.buf);
                return StreamState.OUTPUT_NEEDED;
            } else {
                srvInterface.log("came inside the function but there is no data in resultset");
                return StreamState.DONE;
            }
        } catch (SQLException sqlEx) {
            throw new UdfException(0, sqlEx.getMessage(), sqlEx);
        }
    }

The code executes fine, and in the UDx log I can see the following messages:

 

2016-01-06 00:08:23.019 [Java-6610] 0x19 [UserMessage] MySqlSource - going to execute query: select * from Foo
2016-01-06 00:08:23.020 [Java-6610] 0x19 [UserMessage] MySqlSource - number of columns in the results: 2
2016-01-06 00:08:23.021 [Java-6610] 0x19 [UserMessage] MySqlSource - total size of buffer 1048576
2016-01-06 00:08:23.022 [Java-6610] 0x19 [UserMessage] MySqlSource - got this row from db: 1|test1
2016-01-06 00:08:23.022 [Java-6610] 0x19 [UserMessage] MySqlSource - current value of offset 10
2016-01-06 00:08:23.022 [Java-6610] 0x19 [UserMessage] MySqlSource - current value of buffer [B@25eab8a7
2016-01-06 00:08:23.023 [Java-6610] 0x19 [UserMessage] MySqlSource - total size of buffer 1048576
2016-01-06 00:08:23.023 [Java-6610] 0x19 [UserMessage] MySqlSource - got this row from db: 2|test2
2016-01-06 00:08:23.024 [Java-6610] 0x19 [UserMessage] MySqlSource - current value of offset 10
2016-01-06 00:08:23.024 [Java-6610] 0x19 [UserMessage] MySqlSource - current value of buffer [B@25eab8a7
2016-01-06 00:08:23.025 [Java-6610] 0x19 [UserMessage] MySqlSource - total size of buffer 1048576
2016-01-06 00:08:23.026 [Java-6610] 0x19 [UserMessage] MySqlSource - came inside the function but there is no data in resultset
2016-01-06 00:08:23.027 [Java-6610] 0x19 [UserMessage] MySqlSource - came inside destry
2016-01-06 00:08:23.027 [Java-6610] 0x19 [UserMessage] MySqlSource - closed all resources successfully

So it looks like the code is working correctly: it is called twice, and both times it gets the right row.

 

But on the Vertica side it inserts 5 rows!

 

vertica=> copy testing.Foo source MySqlSource(mysqlconnectionstring='jdbc:mysql://mysql:3306/test', tableName='Foo', username='foo', password='bar');
Rows Loaded
-------------
5
(1 row)

vertica=> select * from testing.Foo;
id | name
----+----------
2 | test2
2 | test2
2 | test2
2 | test2
2 | test2
(5 rows)

Why did it:

 

1. Lose the first row (1, 'test1')?

2. Insert the second row (2, 'test2') five times?

Comments

  • Hi!

     

    Topic "opening connection in UDFx" already discussed and its a very bad idea because:

    1. Vertica will try to parallelize the query, and with it the connection objects.
    2. Several nodes will open several connection objects (the nodes are independent).

    Check the number of connections to MySQL.

     

    PS

    You should use the ODBC Loader instead.

  • Thank you for your reply. I will definitely look at the ODBC Loader.

     

    However, if parallel connections lead to duplicate data, then how do the Vertica connectors pull data from Hadoop, ODBC, or any other shared data source?

     

    After all, the same problem of duplicate data would arise if all Vertica nodes tried to read the same HDFS file.

     

    So I just want to know how I can read data from a source that is shared across all Vertica nodes.

  • I tried to install the ODBC Loader, but it is very hard and problematic. Here is the problem I faced:

     

    https://community.dev.hpe.com/t5/Vertica-Forum/Installing-ODBC-Loader-on-Vertica/m-p/234283#U234283

     

    Could you please tell me a way to stop Vertica from using parallel connections to MySQL and use only one node for the import? It will be slow, but at least it will work. The ODBC approach is so problematic ... it just won't install.

     

     

  • Hi!

    Actually, my previous answer wasn't complete (sorry). You can do this with UDL functions, but you didn't modify the example correctly. You modified Vertica's example, which is based on a random-access file:
    reader = new RandomAccessFile(new File(filename), "r");
    and that is why you have the "offset" handling:
    output.offset = bytes.length;
    With random access to a file you have to know the offset (index), but from MySQL you read data sequentially; you can't read MySQL data with random access. RandomAccessFile is designed for random access to binary data, i.e. you can jump anywhere in the file by index. (Sorry, but I still can't give you a full answer, because you posted only an irrelevant part of the code; <MySqlSourceFactory.java> is much more important.)
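    To illustrate the difference, here is a small sketch of my own (the variable names filename, someOffset, chunk and stmt are just placeholders, not from your code): with a RandomAccessFile you can seek to an arbitrary byte position before reading, while a plain JDBC ResultSet only moves forward, one row at a time.

    // Random access: jump to any byte position in the file, then read from there.
    RandomAccessFile reader = new RandomAccessFile(new File(filename), "r");
    reader.seek(someOffset);              // someOffset is a placeholder byte position
    int bytesRead = reader.read(chunk);   // chunk is a placeholder byte[] buffer

    // Sequential access: a forward-only ResultSet has no notion of a byte offset;
    // you can only ask for the next row until there are none left.
    ResultSet rs = stmt.executeQuery("select * from Foo");
    while (rs.next()) {
        String id = rs.getString(1);
    }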
     
    Sorry, my English isn't good; maybe someone else can explain better why you can't read from MySQL with random access.
     
    >> Could you please tell me a way in which I can stop vertica from using parallel connections to mysql and use only 1 node for import...
     
    There are 2 options:
    1. HINT: with the Java method findExecutionNodes.
    2. With a special SQL hint (not recommended): /*+ skipNode(<node name>)*/ or /*+ skipSite(<node name>)*/


    For example, the following query will not use the v_vdb_node0002 and v_vdb_node0003 nodes:

    SELECT /*+ skipNode(v_vdb_node0002),skipNode(v_vdb_node0003)*/ 1 FROM DUAL;

     

    ;)

     

  • OK, I am happy to read sequentially from MySQL, so long as I can read everything and the data is not duplicated.

     

    Here is my entire code

     

    https://github.com/abhitechdojo/VerticaMySQLUDX

     

    Now, can you tell me how I can remove the random access and read the data sequentially?

  • Hi!

     

    >> Here is my entire code

    Ok, I will take a look.

     

    >> Now can you tell me how can I remove the random access and read the data sequentially?

    I will try, but first: how many nodes are in your cluster?

    (Wait a little; I still haven't read your code.)

     

     

  • I have 4 nodes in my vertica cluster.

  • Hi!

     

    >> I have 4 nodes in my vertica cluster.

    Thanks. (It's strange that 5 rows were inserted, so there is a problem with the offset somewhere.)

     

     

    Let's continue and do it step by step. To begin, change the factory methods plan and getParameterType (you can remove the findExecutionNodes method):

     

     

    // Factory
    @Override
    public void plan(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt) throws UdfException {
        List<String> executionNodes = new ArrayList<String>();
        executionNodes.add(srvInterface.getCurrentNodeName());
        planCtxt.setTargetNodes(executionNodes);
    }

    @Override
    public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(64000, "mySqlConnectionString");
        parameterTypes.addVarchar(64000, "userName");
        parameterTypes.addVarchar(64000, "password");
        parameterTypes.addVarchar(64000, "tableName");
        //parameterTypes.addVarchar(65000, "nodes");
    }

     

    After that, test it and report your progress (and check my code for errors, I'm human too ;)).

     

    PS

    You forgot planCtxt.setTargetNodes (or am I missing something?).

     

  • Hi,

     

    I changed the factory method, and it did indeed solve the problem of duplicate rows.

     

    However, it is still skipping rows. My source table in MySQL has two rows: (1, 'testing1') and (2, 'testing2').

     

    But when I run my COPY command it copies only one row, (2, 'testing2'), from MySQL. It skips the first row.

     

    I have committed my new code to GitHub; you can have a look at it here:

     

    https://github.com/abhitechdojo/VerticaMySQLUDX.git

  • Hi!

     

    >> indeed solve the problem of duplicate rows.

    That's good to hear.

     

    >> but when I run my copy command it copies only 1 row (2, testing2) from mysql

    • Is it 1 row or 2 identical rows (in Vertica)?
    • So far I'm trying to understand why size(bytes("1|test1")) = 10 (syntax simplified; see the quick check after the quoted log below).

    From your post above:

    [...] MySqlSource - got this row from db: 1|test1
    [...] MySqlSource - current value of offset 10
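    A quick check of the string length alone (a snippet of my own, not from your code) gives 7 bytes, which is why the 10 in your log puzzles me:

    // "1|test1" is seven ASCII characters, so seven bytes.
    byte[] bytes = "1|test1".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    System.out.println(bytes.length);   // prints 7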

    I will come back a little later; I need to do some research.

  • Hi!

     

    Are you still here? I think I solved it, but you will need to test it.

     

    Here is a presentation: https://youtu.be/77eXm1DaihU

    Here is the code (the changes are the appended "\n", the arraycopy destination, and the accumulated output.offset):

    if (rs.next()) {
        for (int i = 1; i <= resultSetColumnCount; i++) {
            builder.append(rs.getString(i));
            if (i < resultSetColumnCount) {
                builder.append("|");
            }
        }
        builder.append("\n");
        String row = builder.toString();
        byte[] bytes = row.getBytes();
        System.arraycopy(bytes, 0, output.buf, output.offset, bytes.length);
        output.offset += bytes.length;
        return StreamState.OUTPUT_NEEDED;
    }
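    For completeness, folding these changes into the whole method would look roughly like this (the bounds check before the copy is my own addition as an extra safety measure; it is not in your code):

    public StreamState process(ServerInterface srvInterface, DataBuffer output) throws UdfException {
        StringBuilder builder = new StringBuilder();
        try {
            if (rs.next()) {
                for (int i = 1; i <= resultSetColumnCount; i++) {
                    builder.append(rs.getString(i));
                    if (i < resultSetColumnCount) {
                        builder.append("|");            // column delimiter
                    }
                }
                builder.append("\n");                   // record terminator
                String row = builder.toString();
                byte[] bytes = row.getBytes();
                // My own addition: make sure the row fits into the remaining buffer space.
                if (output.offset + bytes.length > output.buf.length) {
                    throw new UdfException(0, "row does not fit into the output buffer");
                }
                System.arraycopy(bytes, 0, output.buf, output.offset, bytes.length);
                output.offset += bytes.length;          // advance the write position instead of overwriting
                return StreamState.OUTPUT_NEEDED;       // ask Vertica to call process() again
            } else {
                return StreamState.DONE;                // no more rows in the result set
            }
        } catch (SQLException sqlEx) {
            throw new UdfException(0, sqlEx.getMessage(), sqlEx);
        }
    }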

     

  • Yes, indeed it works. I have checked my working code into the GitHub repository.

     

    Maybe it will be useful for someone else who is going down this path.

     

    Thanks a lot for your help. I really appreciate it.
