Using Amazon RedShift with the AWS .NET API Part 10: RedShift in Big Data
April 16, 2015
Introduction
In the previous post we discussed how to calculate the more complex parts of the aggregation script: the median and nth percentile of the URL response time.
This post picks up the Big Data thread where we left off at the end of the series on Amazon S3. We’ll also refer to what we built at the end of the series on Elastic MapReduce, where we showed how to run an aggregation job on an available EMR cluster via the AWS .NET SDK. The prerequisite for following the code examples in this post is therefore familiarity with what we discussed in those topics.
In this post our goal is to show an alternative to EMR. We’ll also see how to import the raw data source from S3 into RedShift.
Reminder
Where were we in the first place? The last post of the Amazon S3 series stopped at the point where we had stored our raw data in S3 in folders like this:
The individual files in each bucket can look like the following:
Typical plain text records look like the following:
cnn http://www.cnn.com GET 135 1418419114682 2014-12-12 21:18:34
cnn http://www.cnn.com/africa GET 1764 1418419125964 2014-12-12 21:18:45
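On the consumer side each record is a single tab-delimited line. The small helper below is only an illustration, not part of the demo projects; it assumes the field order shown above, i.e. customer name, URL, HTTP method, response time in milliseconds, the UTC date as a Unix timestamp and the formatted UTC date, which is also the column order of the url_response_times table we create later in this post:

// Illustration only: parse one raw URL record as stored in S3.
// Assumes tab-delimited fields in the order shown in the sample records above.
public class UrlResponseRecord
{
    public string CustomerName { get; set; }
    public string Url { get; set; }
    public string Method { get; set; }
    public int ResponseTimeMs { get; set; }
    public long DateUtcUnix { get; set; }
    public string FormattedDateUtc { get; set; }

    // Example input: "cnn\thttp://www.cnn.com\tGET\t135\t1418419114682\t2014-12-12 21:18:34"
    public static UrlResponseRecord Parse(string line)
    {
        string[] fields = line.Split('\t');
        return new UrlResponseRecord
        {
            CustomerName = fields[0],
            Url = fields[1],
            Method = fields[2],
            ResponseTimeMs = int.Parse(fields[3]),
            DateUtcUnix = long.Parse(fields[4]),
            FormattedDateUtc = fields[5]
        };
    }
}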
The next task is to apply what we’ve learnt in this RedShift series and run some type of aggregation. We’ve seen all the necessary ingredients in the posts so far, and we’ll reuse them here.
Aggregating over customers and URLs
We’ll extend the Amazon Kinesis demo console application that we built in the series on Amazon Kinesis and S3, so have it ready in Visual Studio. It currently has two modules: AmazonKinesisConsumer and AmazonKinesisProducer. Our goal is to calculate the average URL response times over customers and URLs using RedShift. Imagine that you have a website that several customers use. You’d like to see how the average, minimum and maximum response times of its pages vary across customers.
If you followed the series on EMR then you’ll know that we have an interface for the aggregation mechanism:
public interface IUrlAggregationService
{
    AggregationResult RunAggregation(string script);
}
…with an implementation called AmazonEmrUrlAggregationService. We call upon this aggregation in Main of either AmazonKinesisProducer or AmazonKinesisConsumer; it doesn’t matter which:
static void Main(string[] args)
{
    IUrlAggregationService aggregationService = new AmazonEmrUrlAggregationService();
    AggregationResult aggregationResult = aggregationService.RunAggregation("s3://raw-urls-data/hive-url-aggregation-script.txt");

    Console.WriteLine("Main done...");
    Console.ReadKey();
}
We’ll implement the interface for RedShift and replace the concrete type in Main.
Preparations
If you still have the records in S3 then you can re-use them. Recall that we entered the following records via the AmazonKinesisProducer project in this post:
http://www.mysite.com/home
Favourite customer
354
GET
http://www.mysite.com/history
Favourite customer
735
GET
http://www.mysite.com/plans
Favourite customer
1532
POST
http://www.mysite.com/home
Favourite customer
476
GET
http://www.mysite.com/history
Favourite customer
1764
GET
http://www.mysite.com/plans
Favourite customer
1467
POST
http://www.mysite.com/home
Favourite customer
745
GET
http://www.mysite.com/history
Favourite customer
814
GET
http://www.mysite.com/plans
Favourite customer
812
POST
…and the following for customer “Nice customer”:
http://www.mysite.com/home
Nice customer
946
GET
http://www.mysite.com/history
Nice customer
1536
GET
http://www.mysite.com/plans
Nice customer
2643
POST
http://www.mysite.com/home
Nice customer
823
GET
http://www.mysite.com/history
Nice customer
1789
GET
http://www.mysite.com/plans
Nice customer
967
POST
http://www.mysite.com/home
Nice customer
1937
GET
http://www.mysite.com/history
Nice customer
3256
GET
http://www.mysite.com/plans
Nice customer
2547
POST
18 records should be sent to our Kinesis channel.
Feel free to generate a new set of data for this exercise. I’ll assume the above data set for all calculations in this post.
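Before running anything on RedShift it can be useful to know roughly what numbers to expect in FactAggregationUrl. The following is just a quick LINQ sketch, not part of the demo projects, that computes the count, minimum, maximum and average per customer and URL from the sample values above; it assumes using System and System.Linq and can be dropped into any Main temporarily:

// Sanity check: expected aggregates for the sample data above.
// E.g. for "Favourite customer" and /home (354, 476, 745 ms) we expect min 354, max 745, avg 525.
var samples = new[]
{
    new { Customer = "Favourite customer", Url = "http://www.mysite.com/home", Ms = 354 },
    new { Customer = "Favourite customer", Url = "http://www.mysite.com/home", Ms = 476 },
    new { Customer = "Favourite customer", Url = "http://www.mysite.com/home", Ms = 745 },
    new { Customer = "Favourite customer", Url = "http://www.mysite.com/history", Ms = 735 },
    new { Customer = "Favourite customer", Url = "http://www.mysite.com/history", Ms = 1764 },
    new { Customer = "Favourite customer", Url = "http://www.mysite.com/history", Ms = 814 },
    new { Customer = "Favourite customer", Url = "http://www.mysite.com/plans", Ms = 1532 },
    new { Customer = "Favourite customer", Url = "http://www.mysite.com/plans", Ms = 1467 },
    new { Customer = "Favourite customer", Url = "http://www.mysite.com/plans", Ms = 812 }
    // ...add the "Nice customer" records the same way
};

foreach (var group in samples.GroupBy(s => new { s.Customer, s.Url }))
{
    Console.WriteLine("{0} | {1}: count={2}, min={3}, max={4}, avg={5:F1}",
        group.Key.Customer, group.Key.Url, group.Count(),
        group.Min(s => s.Ms), group.Max(s => s.Ms), group.Average(s => s.Ms));
}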
Take note of the folder name in S3 where the records are stored. It should be similar to “s3://raw-urls-data/00-13-04-02-2015”.
Importing data from S3 into RedShift
Before we continue we need to check how to import the raw data from S3 into RedShift. As it turns out it’s quite a trivial task, as Amazon has extended PostgreSQL on RedShift with a special COPY command. Here’s the general form of the COPY command:
copy [table name] from 'address to S3 folder' credentials 'aws_access_key_id=aws-access-key;aws_secret_access_key=secret-access-key' delimiter 'some delimiter';
Example:
copy url_raw_data from 's3://raw-urls-data/00-13-04-02-2015/' credentials 'aws_access_key_id=abc123;aws_secret_access_key=xyz987' delimiter '\t';
The above command loads all files within the specified folder, so you don’t need to bother with the full path to each individual file.
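If you want to try the import on its own before wiring up the full aggregation, you can send the COPY command from .NET over the same kind of ODBC connection we’ll build later in this post. This is only a sketch: the server, database, user, password, S3 folder and AWS keys below are placeholders that you need to replace with your own values (5439 is the default RedShift port).

// Sketch only: issue the COPY command over ODBC outside of the full aggregation script.
// All connection string values, the S3 folder and the AWS keys are placeholders.
using System;
using System.Data.Odbc;

public static class RedShiftCopyDemo
{
    public static void ImportRawUrlData()
    {
        string odbcConnectionString = "Driver={PostgreSQL Unicode}; Server=your-cluster-endpoint; "
            + "Database=your-database; UID=your-master-user; PWD=your-password; Port=5439";

        // The delimiter '\t' is part of the SQL text itself, hence the escaped backslash.
        string copyCommand = "copy url_raw_data from 's3://raw-urls-data/00-13-04-02-2015/' "
            + "credentials 'aws_access_key_id=[access key id];aws_secret_access_key=[secret access key]' "
            + "delimiter '\\t';";

        using (OdbcConnection conn = new OdbcConnection(odbcConnectionString))
        {
            conn.Open();
            using (OdbcCommand command = new OdbcCommand(copyCommand, conn))
            {
                command.ExecuteNonQuery();
            }
        }

        Console.WriteLine("COPY from S3 finished.");
    }
}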
The full aggregation script
We’ll save the full aggregation script on disk first. Create a text file called url_aggregation.txt and save it somewhere on your hard drive with the following content. Make sure you substitute the correct values for the placeholders, i.e. the S3 folder name and the AWS access keys:
DROP TABLE IF EXISTS url_response_times;
DROP TABLE IF EXISTS FactAggregationUrl;
DROP TABLE IF EXISTS DimUrl;
DROP TABLE IF EXISTS DimCustomer;

CREATE TABLE url_response_times
(
  customer_name varchar,
  url varchar,
  method varchar,
  response_time int,
  date_utc_unix bigint,
  formatted_date_utc varchar
);

copy url_response_times from 's3://[insert path to S3 folder here]'
credentials 'aws_access_key_id=[access key id];aws_secret_access_key=[aws secret access key]'
delimiter '\t';

CREATE TABLE DimUrl
(
  dw_id integer NOT NULL,
  url varchar not null,
  inserted_utc timestamp default convert_timezone('CET','UTC', getdate()) NOT NULL
);

ALTER TABLE DimUrl ADD CONSTRAINT DimUrl_pkey PRIMARY KEY (dw_id);
COMMIT;

CREATE TABLE DimCustomer
(
  dw_id integer NOT NULL,
  name varchar not null,
  inserted_utc timestamp default convert_timezone('CET','UTC', getdate()) NOT NULL
);

ALTER TABLE DimCustomer ADD CONSTRAINT DimCustomer_pkey PRIMARY KEY (dw_id);
COMMIT;

CREATE TABLE FactAggregationUrl
(
  url_id int NOT NULL references dimurl(dw_id),
  customer_id int NOT NULL references dimcustomer(dw_id),
  url_call_count int not null,
  min int NOT NULL,
  max int NOT NULL,
  avg double precision NOT NULL,
  median double precision NOT NULL,
  percentile_95 double precision NOT NULL,
  percentile_99 double precision NOT NULL,
  inserted_utc timestamp default convert_timezone('CET','UTC', getdate()) NOT NULL
);

insert into DimUrl (dw_id, url)
select distinct MOD(STRTOL(CRC32(LOWER(url)),16), 2147483647), url
from url_response_times R
where not exists (select 1 from DimUrl DU where DU.url = R.url);

insert into DimCustomer (dw_id, name)
select distinct MOD(STRTOL(CRC32(LOWER(customer_name)),16), 2147483647), customer_name
from url_response_times R
where not exists (select 1 from DimCustomer DC where DC.name = R.customer_name);

insert into FactAggregationUrl
(
  url_id, customer_id, url_call_count, min, max, avg, median, percentile_95, percentile_99
)
select u.dw_id
  , c.dw_id
  , count(u.dw_id)
  , MIN(response_time)
  , MAX(response_time)
  , AVG(response_time)
  , median
  , percentile_95
  , percentile_99
from url_response_times d
  , DimUrl u
  , DimCustomer c
  , (select distinct median(response_time) over (partition by url, customer_name) as median, url, customer_name
     from url_response_times group by (response_time), url, customer_name) med
  , (select distinct(percentile_95) AS percentile_95, url, customer_name
     from (select (response_time), percentile_cont(0.95) within group (order by (response_time)) over (partition by url, customer_name) as percentile_95, url, customer_name
           from url_response_times group by response_time, url, customer_name)) perc_95
  , (select distinct(percentile_99) AS percentile_99, url, customer_name
     from (select (response_time), percentile_cont(0.99) within group (order by (response_time)) over (partition by url, customer_name) as percentile_99, url, customer_name
           from url_response_times group by response_time, url, customer_name)) perc_99
where d.url = u.url
  and d.customer_name = c.name
  and d.url = med.url and d.customer_name = med.customer_name
  and d.url = perc_95.url and d.customer_name = perc_95.customer_name
  and d.url = perc_99.url and d.customer_name = perc_99.customer_name
group by u.dw_id, c.dw_id, med.median, perc_95.percentile_95, perc_99.percentile_99;
All of that should look familiar from the previous posts: we create the necessary tables, load the data records from S3, fill in the dimension tables with unique values and finally calculate the aggregates by URL and Customer.
It’s a good idea to test the script in SQL Workbench/J beforehand and make sure that it runs there from start to finish.
The RedShift aggregator implementation
Let’s implement the IUrlAggregationService interface based on what we have learnt so far. Recall that we had this interface in the Aggregations project within the Kinesis demo project we started building here. Insert a new file into the Aggregations project called AmazonRedShiftUrlAggregationService with the following body:
using System;
using System.Data.Odbc;
using Amazon;
using Amazon.Redshift;
using Amazon.Redshift.Model;

public class AmazonRedShiftUrlAggregationService : IUrlAggregationService
{
    private readonly string _clusterName;

    public AmazonRedShiftUrlAggregationService(string clusterName)
    {
        _clusterName = clusterName;
    }

    public AggregationResult RunAggregation(string script)
    {
        AggregationResult aggregationResult = new AggregationResult();
        using (IAmazonRedshift redshiftClient = GetRedShiftClient())
        {
            try
            {
                // Look up the cluster so we can build the ODBC connection string from its endpoint.
                DescribeClustersRequest describeClustersRequest = new DescribeClustersRequest()
                {
                    ClusterIdentifier = _clusterName
                };
                DescribeClustersResponse describeClustersResponse = redshiftClient.DescribeClusters(describeClustersRequest);
                Cluster firstMatch = describeClustersResponse.Clusters[0];
                String mainDbName = firstMatch.DBName;
                String endpointAddress = firstMatch.Endpoint.Address;
                int endpointPort = firstMatch.Endpoint.Port;
                string masterUsername = firstMatch.MasterUsername;
                string password = "your redshift cluster password";
                string odbcConnectionString = string.Concat("Driver={PostgreSQL Unicode}; Server=", endpointAddress
                    , "; Database=", mainDbName, "; UID=", masterUsername, "; PWD=", password
                    , "; Port=", endpointPort);

                using (OdbcConnection conn = new OdbcConnection(odbcConnectionString))
                {
                    try
                    {
                        conn.Open();
                        aggregationResult.AggregationProcessStartUtc = DateTime.UtcNow;
                        // Execute the full aggregation script in one batch.
                        OdbcCommand odbcCommand = new OdbcCommand(script, conn);
                        odbcCommand.ExecuteNonQuery();
                        aggregationResult.AggregationProcessEndUtc = DateTime.UtcNow;
                        aggregationResult.Success = true;
                    }
                    catch (Exception ex)
                    {
                        aggregationResult.ExceptionMessage = ex.Message;
                    }
                }
            }
            catch (AmazonRedshiftException e)
            {
                aggregationResult.ExceptionMessage = e.Message;
            }
        }
        return aggregationResult;
    }

    private IAmazonRedshift GetRedShiftClient()
    {
        return new AmazonRedshiftClient(RegionEndpoint.EUWest1);
    }
}
All of that code should be familiar from the previous posts where we discussed how to connect to RedShift using ODBC. We’ll read the aggregation script from a file and let the OdbcCommand object execute all the statements in one batch.
You can call the above method from Main as follows:
string script = File.ReadAllText(@"c:\path-to-folder\url_aggregation.txt");
string redshiftClusterName = "name-of-redshift-cluster";
IUrlAggregationService aggregationService = new AmazonRedShiftUrlAggregationService(redshiftClusterName);
AggregationResult aggregationResult = aggregationService.RunAggregation(script);
Console.WriteLine("Aggregation success: {0}, start: {1}, finish: {2}, any exception message: {3}"
    , aggregationResult.Success, aggregationResult.AggregationProcessStartUtc, aggregationResult.AggregationProcessEndUtc
    , aggregationResult.ExceptionMessage);

Console.WriteLine("Main done...");
Console.ReadKey();
Run the application. If all goes well then you should see output similar to the following:
You can check the contents of FactAggregationUrl and make sure that it really contains the aggregations:
Indeed, it does.
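If you’d rather verify the results from code as well, you can read FactAggregationUrl back over ODBC. The snippet below is just a sketch; it assumes an odbcConnectionString variable built the same way as in AmazonRedShiftUrlAggregationService, plus the System and System.Data.Odbc namespaces:

// Sketch: read back the aggregated rows, joining the fact table to its dimensions.
// odbcConnectionString is assumed to be built as in AmazonRedShiftUrlAggregationService.
string query =
    "select c.name, u.url, f.url_call_count, f.min, f.max, f.avg, f.median, f.percentile_95, f.percentile_99 " +
    "from FactAggregationUrl f " +
    "join DimUrl u on u.dw_id = f.url_id " +
    "join DimCustomer c on c.dw_id = f.customer_id " +
    "order by c.name, u.url;";

using (OdbcConnection conn = new OdbcConnection(odbcConnectionString))
{
    conn.Open();
    using (OdbcCommand command = new OdbcCommand(query, conn))
    using (OdbcDataReader reader = command.ExecuteReader())
    {
        while (reader.Read())
        {
            Console.WriteLine("{0} | {1}: calls={2}, min={3}, max={4}, avg={5}",
                reader.GetString(0), reader.GetString(1), reader.GetInt32(2),
                reader.GetInt32(3), reader.GetInt32(4), reader.GetDouble(5));
        }
    }
}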
This post concludes our discussion of Amazon RedShift.
The next post will summarise the things we have discussed in the Amazon Big Data series.
View all posts related to Amazon Web Services and Big Data here.