Using Amazon Elastic MapReduce with the AWS .NET API Part 8: connection to our Big Data demo

Introduction

In the previous post we saw how to start an Amazon EMR cluster and have it execute a Hive script which performs a basic aggregation step.

This post takes up the Big Data thread where we left off at the end of the two previous series on the blob storage component Amazon S3 and the NoSQL data store DynamoDb. Familiarity with what we discussed in those series is therefore a prerequisite for following the code examples in this post.

Reminder

Where were we in the first place? The last post of the Amazon S3 series stopped where we stored our raw data in S3 in folders like this:

Test data points as seen in Amazon S3

The individual files in each bucket can look like the following:

Specific data points in storage files in Amazon S3

Typical plain-text records look like the following:

cnn http://www.cnn.com GET 135 1418419114682 2014-12-12 21:18:34
cnn http://www.cnn.com/africa GET 1764 1418419125964 2014-12-12 21:18:45

The next task is to apply what we’ve learnt in this EMR series and run some type of aggregation.

Aggregating over customers and URLs

We’ll extend the Amazon Kinesis demo console application that we built in the series on Amazon Kinesis and S3, so have it ready in Visual Studio. It currently has two modules: AmazonKinesisConsumer and AmazonKinesisProducer. Our goal is to calculate the average URL response times per customer and URL. Imagine that you have a website that several customers use and you’d like to see how the average, minimum and maximum response times of its pages vary across customers.

Preparations

Cleanup of S3 bucket

Recall that the Kinesis consumer application saved all raw data observations in a top bucket called raw-urls-data in S3. Clear the contents of that bucket now so that our new tests don’t get mixed up with any old data.
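If you’d rather not click through the S3 console, here’s a minimal sketch of how that cleanup could look in code with the same .NET SDK, assuming the Amazon.S3 and Amazon.S3.Model namespaces are imported and the credentials are configured as in the earlier series:

private static void ClearRawUrlsDataBucket()
{
	using (IAmazonS3 s3Client = Amazon.AWSClientFactory.CreateAmazonS3Client(RegionEndpoint.EUWest1))
	{
		//list the objects currently stored in the raw-urls-data bucket
		//for the small amount of demo data we don't bother with truncated listings
		ListObjectsRequest listRequest = new ListObjectsRequest() { BucketName = "raw-urls-data" };
		ListObjectsResponse listResponse = s3Client.ListObjects(listRequest);
		foreach (S3Object s3Object in listResponse.S3Objects)
		{
			//delete each object individually
			DeleteObjectRequest deleteRequest = new DeleteObjectRequest() { BucketName = "raw-urls-data", Key = s3Object.Key };
			s3Client.DeleteObject(deleteRequest);
		}
	}
}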

Generating raw data

Let’s generate some new data in S3 that fits our planned aggregation: calculating the average, minimum and maximum response time per customer and URL. Imagine that we’re measuring the response times of our own website, which is used by different customers.

Start AmazonKinesisProducer and insert data records for at least 2 customers, where each customer has at least two records for a single URL so that we can calculate meaningful averages. Here are the values I’m going to enter.

For customer “Favourite customer”:

http://www.mysite.com/home
Favourite customer
354
GET

http://www.mysite.com/history
Favourite customer
735
GET

http://www.mysite.com/plans
Favourite customer
1532
POST

http://www.mysite.com/home
Favourite customer
476
GET

http://www.mysite.com/history
Favourite customer
1764
GET

http://www.mysite.com/plans
Favourite customer
1467
POST

http://www.mysite.com/home
Favourite customer
745
GET

http://www.mysite.com/history
Favourite customer
814
GET

http://www.mysite.com/plans
Favourite customer
812
POST

…and the following for customer “Nice customer”:

http://www.mysite.com/home
Nice customer
946
GET

http://www.mysite.com/history
Nice customer
1536
GET

http://www.mysite.com/plans
Nice customer
2643
POST

http://www.mysite.com/home
Nice customer
823
GET

http://www.mysite.com/history
Nice customer
1789
GET

http://www.mysite.com/plans
Nice customer
967
POST

http://www.mysite.com/home
Nice customer
1937
GET

http://www.mysite.com/history
Nice customer
3256
GET

http://www.mysite.com/plans
Nice customer
2547
POST

18 records should be sent to our Kinesis channel.
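Based on these values we can already work out what the aggregation should eventually produce per customer and URL, which will be handy when we verify the DynamoDb output later on (averages rounded to two decimals):

Favourite customer  /home     min 354   max 745   avg 525.00
Favourite customer  /history  min 735   max 1764  avg 1104.33
Favourite customer  /plans    min 812   max 1532  avg 1270.33
Nice customer       /home     min 823   max 1937  avg 1235.33
Nice customer       /history  min 1536  max 3256  avg 2193.67
Nice customer       /plans    min 967   max 2643  avg 2052.33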

Start AmazonKinesisConsumer now and let it process the records. If all goes well, all 18 records should come through:

Testing EMR from Kinesis demo output shows records from Kinesis

Let’s just check in S3 if we actually have the raw data available:

EMR test data to be aggregated visible in S3

Take a note of the folder name. In the above example it will be “s3://raw-urls-data/00-13-04-02-2015”.

Still in S3 we need to upload a Hive script that will be executed by our application. Prepare a text file called hive-url-aggregation-script.txt with the following content:

create external table if not exists raw_urls (customer string, url string, web_method string, response_time int, date_unix_utc bigint, date_formatted_utc string) row format delimited fields terminated by '\t' location 's3://raw-urls-data/00-13-04-02-2015';
create external table if not exists aggregations (id string, customer string, url string, min_response_time bigint, max_response_time bigint, avg_response_time double) stored by 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' tblproperties("dynamodb.table.name" = "aggregation-tests", "dynamodb.region" = "eu-west-1", "dynamodb.column.mapping" = "id:id,customer:customer,url:url_name,min_response_time:min_response_time,max_response_time:max_response_time,avg_response_time:average_response_time");
create table if not exists temporary_local_aggregation (id string, customer string, url string, min_response_time bigint, max_response_time bigint, avg_response_time double);
insert overwrite table temporary_local_aggregation SELECT reflect("java.util.UUID", "randomUUID"), customer, url, min(response_time),max(response_time), avg(response_time) FROM raw_urls GROUP BY customer, url;
insert into table aggregations select * from temporary_local_aggregation;

Most of this should look familiar from what we’ve seen so far in this series. Don’t forget to update the S3 address in the first statement to match the folder name you noted above. There are a couple of new things:

  • Hive has the ‘min’ and ‘max’ aggregation functions just like “normal” SQL
  • DynamoDb does not support the Hive int data type, hence the bigint type for the min and max response times in the second statement where we create the reference to the aggregation-tests table

Upload the file to the raw-urls-data bucket in S3 so that its full address becomes s3://raw-urls-data/hive-url-aggregation-script.txt. We’ll use this reference in code later on.
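If you prefer code over the S3 console, the upload itself is a single PutObject call; the local file path below is of course just an example:

using (IAmazonS3 s3Client = Amazon.AWSClientFactory.CreateAmazonS3Client(RegionEndpoint.EUWest1))
{
	//upload the local Hive script so that it ends up as s3://raw-urls-data/hive-url-aggregation-script.txt
	PutObjectRequest putRequest = new PutObjectRequest()
	{
		BucketName = "raw-urls-data",
		Key = "hive-url-aggregation-script.txt",
		FilePath = @"c:\tmp\hive-url-aggregation-script.txt"
	};
	s3Client.PutObject(putRequest);
}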

DynamoDb cleanup

We’ll reuse our DynamoDb table called “aggregation-tests” that we set up earlier in this series on EMR. Delete all existing records from it.
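Deleting the items by hand in the DynamoDb console is fine for a handful of records. As a sketch of a code alternative, the snippet below scans the table and removes every item; it assumes, as in the DynamoDb series, that the table has a string hash key called “id” and that the Amazon.DynamoDBv2 and Amazon.DynamoDBv2.Model namespaces are imported:

using (IAmazonDynamoDB dynamoDbClient = Amazon.AWSClientFactory.CreateAmazonDynamoDBClient(RegionEndpoint.EUWest1))
{
	//read all items currently stored in the aggregation-tests table
	ScanRequest scanRequest = new ScanRequest() { TableName = "aggregation-tests" };
	ScanResponse scanResponse = dynamoDbClient.Scan(scanRequest);
	foreach (Dictionary<string, AttributeValue> item in scanResponse.Items)
	{
		//delete each item by its hash key
		DeleteItemRequest deleteRequest = new DeleteItemRequest()
		{
			TableName = "aggregation-tests",
			Key = new Dictionary<string, AttributeValue>() { { "id", item["id"] } }
		};
		dynamoDbClient.DeleteItem(deleteRequest);
	}
}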

The aggregation

Add a new C# class library called Aggregations to the solution. Remove Class1 and add the usual AWS NuGet package to it:

AWS SDK NuGet package

We’ll hide the aggregation mechanism behind a basic interface. The reason is that we’ll look at an alternative aggregation mechanism, Amazon RedShift, in the next series, and we’ll want to be able to swap the actual implementation relatively easily.

Insert the following interface:

public interface IUrlAggregationService
{
	AggregationResult RunAggregation(string script);
}

…where AggregationResult looks as follows:

public class AggregationResult
{
	public bool Success { get; set; }
	public string ExceptionMessage { get; set; }
	public DateTime AggregationProcessStartUtc { get; set; }
	public DateTime AggregationProcessEndUtc { get; set; }
}

Insert a class called AmazonEmrUrlAggregationService which will implement IUrlAggregationService. Here’s the full implementation of the class, including the GetEmrClient and StartNewActiveWaitingJobFlow methods, which should all look familiar from the previous posts. These methods are called from the implemented RunAggregation method:

public class AmazonEmrUrlAggregationService : IUrlAggregationService
{
	public AggregationResult RunAggregation(string script)
	{
		AggregationResult aggregationResult = RunHiveScriptStep(script);
		return aggregationResult;
	}

	private AggregationResult RunHiveScriptStep(string hiveScriptLocation)
	{
		AggregationResult aggregationResult = new AggregationResult();
		using (IAmazonElasticMapReduce emrClient = GetEmrClient())
		{
			try
			{
				String activeWaitingJobFlowId = StartNewActiveWaitingJobFlow();
				if (!string.IsNullOrEmpty(activeWaitingJobFlowId))
				{
					StepFactory stepFactory = new StepFactory(RegionEndpoint.EUWest1);
					StepConfig runHiveScript = new StepConfig()
					{
						Name = "Run Hive script",
						HadoopJarStep = stepFactory.NewRunHiveScriptStep(hiveScriptLocation),
						ActionOnFailure = "TERMINATE_JOB_FLOW"
					};
					AddJobFlowStepsRequest addHiveRequest = new AddJobFlowStepsRequest(activeWaitingJobFlowId, new List<StepConfig>() { runHiveScript });
					AddJobFlowStepsResult addHiveResponse = emrClient.AddJobFlowSteps(addHiveRequest);
					List<string> stepIds = addHiveResponse.StepIds;
					String hiveStepId = stepIds[0];

					DescribeStepRequest describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
					DescribeStepResult describeHiveStepResult = emrClient.DescribeStep(describeHiveStepRequest);
					Step hiveStep = describeHiveStepResult.Step;
					StepStatus hiveStepStatus = hiveStep.Status;
					string hiveStepState = hiveStepStatus.State.Value.ToLower();
					bool failedState = false;
					StepTimeline finalTimeline = null;
					while (hiveStepState != "completed")
					{
						describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
						describeHiveStepResult = emrClient.DescribeStep(describeHiveStepRequest);
						hiveStep = describeHiveStepResult.Step;
						hiveStepStatus = hiveStep.Status;
						hiveStepState = hiveStepStatus.State.Value.ToLower();
						finalTimeline = hiveStepStatus.Timeline;
						Console.WriteLine(string.Format("Current state of Hive script execution: {0}", hiveStepState));
						switch (hiveStepState)
						{
							case "pending":
							case "running":
								Thread.Sleep(10000);
								break;
							case "cancelled":
							case "failed":
							case "interrupted":
								failedState = true;
								break;
						}
						if (failedState)
						{
							break;
						}
					}
					if (finalTimeline != null)
					{
						aggregationResult.AggregationProcessStartUtc = finalTimeline.StartDateTime;
						aggregationResult.AggregationProcessEndUtc = finalTimeline.EndDateTime;
					}
					TerminateJobFlowsRequest terminateRequest = new TerminateJobFlowsRequest(new List<string> { activeWaitingJobFlowId });
					TerminateJobFlowsResponse terminateResponse = emrClient.TerminateJobFlows(terminateRequest);

					aggregationResult.Success = !failedState;
					if (failedState)
					{							
						aggregationResult.ExceptionMessage = string.Concat("Final aggregation process status: "
							, hiveStepState, ". Possible reason: ", hiveStepStatus.StateChangeReason == null ? "N/A" : hiveStepStatus.StateChangeReason.Message);								
					}
				}
				else
				{
					throw new Exception("No valid job flow could be created.");
				}
			}
			catch (AmazonElasticMapReduceException emrException)
			{
				StringBuilder exceptionBuilder = new StringBuilder();
				exceptionBuilder.Append("Hive script execution step has failed. Amazon error code: ")
					.Append(string.IsNullOrEmpty(emrException.ErrorCode) ? "None" : emrException.ErrorCode)
					.Append(". Exception message: ").Append(emrException.Message);
				aggregationResult.ExceptionMessage = exceptionBuilder.ToString();
			}
			catch (Exception otherException)
			{
				aggregationResult.ExceptionMessage = otherException.Message;
			}
		}
		return aggregationResult;
	}

	private string StartNewActiveWaitingJobFlow()
	{
		using (IAmazonElasticMapReduce emrClient = GetEmrClient())
		{
			try
			{
				StepFactory stepFactory = new StepFactory(RegionEndpoint.EUWest1);

				StepConfig installHive = new StepConfig()
				{
					Name = "Install Hive step",
					ActionOnFailure = "TERMINATE_JOB_FLOW",
					HadoopJarStep = stepFactory.NewInstallHiveStep(StepFactory.HiveVersion.Hive_Latest)
				};

				StepConfig installPig = new StepConfig()
				{
					Name = "Install Pig step",
					ActionOnFailure = "TERMINATE_JOB_FLOW",
					HadoopJarStep = stepFactory.NewInstallPigStep()
				};

				JobFlowInstancesConfig jobFlowInstancesConfig = new JobFlowInstancesConfig()
				{
					Ec2KeyName = "apicakey",
					HadoopVersion = "2.4.0",
					InstanceCount = 3,
					KeepJobFlowAliveWhenNoSteps = true,
					MasterInstanceType = "m3.xlarge",
					SlaveInstanceType = "m3.xlarge"
				};

				RunJobFlowRequest runJobFlowRequest = new RunJobFlowRequest()
				{
					Name = "EMR cluster",
					Steps = { installHive, installPig },
					Instances = jobFlowInstancesConfig
				};

				RunJobFlowResponse runJobFlowResponse = emrClient.RunJobFlow(runJobFlowRequest);
				String jobFlowId = runJobFlowResponse.JobFlowId;
				Console.WriteLine(string.Format("Job flow id: {0}", jobFlowId));
				bool oneStepNotFinished = true;
				while (oneStepNotFinished)
				{
					DescribeJobFlowsRequest describeJobFlowsRequest = new DescribeJobFlowsRequest(new List<string>() { jobFlowId });
					DescribeJobFlowsResponse describeJobFlowsResponse = emrClient.DescribeJobFlows(describeJobFlowsRequest);
					JobFlowDetail jobFlowDetail = describeJobFlowsResponse.JobFlows.First();
					List<StepDetail> stepDetailsOfJob = jobFlowDetail.Steps;
					List<string> stepStatuses = new List<string>();
					foreach (StepDetail stepDetail in stepDetailsOfJob)
					{
						StepExecutionStatusDetail status = stepDetail.ExecutionStatusDetail;
						String stepConfigName = stepDetail.StepConfig.Name;
						String statusValue = status.State.Value;
						stepStatuses.Add(statusValue.ToLower());
						Console.WriteLine(string.Format("Step config name: {0}, step status: {1}", stepConfigName, statusValue));
					}
					oneStepNotFinished = stepStatuses.Contains("pending") || stepStatuses.Contains("running");
					Thread.Sleep(10000);
				}
				return jobFlowId;
			}
			catch (AmazonElasticMapReduceException emrException)
			{
				Console.WriteLine("Cluster list creation has failed.");
				Console.WriteLine("Amazon error code: {0}",
					string.IsNullOrEmpty(emrException.ErrorCode) ? "None" : emrException.ErrorCode);
				Console.WriteLine("Exception message: {0}", emrException.Message);
			}
		}

		return string.Empty;
	}

	private IAmazonElasticMapReduce GetEmrClient()
	{
		return Amazon.AWSClientFactory.CreateAmazonElasticMapReduceClient(RegionEndpoint.EUWest1);
	}
}

If you’re not certain what this code does, revisit the previous posts on EMR; there’s nothing new here: we start up a new cluster, install Hive and Pig, add a new Hive execution step to the waiting cluster, and finally terminate the cluster.

Add a reference to the Aggregations project from either AmazonKinesisConsumer or AmazonKinesisProducer; it doesn’t really matter which. We’ll call the aggregation service from that project’s Main method:

static void Main(string[] args)
{
	IUrlAggregationService aggregationService = new AmazonEmrUrlAggregationService();
	AggregationResult aggregationResult = aggregationService.RunAggregation("s3://raw-urls-data/hive-url-aggregation-script.txt");

	Console.WriteLine("Main done...");
	Console.ReadKey();
}

Set the selected console project as the startup project and start the application.

It is possible that you’ll get an exception similar to the following:

Could not load file or assembly ‘AWSSDK, Version=2.3.18.0, Culture=neutral, PublicKeyToken=9f476d3089b52be3’ or one of its dependencies. The located assembly’s manifest definition does not match the assembly reference. (Exception from HRESULT: 0x80131040)

The AWS .NET library is updated quite often, and different projects within a solution can end up referencing different AWS SDK versions. As you probably know, .NET doesn’t like referencing different versions of the same assembly within a solution, so we need to bring the SDK versions to the same level. The fix is to update the SDK via NuGet in every project that uses it. If you referenced the Aggregations project from AmazonKinesisConsumer, update the SDK in that project; otherwise update it in the AmazonKinesisProducer project:

Update AWS SDK NuGet package
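An alternative to updating the NuGet packages is an assembly binding redirect in the App.config of the startup project. The version numbers below match the exception message above but depend on which SDK build your projects actually reference, so treat this only as a template:

<runtime>
  <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
    <dependentAssembly>
      <assemblyIdentity name="AWSSDK" publicKeyToken="9f476d3089b52be3" culture="neutral" />
      <!-- redirect older AWSSDK versions to the version deployed with the application -->
      <bindingRedirect oldVersion="0.0.0.0-2.3.18.0" newVersion="2.3.18.0" />
    </dependentAssembly>
  </assemblyBinding>
</runtime>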

You’ll first see the Hive and Pig installation steps run to completion like in the previous posts. The Hive script execution will start next:

Hive script execution after initial job flow completion

After a couple of minutes the Hive script execution will finish:

Hive script execution completed after initial job flow completion

Let’s check the results in DynamoDb:

DynamoDb aggregation table populated after full NET code execution

Excellent, we have laid down a starting point for an aggregation mechanism using the .NET AWS library.

This post concludes the series on Elastic MapReduce with .NET. The next series will take up yet another data storage and aggregation mechanism within Amazon Cloud: RedShift.

View all posts related to Amazon Web Services and Big Data here.
