Tuesday, 19 July 2011

Interfacing Talend with Amazon SDB (AWS SDB) – quick way

Hi all,

I had the following challenge: read some FTP account information (FTP server, username, password, target directory) stored in Amazon SDB and use it in a Talend transformation published as a web service. I hope you know about SDB. For those who don’t, SDB (SimpleDB) is a key/value database provided by Amazon, so you can call it a NoSQL database.
I played with the SDB API from Amazon and succeeded after some coding and “Talending”. Here is how I did it.
What follows is only a small part of a much larger project, made up of a large collection of web services, that I built for my client.
This project does the following:
  • Query a database using dynamic parameters supplied by the user at run time from a Flex portal (a query engine like Business Objects!),
  • Return the row count of the query to the portal. I work in web marketing: counting people (segments) before creating a campaign is very important …
  • Generate an extract of the data and process it according to the parameters given by the user (separator, encoding, splitting, zipping …),
  • Send this data file to different “tubes”: router, FTP, AWS S3, local download …
Here we focus on the FTP sending part, using SDB to retrieve the connection details.

 

The process.

 

image

The job (partial).

image

 

Data structure in AWS SDB.

I use a very nice Firefox plugin, sdbtool, for quick and easy access to my SDB ecosystem. My data structure is simple (“dd” is of course not the real value):
  • Item: ftp
    • Attribute name: Address
      • Attribute value: dd
    • Attribute name: Login
      • Attribute value: dd
    • Attribute name: PKey
      • Attribute value: dd
    • Attribute name: Password
      • Attribute value: dd
    • Attribute name: Port
      • Attribute value: 21
Here is a screenshot of my sdbtool view:
image
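The item layout above maps naturally onto a small value class. The following is only a sketch: the class name FtpAccount and the method fromAttributes are hypothetical and not part of the original job, and the default port of 21 when no Port attribute exists is my assumption. SDB stores every attribute value as a string, so the port has to be parsed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for the attributes of the "ftp" item. */
final class FtpAccount {
    final String address;
    final String login;
    final String password;
    final int port;
    final String pKey; // may be null or empty when no SSH key is needed

    FtpAccount(String address, String login, String password, int port, String pKey) {
        this.address = address;
        this.login = login;
        this.password = password;
        this.port = port;
        this.pKey = pKey;
    }

    /** Builds an account from attribute name/value pairs (all values are strings in SDB). */
    static FtpAccount fromAttributes(Map<String, String> attrs) {
        String port = attrs.get("Port");
        return new FtpAccount(
                attrs.get("Address"),
                attrs.get("Login"),
                attrs.get("Password"),
                // Assumption: fall back to the standard FTP port when none is stored.
                port == null ? 21 : Integer.parseInt(port.trim()),
                attrs.get("PKey"));
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("Address", "dd");
        attrs.put("Login", "dd");
        attrs.put("Password", "dd");
        attrs.put("PKey", "dd");
        attrs.put("Port", "21");
        FtpAccount account = fromAttributes(attrs);
        System.out.println(account.address + ":" + account.port);
    }
}
```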

Explanations.

First, we load all the required libraries, using tLibraryLoad components.
  • aws-java-sdk-1.0.14.jar
  • commons-codec-1.3.jar
  • commons-httpclient-3.0.1.jar
  • commons-logging-1.1.1.jar
  • jackson-core-asl-1.4.3.jar
  • stax-api-1.0.1.jar
  • stax-1.2.0.jar
For the aws-java-sdk-1.0.14.jar library, I also had to write some import statements. These imports are required to be able to use the AWS SDK classes.
image
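The screenshot is not reproduced here. Judging only from the classes that the tJavaRow code references, the import list would presumably look like the following. This is an inference from the code, not a copy of the original screenshot.

```java
// Classes referenced by the tJavaRow code (AWS SDK for Java 1.x package names).
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.simpledb.AmazonSimpleDB;
import com.amazonaws.services.simpledb.AmazonSimpleDBClient;
import com.amazonaws.services.simpledb.model.Attribute;
import com.amazonaws.services.simpledb.model.Item;
import com.amazonaws.services.simpledb.model.SelectRequest;
```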
Then we have a tRowGenerator in which I create the value for the variable myDomain, which is used in the SDB queries (see code below). You can skip this step; I created it only for quick testing.
Then we have to write a little code in a tJavaRow. This Java code will:
  • connect to AWS SDB. You must have an AWS account with SDB enabled.
  • run several queries, using SQL, to retrieve the FTP account information:
    • ftp server address
    • ftp server login
    • ftp server password
    • ftp server port
    • ftp server pkey, if needed
  • store the query results in output_row.[name] so they can be used in the rest of the Talend job.
Here is the code in the tJavaRow. First we create the credentials (use your own), then set the endpoint to the SDB address from AWS: https://sdb.eu-west-1.amazonaws.com. Be careful to set a valid endpoint for the region that hosts your data.
In the end the code is simple: build a string containing your SQL query, run it with select() and call getItems(). Items are returned; simply loop over getAttributes() to retrieve the value you need.
For simplicity, I chose to run a separate query for each attribute I need from SDB. Of course, you can write it more concisely.
  
        // Placeholder credentials: replace them with your own access key and secret key.
        BasicAWSCredentials credentials = new BasicAWSCredentials("KL45LKJ4325MLKJ2345", "LKJ45LKJmlkjdlkjGRhjKLJSFSDG432534");

        AmazonSimpleDB sdb = new AmazonSimpleDBClient(credentials);
        // The endpoint must match the region that hosts your domain.
        sdb.setEndpoint("https://sdb.eu-west-1.amazonaws.com");
        try {
            String myDomain = "Clients";

            // One query per attribute; each value is copied into the output row.
            String selectExpression = "select FTP_Address from `" + myDomain + "` where code_client = '" + context.client_name + "'";
            SelectRequest selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    output_row.FTP_Address = attribute.getValue();
                }
            }
            selectExpression = "select FTP_Login from `" + myDomain + "` where code_client = '" + context.client_name + "'";
            selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    output_row.FTP_Login = attribute.getValue();
                }
            }
            selectExpression = "select FTP_Pass from `" + myDomain + "` where code_client = '" + context.client_name + "'";
            selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    output_row.FTP_Pass = attribute.getValue();
                }
            }
            selectExpression = "select FTP_Port from `" + myDomain + "` where code_client = '" + context.client_name + "'";
            selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    // SDB stores every value as a string, so the port must be converted.
                    output_row.FTP_Port = Integer.valueOf(attribute.getValue());
                }
            }
            selectExpression = "select FTP_PKey from `" + myDomain + "` where code_client = '" + context.client_name + "'";
            selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    output_row.FTP_PKey = attribute.getValue();
                }
            }
        } catch (AmazonServiceException ase) {
            // The request reached AWS but was rejected.
            System.out.println("AWSException");
            System.out.println("ErrorMsg:    " + ase.getMessage());
            System.out.println("HTTPStatcode: " + ase.getStatusCode());
            System.out.println("AWS Errcode:   " + ase.getErrorCode());
            System.out.println("Errortype:       " + ase.getErrorType());
            System.out.println("RequestID:       " + ase.getRequestId());
        } catch (AmazonClientException ace) {
            // The client could not reach AWS or could not read the response.
            System.out.println("AWSClientException");
            System.out.println("Error Message: " + ace.getMessage());
        }
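
As a sketch of the more concise variant, the five queries could be merged into a single select that lists all five attributes. The helper below only builds the query string, so it runs without the AWS SDK; the class and method names (SdbSelect, quoteName, quoteValue, ftpSelect) are hypothetical. It also applies the SDB quoting rules: names go between backticks, values between single quotes, and a quote character inside either one is doubled.

```java
/** Hypothetical helper that builds a single SDB select for all FTP attributes. */
final class SdbSelect {

    /** Quotes a domain or attribute name with backticks, doubling any backtick inside it. */
    static String quoteName(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    /** Quotes a string value with single quotes, doubling any single quote inside it. */
    static String quoteValue(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Builds one select expression that returns the five FTP attributes of a client. */
    static String ftpSelect(String domain, String clientCode) {
        return "select FTP_Address, FTP_Login, FTP_Pass, FTP_Port, FTP_PKey from "
                + quoteName(domain)
                + " where code_client = " + quoteValue(clientCode);
    }

    public static void main(String[] args) {
        System.out.println(ftpSelect("Clients", "acme"));
    }
}
```

In the tJavaRow, the resulting string would be passed to new SelectRequest(...) as before, and the inner loop would test attribute.getName() to decide which output_row field receives each value.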


Final.

After retrieving all the items I need for sending over FTP (server, username, password, port, or location of the SSH key), I store them in global variables. These global variables are then passed as arguments to two heavily customized scripts (needed in my case) that send the files: plain FTP, or SFTP when needed. Finally, I capture some useful information from the custom FTP scripts, process it in a tMap and send it to a tBufferOutput step. That way, I can provide SOAP feedback when this web service is called.
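
The hand-off to the sending scripts can be sketched as follows. This is an illustration under assumptions, not the original job: the rule "use SFTP when a private key location is set" is my guess at what "when needed" means, the class FtpDispatch and the key names are hypothetical, and the plain map stands in for Talend's globalMap.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of storing the retrieved values and picking the transfer protocol. */
final class FtpDispatch {

    /** Assumed rule: "sftp" when a private key location is set, otherwise "ftp". */
    static String chooseProtocol(String pKey) {
        return (pKey == null || pKey.trim().isEmpty()) ? "ftp" : "sftp";
    }

    /** Copies the values into a map that plays the role of Talend's globalMap. */
    static Map<String, Object> toGlobals(String address, String login, String pass,
                                         int port, String pKey) {
        Map<String, Object> globals = new HashMap<>();
        globals.put("FTP_Address", address);
        globals.put("FTP_Login", login);
        globals.put("FTP_Pass", pass);
        globals.put("FTP_Port", port);
        globals.put("FTP_PKey", pKey);
        globals.put("FTP_Protocol", chooseProtocol(pKey));
        return globals;
    }

    public static void main(String[] args) {
        // No private key set, so the assumed rule selects plain FTP.
        Map<String, Object> globals = toGlobals("dd", "dd", "dd", 21, "");
        System.out.println(globals.get("FTP_Protocol"));
    }
}
```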

This post is very concise; feel free to ask me for more information about this process.

2 comments:

Anonymous said...

Hi there, I need to do something similar to this between Talend and Redis - a KVS or NoSQL DB. Were you using SOAP/REST as the main integration point, or have you issued socket-like commands to the KVS at all? If so, how did you do it?

Achmax said...

My exams start in the 2012 mid-session. Thank you for this information.