Friday, 11 October 2013

Database feeding from S3 using Talend and Amazon SQS


Here is an article relating a custom data feeding job I had recently had to develop for a price comparator. This article will show you how to set up a generic version of the job.
The challenge is simple : load multiple files from S3 into a relational database. These files are coming in mass from web crawlers (thousand of files per day) and are streamed on S3. In parallel, a message is sent on SQS Queue, asking for an integration process to start for the corresponding file.
UPDATE : For those who don’t want to read and need to put a hand on the hot stuff … you can download the job here !
UPDATE2 : I wrote this article in one shot. Please let me know if something remains unclear of if you need more informations.

Some definitions

Amazon s3
Amazon S3 means : Simple Storage Service. You can see it as a worldwide multi redundant hardrive. You can store and retrieve any amount of data. Data are simple files and are organized into buckets and directories. S3 is THE best data storing facility when working on the Amazon Cloud, but not only (you can use S3 with Azure, from your personal computer, from your smartphone etc …  You can learn more about s3 here.
Not THAT s3 …
Amazon queue
Amazon queue, called SQS (Simple Queue System) is a bit more complex. It is a message queue service : any application can publish a message, and any other application can retrieve the message and process the data. This is a very convenient message bus that can help you building loosely coupled applications with Amazon. You can learn more here.
This is NOT the Amazon Queue (SQS)

In this example, we will use these components :
  • S3 : source data repository
  • SQS : for each file waiting in S3, we have a message in the queue, holding the s3 address for the file + pathname.

The whole process

We have 3 main streams :
  • Crawling streams (will continue on calling them other streams)
  • Metadata streams : SQS messages, file names, date …
  • Data streams : data files
Here is a schema about the data harvesting, processing and loading process.
image

The process in detail

Ok, let’s go. Our process will :
  1. Connect to SQS,
  2. Wait for messages to come in the queue
  3. Process messages :
    1. Read a message
    2. Validate the structure
    3. Fill variables
    4. Start a child job that will process the data loading
  4. In case of success, clean SQS queue
  5. Process the next message (back to 3.1) … because, of course, we want a loop in order to process a message/file collection !

Messages with AWS SQS

The messages we need to trigger the job are pretty simple : we need a date and a filename, assuming our job knows where to get the file on s3 (knowing the s3 endpoint, like : s3://my_bucket/my_dir/my_data_file.txt
Our messages are simple, here is an example for one data file :
{
    "date": "20131010",
    "file": "20131010101108305_my_data_file1.csv"
}

Of course, with little work, you could create and manage messages having more than one file, by instance :

{
    "date": "20131010",
    "file": "20131010101108305_my_data_file1.csv"
    "file2": "20131010101108305_my_data_file2.csv"
}

The job in Talend (TOS)

Here is the Talend job overview. Please note : this job has been considerably detailed in order to give you the global idea and logic. Useless to say, that in production, I’m using much more compact and optimized jobs, but the idea remains the same.


image


Breaking down the job



Here is a full description of what I consider as the main parts of this job.




Loading the context.
I love working with contexts and I encourage you to do so. Using contexts will allow you to be able to switch between environments, end-points, source and target connexions. So, typically, I start the job execution with a context loading step : load the context from the context file.


image




Preparing shared connexions
As I said above, this job will implement a loop. A loop is necessary to process all the messages from the SQS queue and load all the files within one execution. And preparing shared connexions will prevent the job to open a new connexion each time a data loading is triggered. This is quite important since creating new connexions at each iteration has terrible effects : bad performance and db contention.


image


image


As you can see, I’m using the context variables in the connexion step. Nothing hard coded here. The important thing is the field surrounded with the red circle : this is the name of your connexion and will be used to call this connexion from child jobs.




Declaring global variables
Global variables are very convenient for storing job metadata (as well as storing processing data sometimes). Here I used some globalvars in order to smoothly process SQS data and easily pass variables between steps/jobs.


image


image


All these variables will be used in the next steps / components. Note : the global var “context_file” is assigned from the context variable “context_file” (given at run time).




Printing informations
Here is something I love to use : using simple tjava components in order to make my job verbose. Tha’s pretty simple, just call some System.out.println and display run time informations. This is very usefull for :

  1. Debugging : you know what is going and when.
  2. Logging : all these println will be directed to a log file, on your operating system, then you can manage these logfiles.
image


Here is the current example :

image

image

On this one, you can see I decided to use epoch as timestamp (I don’t remember why, but sure this was needed …).




Loading libraries
We really need to load some libraries before being able to run the job. These libs, for this example, are necessary to use the AWS SDK. They are  :
  • httpcore-4.2.2.jar
  • httpclient-4.2.1.jar
  • commons-logging-1.1.1.jar
  • commons-codec.jar
  • aws-java-sdk-1.4.1.jar
These libs are loaded once for each job execution (no loading when iterating the loop).

image



Let the action begin !
Well, we are approaching the really interesting part !

image

image First step : a tIfiniteLoop

Really infinite ? Noooo, you ll see later that the job has at least 2 disengaging mecanisms. Well, this loop will iterate over and over, having a “wait at each iteration (milliseconds)” constant  equals 100 (this is, in my case, the value I needed).

image Second step : a tRowGenerator

I use a tRowGenerator in order to inject the queue name into the next tjava step. Maybe something to adapt/simplify here.


image image

image Third step : a tjava to process SQS messages

This step is crucial and provides java code to handle all SQS interactions. This code does the following :
  • Initializing and checking some variables : line 1 to line 17
First dealing with starttime and nowtime, used to control the execution time. You can see the context variable “Timeout” on line 5, I used it in order to limit the job execution time : if you want your job to run for a maximum of 12 hours, simply assign the value 43200 to context.Timeout. Then, the job will disengage when reaching 12 hours.

Don’t forget to give your precious AWS credentials. Here again I used some context variables (lines 7 to 9).

Clear other variables. This is secure since this code will run at each loop iteration (line 10 to 17).
  • Connecting to SQS queue : line 19 to line 21
Simple code here, from the AWS SDK : connects and … don’t forget to specify you AWS region, otherwise this won’t work (default usa).
  • Checking if the queues has messages to process : line 23 to line 38
Line 23 to line 29, we connect to SQS and ask the “approximate number of messages”. If equals to 0, job execution is stopped.

Note : line 31 to line 38, I also clear a pid file on my serveur. A word on this : I execute this job on a Linux system and use a shell script as bootstrap. When starting the job, a pid file created. This pid file will persist as long as the job is running and will be deleted when the job stops (2 reasons : no more messages, or the context.timeout has been reached).
  • Retrieving and parsing a message : line 41 to 56
The funny part. We read the message queue and process a message. For this message, we read the body and fill the global variables with the date and the filename to process.

Remember, the global variables will be simply used by the next step : the child job.

I also read the message as a whole string (usefull for the logs) and the message handle (this will be necessary, on further steps, to delete the processed message from the queue).
  • … Giving data for the next step : the child job.
Self explanatory, after executing the java code, the execution continues on the next step.

Here is the code from the “Process_SQS_Messages” step :

long Starttime = Long.parseLong(globalMap.get("StartTime").toString());
long NowTime = System.currentTimeMillis() / 1000L;
long diff = NowTime - Starttime;

if (diff <= context.Timeout) // < to x hours 
{
String accessKeyId = context.SQS_key;
String secretKey = context.SQS_skey;
BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretKey);
String Msg = "";
String date = "";
String file = "";

// Clear the global variables in order to avoid executing the same job if no new job in the queue
globalMap.put("process","");
globalMap.put("date2play","");
globalMap.put("file","");

AmazonSQS sqs = new AmazonSQSClient(credentials);
Region euRegion = Region.getRegion(Regions.EU_WEST_1);
sqs.setRegion(euRegion);

// Check if messages in queue
GetQueueAttributesRequest sqsRequest = new GetQueueAttributesRequest();
sqsRequest.withQueueUrl(input_row.SQS_queue);
sqsRequest.withAttributeNames("ApproximateNumberOfMessages");
Map<String,String> result = sqs.getQueueAttributes(sqsRequest).getAttributes();
int nb = Integer.parseInt(result.get("ApproximateNumberOfMessages"));
System.out.println("Queue has : " + nb  +" messages.");

if (nb==0) 
{
System.out.println("No more messages in queue, leaving ... bye bye.");
System.out.println("Deleting pid file");
File pidfile = new File ("/mnt/etl/talend_scripts/prod/Bootstraps/pid_files/my_pid.pid");
    if (pidfile.delete()){System.out.println("Pid file deleted");} else {System.out.println("Pid file not deleted. ALERT");}
    System.out.println("Processing done ... bye bye.");
System.exit(0);
}

// Receive message
globalMap.put("Queue",input_row.SQS_queue);
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(input_row.SQS_queue);
            List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
            for (Message message : messages) {
                Msg = message.getBody();
                JSONObject obj = new JSONObject(Msg);
                file = obj.getString("file");
                date = obj.getString("date");
                System.out.println("NEW MSG : " + date + " " + file);
                    globalMap.put("Msg", Msg);
                    globalMap.put("file",file);
                    globalMap.put("date2play",date);
                    globalMap.put("MesgHandle", messages.get(0).getReceiptHandle());
            }
} else
{
System.out.println("Price integration job has run for 12 hours. Bye bye ...");
System.exit(0);
}



 image Fourth step : a childjob


After the tjava execution, we call a child job.
This child job is a simple loading job having :
  • a step for downloading the data from s3 and store on your server filesystem
  • a step for reading the file and a tmap linked to target database.
I won’t give full description of this child job since this is not the important part of this post (and most of the work of processing SQS message and distributing the values to Talend has been done).

I ll show you, in a next post, how to download files from s3 with Talend and a third party tool (way better than talend s3 objects).

For the moment, let’s have a look to the link between the previous tjava and the childjob.

image

I had to write some code in the “If” link in order to :
  • avoid execution of the childjob if no file is supplied or no filename
  • avoid new execution as long as the childjob is running
This code is pretty simple :

   1:  ((String)globalMap.get("file")) != "" && ((String)globalMap.get("Running")) == "false"

… will check if the global variable “file” is different from “” and will check if the global variable “Running” is set to false (means no other loading in process).

This childjob is taking the global variable “file” as argument. Check it out on the definition below :

image 

Cleaning SQS and releasing stream for next iteration.

Very simple, after successfull execution, we want to delete the message from the queue. That’s the goal of the first tjava called “Clean_SQS”.

image

Clean_SQS tjava code : here we delete the message we just processed. Note the println here again, really usefull for debugging.

image

Release_stream tjava code : here again, some println and setting the global variable “Running” with “false”. This action allows the next file processing.

image

1 comment:

Russell L. Magidson said...

This was very interesting to read, but obviously unable to work with. I was wondering if you've revisited this concept and may be able to share an update to accessing SQS messages?

Thanks
- rlm