Tuesday, 19 July 2011

Interfacing Talend with Amazon SDB (AWS SDB) – quick way

Hi all,

I had the following challenge: read FTP account information (FTP server, username, password, target directory) stored in Amazon SDB and use it in a Talend transformation published as a web service. You know about SDB, I hope. For those who don’t, SDB (SimpleDB) is a key/value database provided by Amazon, so you can call it a NoSQL database.
I played with the SDB API from Amazon and succeeded after some coding and “Talending”. Here is how I did it.
What follows is only a small part of a much larger project, made up of a large collection of web services, created for my client.
This project does the following:
  • Query a database using dynamic parameters given by the user at run time from a Flex portal (a query engine like Business Objects!),
  • Return the row count of the query to the portal. I work in web marketing: counting people (segments) before creating a campaign is very important …
  • Generate an extract of the data and process it according to the parameters given by the user (separator, encoding, splitting, zipping …),
  • Send this data file to different “tubes”: router, FTP, AWS S3, local download …
Here we focus on the FTP sending part, using SDB to retrieve the account information.

 

The process.

 

image

The job (partial).

image

 

Data structure in AWS SDB.

I’m using a very nice Firefox plugin for quick and easy access to my SDB ecosystem: sdbtool. My data structure is simple (“dd” is of course not the true value):
  • Item: ftp
    • Attribute name: Address
      • Attribute value: dd
    • Attribute name: Login
      • Attribute value: dd
    • Attribute name: PKey
      • Attribute value: dd
    • Attribute name: Password
      • Attribute value: dd
    • Attribute name: Port
      • Attribute value: 21
Here is a screenshot of my sdbtool view:
image
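SimpleDB stores every attribute value as a string, so a numeric setting such as Port has to be parsed on the client side. The sketch below models the item above as a plain map of attribute names to values and reads the port from it. It does not use the AWS SDK; the class and method names (FtpItemSketch, sampleItem, portOf) are hypothetical and only there for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FtpItemSketch {

    // Stand-in for the "ftp" item: attribute name -> attribute value (all strings).
    static Map<String, String> sampleItem() {
        Map<String, String> attrs = new LinkedHashMap<String, String>();
        attrs.put("Address", "dd");
        attrs.put("Login", "dd");
        attrs.put("PKey", "dd");
        attrs.put("Password", "dd");
        attrs.put("Port", "21");
        return attrs;
    }

    // Parses the Port attribute; fails loudly if it is missing or blank.
    static int portOf(Map<String, String> attrs) {
        String raw = attrs.get("Port");
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalArgumentException("item has no Port attribute");
        }
        return Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        System.out.println(portOf(sampleItem()));
    }
}
```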

Explanations.

First, we load all the needed libraries using the tLibraryLoad component.
  • aws-java-sdk-1.0.14.jar
  • commons-codec-1.3.jar
  • commons-httpclient-3.0.1.jar
  • commons-logging-1.1.1.jar
  • jackson-core-asl-1.4.3.jar
  • stax-api-1.0.1.jar
  • stax-1.2.0.jar
For aws-java-sdk-1.0.14.jar, I also had to write some import statements. These imports are required to be able to use the AWS SDK for Java.
image
Then we have a tRowGenerator in which I create the value of the variable myDomain, which will be used in the SDB queries (see code below). You can skip this step; I created it only for quick testing.
Then we have to code a small tJavaRow. This Java code will:
  • connect to AWS SDB. You must have an account for AWS SDB.
  • run several queries, using SQL, to retrieve the FTP account information:
    • FTP server address
    • FTP server login
    • FTP server password
    • FTP server port
    • FTP server private key, if needed
  • store the query results in output_row.[name] so they can be used in the Talend process.
The code in the tJavaRow: first we create the credentials (use your own), then set the endpoint to the SDB address from AWS: https://sdb.eu-west-1.amazonaws.com. Be careful to set a valid endpoint for the region that hosts your domain.
The code is simple in the end: build a string containing your SQL query, run it with select(), then call getItems() on the result. Each Item that comes back exposes its attributes through getAttributes(); call getValue() on an attribute to retrieve the value you need.
For simplicity, I chose to run a separate query for each attribute I need from SDB. Of course, you can write it more compactly.
  
        // Replace these placeholder keys with your own AWS access key and secret key.
        BasicAWSCredentials credentials = new BasicAWSCredentials("KL45LKJ4325MLKJ2345", "LKJ45LKJmlkjdlkjGRhjKLJSFSDG432534");

        AmazonSimpleDB sdb = new AmazonSimpleDBClient(credentials);
        sdb.setEndpoint("https://sdb.eu-west-1.amazonaws.com");
        try {
            String myDomain = "Clients";
            // One query per attribute; each result holds only the selected attribute.
            String selectExpression = "select FTP_Address from `" + myDomain + "` where code_client = '" + context.client_name + "'";
           
            SelectRequest selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    output_row.FTP_Address = attribute.getValue().toString();
                }
            }         
            selectExpression = "select FTP_Login from `" + myDomain + "` where code_client = '" + context.client_name + "'";
            selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    output_row.FTP_Login = attribute.getValue().toString();
                }
            }
            selectExpression = "select FTP_Pass from `" + myDomain + "` where code_client = '" + context.client_name + "'";
            selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    output_row.FTP_Pass = attribute.getValue().toString();
                }
            }
            selectExpression = "select FTP_Port from `" + myDomain + "` where code_client = '" + context.client_name + "'";
            selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    output_row.FTP_Port = Integer.valueOf(attribute.getValue());
                }
            }
            selectExpression = "select FTP_PKey from `" + myDomain + "` where code_client = '" + context.client_name + "'";
            selectRequest = new SelectRequest(selectExpression);
            for (Item item : sdb.select(selectRequest).getItems()) {
                for (Attribute attribute : item.getAttributes()) {
                    output_row.FTP_PKey = attribute.getValue().toString();
                }
            }
        } catch (AmazonServiceException ase) {
            System.out.println("AWSException");
            System.out.println("ErrorMsg:    " + ase.getMessage());
            System.out.println("HTTPStatcode: " + ase.getStatusCode());
            System.out.println("AWS Errcode:   " + ase.getErrorCode());
            System.out.println("Errortype:       " + ase.getErrorType());
            System.out.println("RequestID:       " + ase.getRequestId());
        } catch (AmazonClientException ace) {
            System.out.println("AWSClientException");
            System.out.println("Error Message: " + ace.getMessage());
        }
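
The five queries above differ only in the attribute they select, so they could probably be collapsed into a single select that names all the attributes at once. The sketch below only builds that query string; it does not call AWS. The class and method names (SdbSelectSketch, quoteName, quoteValue, buildSelect) are hypothetical, and the quoting rules it applies (backticks around names, single quotes around values, each escaped by doubling) are an assumption based on the documented SimpleDB select syntax, so verify them before relying on this.

```java
import java.util.Arrays;
import java.util.List;

final class SdbSelectSketch {

    // Wraps a domain or attribute name in backticks, doubling any backtick inside it.
    static String quoteName(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    // Wraps a string value in single quotes, doubling any single quote inside it.
    static String quoteValue(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Builds: select `a`, `b` from `domain` where `keyAttribute` = 'keyValue'
    static String buildSelect(List<String> attributes, String domain,
                              String keyAttribute, String keyValue) {
        StringBuilder sb = new StringBuilder("select ");
        for (int i = 0; i < attributes.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(quoteName(attributes.get(i)));
        }
        sb.append(" from ").append(quoteName(domain));
        sb.append(" where ").append(quoteName(keyAttribute));
        sb.append(" = ").append(quoteValue(keyValue));
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> wanted = Arrays.asList(
                "FTP_Address", "FTP_Login", "FTP_Pass", "FTP_Port", "FTP_PKey");
        System.out.println(buildSelect(wanted, "Clients", "code_client", "O'Neil"));
    }
}
```

The resulting string can be passed to a SelectRequest in the same way as the expressions above; the inner loop would then need to check attribute.getName() to decide which output_row field to fill. Escaping the client name also protects the query when the value contains a single quote.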


Final.

After retrieving all the items I need for sending over FTP (server, username, password, port, or location of the SSH key), I store them in global variables. These global variables are then passed as arguments to two heavily customized scripts (needed in my case) that send the files: plain FTP, or SFTP when needed. Finally, I capture some useful information from the custom FTP scripts, process it in a tMap and send it to a tBufferOutput step. That way, I can provide SOAP feedback when this web service is called.
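
In a Talend job, global variables live in globalMap, which behaves like a Map<String, Object>. As a rough sketch of the storing step, the code below uses a plain HashMap as a stand-in for globalMap so that it runs outside Talend. The key names, the helper names (GlobalMapSketch, storeFtpSettings, describeTarget) and the sample values are assumptions for illustration, not taken from the actual job.

```java
import java.util.HashMap;
import java.util.Map;

final class GlobalMapSketch {

    // Copies the FTP settings into the map under hypothetical key names.
    static void storeFtpSettings(Map<String, Object> globalMap, String address,
                                 String login, String pass, Integer port, String pkey) {
        globalMap.put("FTP_Address", address);
        globalMap.put("FTP_Login", login);
        globalMap.put("FTP_Pass", pass);
        globalMap.put("FTP_Port", port);
        globalMap.put("FTP_PKey", pkey);
    }

    // Values come back as Object, so each read needs a cast to its real type.
    static String describeTarget(Map<String, Object> globalMap) {
        String address = (String) globalMap.get("FTP_Address");
        String login = (String) globalMap.get("FTP_Login");
        Integer port = (Integer) globalMap.get("FTP_Port");
        return login + "@" + address + ":" + port;
    }

    public static void main(String[] args) {
        // Stand-in for Talend's globalMap; inside a job the map already exists.
        Map<String, Object> globalMap = new HashMap<String, Object>();
        storeFtpSettings(globalMap, "ftp.example.com", "demo", "secret", 21, null);
        System.out.println(describeTarget(globalMap));
    }
}
```

Inside a tJavaRow the same put calls would take their values from the output_row fields filled by the queries, and later components would read them back with a cast.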

This post is very concise; feel free to ask me for more information about this process.


Comments:

Anonymous said...

Hi there, I need to do something similar to this between Talend and Redis, a KVS or NoSQL DB. Were you using SOAP/REST as the main integration point, or have you issued socket-like commands to the KVS at all? If so, how did you do it?
