Using Amazon RedShift with the AWS .NET API Part 10: RedShift in Big Data

Introduction

In the previous post we discussed how to calculate the more complex parts of the aggregation script: the median and the nth percentile of the URL response times.

This post takes up the Big Data thread where we left off at the end of the series on Amazon S3. We’ll also refer to what we built at the end of the series on Elastic MapReduce, where we saw how to run an aggregation job on an available EMR cluster via the AWS .NET SDK. Familiarity with those posts is therefore a prerequisite for following the code examples here.

In this post our goal is to show RedShift as an alternative to EMR. We’ll also see how to import the raw data from S3 into RedShift.

Reminder

Where were we in the first place? The Amazon S3 series left off with our raw data stored in S3 in folders like this:

[Screenshot: Test data points as seen in Amazon S3]

The individual files in each bucket can look like the following:

[Screenshot: Specific data points in storage files in Amazon S3]

Typical plain-text records look like this:

cnn http://www.cnn.com GET 135 1418419114682 2014-12-12 21:18:34
cnn http://www.cnn.com/africa GET 1764 1418419125964 2014-12-12 21:18:45

The next task is to apply what we’ve learnt in this RedShift series and run some type of aggregation. We’ve seen all the necessary ingredients in the posts so far; we’ll reuse them here.

Aggregating over customers and URLs

We’ll extend the Amazon Kinesis demo console application that we built in the series on Amazon Kinesis and S3, so have it ready in Visual Studio. It currently has two modules: AmazonKinesisConsumer and AmazonKinesisProducer. Our goal is to calculate the average URL response times over customers and URLs using RedShift. Imagine that you have a website that several customers use. You’d like to see how the average, minimum and maximum response times of the pages on your site vary across customers.

If you followed the series on EMR then you’ll know that we have an interface for the aggregation mechanism:

public interface IUrlAggregationService
{
	AggregationResult RunAggregation(string script);
}

…with an implementation called AmazonEmrUrlAggregationService. We call this aggregation service in Main of either AmazonKinesisProducer or AmazonKinesisConsumer; it doesn’t matter which:

static void Main(string[] args)
{
	IUrlAggregationService aggregationService = new AmazonEmrUrlAggregationService();
	AggregationResult aggregationResult = aggregationService.RunAggregation("s3://raw-urls-data/hive-url-aggregation-script.txt");

	Console.WriteLine("Main done...");
	Console.ReadKey();
}

We’ll implement the interface for RedShift and replace the concrete type in Main.

Preparations

If you still have the records in S3 then you can re-use them. Recall that we entered the following records via the AmazonKinesisProducer project earlier in the series:

http://www.mysite.com/home
Favourite customer
354
GET

http://www.mysite.com/history
Favourite customer
735
GET

http://www.mysite.com/plans
Favourite customer
1532
POST

http://www.mysite.com/home
Favourite customer
476
GET

http://www.mysite.com/history
Favourite customer
1764
GET

http://www.mysite.com/plans
Favourite customer
1467
POST

http://www.mysite.com/home
Favourite customer
745
GET

http://www.mysite.com/history
Favourite customer
814
GET

http://www.mysite.com/plans
Favourite customer
812
POST

…and the following for customer “Nice customer”:

http://www.mysite.com/home
Nice customer
946
GET

http://www.mysite.com/history
Nice customer
1536
GET

http://www.mysite.com/plans
Nice customer
2643
POST

http://www.mysite.com/home
Nice customer
823
GET

http://www.mysite.com/history
Nice customer
1789
GET

http://www.mysite.com/plans
Nice customer
967
POST

http://www.mysite.com/home
Nice customer
1937
GET

http://www.mysite.com/history
Nice customer
3256
GET

http://www.mysite.com/plans
Nice customer
2547
POST

That makes 18 records in total sent to our Kinesis channel.

Feel free to generate a new set of data for this exercise; I’ll assume the above data set for all calculations in this post. For example, for customer “Favourite customer” and the URL http://www.mysite.com/home the three response times are 354, 476 and 745 ms, so we expect a minimum of 354, a maximum of 745, an average of 525 and a median of 476 in the fact table. That gives us a handy sanity check of the end result.

Take a note of the folder name in S3 where the records are stored. It should be similar to “s3://raw-urls-data/00-13-04-02-2015”.

Importing data from S3 into RedShift

Before we continue we need to see how to import the raw data from S3 into RedShift. As it turns out this is quite a trivial task, because Amazon has extended PostgreSQL on RedShift with a special COPY command. Here’s the general form of the COPY command:

copy [table name] from 'address to S3 folder' credentials 'aws_access_key_id=aws-access-key;aws_secret_access_key=secret-access-key' delimiter 'some delimiter';

Example:

copy url_raw_data from 's3://raw-urls-data/00-13-04-02-2015/' credentials 'aws_access_key_id=abc123;aws_secret_access_key=xyz987' delimiter '\t';

The above command loads all files within the specified folder, so you don’t need to bother with the full path to the individual files.
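If you’d like to test the import on its own before running the full aggregation script, the COPY command can be executed through ODBC like any other SQL statement. The snippet below is only a sketch based on the example above: the table name, S3 folder and credentials are the placeholder values from that example, and “odbcConnectionString” stands for a connection string pointing at your cluster, like the one we’ll build later in this post:

// Illustrative sketch only: run the COPY command on its own through ODBC.
// The table name, S3 folder and credentials are the placeholder values from the example above.
// Note that "\\t" sends the literal characters \t, which the COPY command interprets as a tab delimiter.
string copyCommand =
	"copy url_raw_data from 's3://raw-urls-data/00-13-04-02-2015/' " +
	"credentials 'aws_access_key_id=abc123;aws_secret_access_key=xyz987' " +
	"delimiter '\\t';";

using (OdbcConnection conn = new OdbcConnection(odbcConnectionString))
{
	conn.Open();
	using (OdbcCommand copyCmd = new OdbcCommand(copyCommand, conn))
	{
		copyCmd.ExecuteNonQuery();
	}
}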

The full aggregation script

We’ll save the full aggregation script on disk first. Create a text file called url_aggregation.txt and save it somewhere on your hard drive with the following content. Make sure you replace the placeholders with the correct values, i.e. the S3 folder name and the AWS access keys:

DROP TABLE IF EXISTS url_response_times;
DROP TABLE IF EXISTS FactAggregationUrl;
DROP TABLE IF EXISTS DimUrl;
DROP TABLE IF EXISTS DimCustomer;

CREATE TABLE url_response_times (customer_name varchar, url varchar, method varchar, response_time int, date_utc_unix bigint, formatted_date_utc varchar);
copy url_response_times from 's3://[insert path to S3 folder here]' credentials 'aws_access_key_id=[access key id];aws_secret_access_key=[aws secret access key]' delimiter '\t';

CREATE TABLE DimUrl
(
		dw_id            integer         NOT NULL,
		url varchar not null,
		inserted_utc     timestamp       default convert_timezone('CET','UTC', getdate()) NOT NULL
);

ALTER TABLE DimUrl
   ADD CONSTRAINT DimUrl_pkey
   PRIMARY KEY (dw_id);

COMMIT;

CREATE TABLE DimCustomer
(
		dw_id            integer         NOT NULL,
		name varchar not null,
		inserted_utc     timestamp       default convert_timezone('CET','UTC', getdate()) NOT NULL
);

ALTER TABLE DimCustomer
   ADD CONSTRAINT DimCustomer_pkey
   PRIMARY KEY (dw_id);

COMMIT;

CREATE TABLE FactAggregationUrl
(
	url_id int NOT NULL references dimurl(dw_id),
	customer_id int NOT NULL references dimcustomer(dw_id),	
	url_call_count int not null,
	min int NOT NULL,
	max int NOT NULL,
	avg double precision NOT NULL,
	median double precision NOT NULL,
	percentile_95 double precision NOT NULL,
	percentile_99 double precision NOT NULL,
	inserted_utc     timestamp       default convert_timezone('CET','UTC', getdate()) NOT NULL
);

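-- CRC32 returns the checksum of the lowercased value as a hexadecimal string, STRTOL converts that string to a number
-- and MOD keeps the result within the signed integer range, giving a deterministic surrogate key for each distinct value.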
insert into DimUrl (dw_id, url)
select distinct
 MOD(STRTOL(CRC32(LOWER(url)),16), 2147483647),
 url
 from url_response_times R
 where 
    not exists (select 1 from DimUrl DU where
    DU.url = R.url);

insert into DimCustomer (dw_id, name)
select distinct
 MOD(STRTOL(CRC32(LOWER(customer_name)),16), 2147483647),
 customer_name
 from url_response_times R
 where 
    not exists (select 1 from DimCustomer DC where
    DC.name = R.customer_name);
    
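-- Join the raw data with the dimension tables and with the median/percentile subqueries,
-- then aggregate per URL and customer.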
insert into FactAggregationUrl
(
	url_id,
	customer_id,	
	url_call_count,
	min,
	max,
	avg,
	median,
	percentile_95,
	percentile_99
)
select
	u.dw_id	
	, c.dw_id
	, count(u.dw_id)
	, MIN(response_time)
	, MAX(response_time)
	, AVG(response_time)
	,median
	,percentile_95
	,percentile_99
from
	url_response_times d,	
  DimUrl u,  
  DimCustomer c,
  (select distinct median
			(response_time)
		over (partition by url, customer_name) as median, url,customer_name from url_response_times
		group by 
			(response_time)
			, url, customer_name) med,
	 (select distinct(percentile_95) AS percentile_95, url,customer_name from (select (response_time), 
	 		percentile_cont (0.95) within group (order by (response_time)) 
	 		over (partition by url, customer_name) as percentile_95, url, customer_name from url_response_times group by response_time, url, customer_name) sub_95) perc_95,
	 (select distinct(percentile_99) AS percentile_99, url,customer_name from (select (response_time), 
	 		percentile_cont (0.99) within group (order by (response_time)) 
	 		over (partition by url, customer_name) as percentile_99, url, customer_name from url_response_times group by response_time, url, customer_name) sub_99) perc_99
where	
  d.url = u.url 
  	and d.customer_name = c.name			
  	and d.url = med.url
  	and d.customer_name = med.customer_name
  	and d.url = perc_95.url
  	and d.customer_name = perc_95.customer_name
  	and d.url = perc_99.url
  	and d.customer_name = perc_99.customer_name
group by 
	u.dw_id	
	, c.dw_id
	, med.median
	, perc_95.percentile_95
	, perc_99.percentile_99;

All of that should look familiar from the previous posts: we create the necessary tables, load the data records from S3, fill the dimension tables with unique values and finally calculate the aggregates by URL and customer.

It’s a good idea to test the script in WorkBenchJ beforehand and make sure that it runs there from start to finish.

The RedShift aggregator implementation

Let’s implement the IUrlAggregationService interface based on what we have learnt so far. Recall that this interface lives in the Aggregations project within the Kinesis demo solution we started building earlier in the series. Add a new class called AmazonRedShiftUrlAggregationService to the Aggregations project with the following body:

using System;
using System.Data.Odbc;
using Amazon;
using Amazon.Redshift;
using Amazon.Redshift.Model;

public class AmazonRedShiftUrlAggregationService : IUrlAggregationService
{
	private readonly string _clusterName;

	public AmazonRedShiftUrlAggregationService(string clusterName)
	{
		_clusterName = clusterName;
	}

	public AggregationResult RunAggregation(string script)
	{
		AggregationResult aggregationResult = new AggregationResult();
		using (IAmazonRedshift redshiftClient = GetRedShiftClient())
		{
			try
			{
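				// Look up the cluster endpoint address, port, database name and master user name via the RedShift API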
				DescribeClustersRequest describeClustersRequest = new DescribeClustersRequest() { ClusterIdentifier = _clusterName };
				DescribeClustersResponse describeClustersResponse = redshiftClient.DescribeClusters(describeClustersRequest);
				Cluster firstMatch = describeClustersResponse.Clusters[0];
				String mainDbName = firstMatch.DBName;
				String endpointAddress = firstMatch.Endpoint.Address;
				int endpointPort = firstMatch.Endpoint.Port;
				string masterUsername = firstMatch.MasterUsername;
				string password = "your redshift cluster password";

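				// Build the ODBC connection string for the PostgreSQL Unicode driver we used in the earlier ODBC-related posts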
				string odbcConnectionString = string.Concat("Driver={PostgreSQL Unicode}; Server=", endpointAddress
					, "; Database=", mainDbName, "; UID=", masterUsername, "; PWD=", password
					, "; Port=", endpointPort);
				using (OdbcConnection conn = new OdbcConnection(odbcConnectionString))
				{
					try
					{
						conn.Open();
						aggregationResult.AggregationProcessStartUtc = DateTime.UtcNow;
						OdbcCommand odbcCommand = new OdbcCommand(script, conn);
						odbcCommand.ExecuteNonQuery();							
						aggregationResult.AggregationProcessEndUtc = DateTime.UtcNow;
						aggregationResult.Success = true;
					}
					catch (Exception ex)
					{
						aggregationResult.ExceptionMessage = ex.Message;
					}
				}				
			}
			catch (AmazonRedshiftException e)
			{
				aggregationResult.ExceptionMessage = e.Message;
			}
		}
		return aggregationResult;
	}

	private IAmazonRedshift GetRedShiftClient()
	{
		return new AmazonRedshiftClient(RegionEndpoint.EUWest1);
	}
}

All of that code should be familiar from the previous posts where we discussed how to connect to RedShift using ODBC. We’ll read the aggregation script from file and let the OdbcCommand object execute all statements in one batch.

You can call the above method from Main as follows:

string script = File.ReadAllText(@"c:\path-to-folder\url_aggregation.txt");
string redshiftClusterName = "name-of-redshift-cluster";
IUrlAggregationService aggregationService = new AmazonRedShiftUrlAggregationService(redshiftClusterName);
AggregationResult aggregationResult = aggregationService.RunAggregation(script);
Console.WriteLine("Aggregation success: {0}, start: {1}, finish: {2}, any exception message: {3}"
	, aggregationResult.Success, aggregationResult.AggregationProcessStartUtc, aggregationResult.AggregationProcessEndUtc
	, aggregationResult.ExceptionMessage);

Console.WriteLine("Main done...");
Console.ReadKey();

Run the application. If all goes well then you should see output similar to the following:

[Screenshot: Overall Big Data aggregation job executed on RedShift via ODBC]

You can check the contents of FactAggregationUrl and make sure that it really contains the aggregations:

[Screenshot: Final RedShift calculation of Big Data visible in fact table]

Indeed, it does.
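If you prefer to check the result from code rather than in WorkBenchJ then a query along the following lines can be executed over the same kind of ODBC connection. Again, this is only a sketch; “conn” is assumed to be an open OdbcConnection built with the connection string from RunAggregation above:

// Illustrative sketch: print the aggregated rows joined with the dimension tables.
// "conn" is assumed to be an open OdbcConnection to the RedShift cluster.
string query =
	"select c.name, u.url, f.url_call_count, f.min, f.max, f.avg, f.median " +
	"from FactAggregationUrl f " +
	"join DimUrl u on u.dw_id = f.url_id " +
	"join DimCustomer c on c.dw_id = f.customer_id " +
	"order by c.name, u.url;";
using (OdbcCommand command = new OdbcCommand(query, conn))
using (OdbcDataReader reader = command.ExecuteReader())
{
	while (reader.Read())
	{
		Console.WriteLine("{0} | {1} | calls: {2}, min: {3}, max: {4}, avg: {5}, median: {6}",
			reader.GetString(0), reader.GetString(1), reader.GetInt32(2),
			reader.GetInt32(3), reader.GetInt32(4), reader.GetDouble(5), reader.GetDouble(6));
	}
}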

This post concludes our discussion of Amazon RedShift.

The next post will summarise the things we have discussed in the Amazon Big Data series.

View all posts related to Amazon Web Services and Big Data here.
