Using Amazon Elastic MapReduce with the AWS .NET API Part 7: indirect Hive with .NET

Introduction

In the previous post we looked at the basics of how EMR can interact with Amazon S3 and DynamoDb using some additional commands and properties of Hive. In part 5 of this series we saw how to start a new cluster and assign a couple of steps to it using .NET. In this post we’ll marry the two: we’ll start up a new cluster and assign the full aggregation job flow to it in code.

It’s not possible to send a set of Hive commands to the cluster as plain text. The Hive script must be stored somewhere EMR can find it, and the .NET code can only point to that location. This means that we need to prepare the Hive script beforehand. Amazon S3 is an ideal place to store all types of files, so we’ll use it here as well. This is why I called this post “indirect Hive”.

Preparation

First delete all records from the aggregation-tests DynamoDb table from the previous post so that we won’t confuse the old results with the new ones.

Next, create a text file called hive-aggregation-script.txt and insert the following statements:

create external table if not exists raw_urls (url string, response_time int) row format delimited fields terminated by '|' location 's3://raw-urls-data/';
create external table if not exists aggregations (id string, url string, avg_response_time double) stored by 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' tblproperties("dynamodb.table.name" = "aggregation-tests", "dynamodb.region" = "eu-west-1", "dynamodb.column.mapping" = "id:id,url:url_name,avg_response_time:average_response_time");
create table if not exists temporary_local_aggregation (id string, url string, avg_response_time double);
insert overwrite table temporary_local_aggregation SELECT reflect("java.util.UUID", "randomUUID"), url, avg(response_time) FROM raw_urls GROUP BY url;
insert into table aggregations select * from temporary_local_aggregation;
drop table aggregations;
drop table raw_urls;

All of these statements are copied directly from the previous post. They are just step-by-step instructions to create the external tables, run the aggregation and store the results in DynamoDb.
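If the Hive syntax hides what the aggregation actually does, the following C# sketch mirrors the same logic in memory: it parses pipe-delimited lines like the ones in the raw-urls-data bucket, groups them by URL and averages the response times. The names UrlAggregation, HiveAggregationSketch and Aggregate are made up for this illustration, and skipping malformed lines is a simplification. Hive of course runs the real job distributed across the cluster.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical type for illustration: one row of the "aggregations" table.
public class UrlAggregation
{
	public string Id { get; set; }
	public string Url { get; set; }
	public double AverageResponseTime { get; set; }
}

public static class HiveAggregationSketch
{
	// In-memory counterpart of:
	// SELECT reflect("java.util.UUID", "randomUUID"), url, avg(response_time) FROM raw_urls GROUP BY url
	public static List<UrlAggregation> Aggregate(IEnumerable<string> rawLines)
	{
		return rawLines
			// fields terminated by '|'
			.Select(line => line.Split('|'))
			// simplification: ignore lines that don't have exactly two fields
			.Where(fields => fields.Length == 2)
			.Select(fields => new { Url = fields[0], ResponseTime = int.Parse(fields[1]) })
			.GroupBy(row => row.Url)
			.Select(group => new UrlAggregation
			{
				// a random ID per group, like randomUUID in the Hive script
				Id = Guid.NewGuid().ToString(),
				Url = group.Key,
				AverageResponseTime = group.Average(row => row.ResponseTime)
			})
			.ToList();
	}
}
```

For the assumed input lines http://a.com|100, http://a.com|300 and http://b.com|50, Aggregate returns two rows, and the average for http://a.com is 200.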

Upload hive-aggregation-script.txt to S3 in a bucket called “hive-aggregation-scripts”.

.NET

Open the EMR tester console application we started building in part 5. EmrDemoService.cs already has a method called StartCluster. We’ll first add a similar method that starts a cluster, waits until the Hive and Pig installation steps are no longer pending or running, and returns the job flow ID. Insert the following method in EmrDemoService:

public string StartNewActiveWaitingJobFlow()
{
	using (IAmazonElasticMapReduce emrClient = GetEmrClient())
	{
		try
		{
			StepFactory stepFactory = new StepFactory(RegionEndpoint.EUWest1);

			StepConfig installHive = new StepConfig()
			{
				Name = "Install Hive step",
				ActionOnFailure = "TERMINATE_JOB_FLOW",
				HadoopJarStep = stepFactory.NewInstallHiveStep(StepFactory.HiveVersion.Hive_Latest)
			};

			StepConfig installPig = new StepConfig()
			{
				Name = "Install Pig step",
				ActionOnFailure = "TERMINATE_JOB_FLOW",
				HadoopJarStep = stepFactory.NewInstallPigStep()
			};

			JobFlowInstancesConfig jobFlowInstancesConfig = new JobFlowInstancesConfig()
			{
				Ec2KeyName = "insert-your-ec2-key-name",
				HadoopVersion = "2.4.0",
				InstanceCount = 3,
				KeepJobFlowAliveWhenNoSteps = true,
				MasterInstanceType = "m3.xlarge",
				SlaveInstanceType = "m3.xlarge"
			};

			RunJobFlowRequest runJobFlowRequest = new RunJobFlowRequest()
			{
				Name = "EMR cluster",
				Steps = { installHive, installPig },
				Instances = jobFlowInstancesConfig
			};

			RunJobFlowResponse runJobFlowResponse = emrClient.RunJobFlow(runJobFlowRequest);
			String jobFlowId = runJobFlowResponse.JobFlowId;
			Console.WriteLine(string.Format("Job flow id: {0}", jobFlowId));
			bool oneStepNotFinished = true;
			while (oneStepNotFinished)
			{
				DescribeJobFlowsRequest describeJobFlowsRequest = new DescribeJobFlowsRequest(new List<string>() { jobFlowId });
				DescribeJobFlowsResponse describeJobFlowsResponse = emrClient.DescribeJobFlows(describeJobFlowsRequest);
				JobFlowDetail jobFlowDetail = describeJobFlowsResponse.JobFlows.First();
				List<StepDetail> stepDetailsOfJob = jobFlowDetail.Steps;
				List<string> stepStatuses = new List<string>();
				foreach (StepDetail stepDetail in stepDetailsOfJob)
				{
					StepExecutionStatusDetail status = stepDetail.ExecutionStatusDetail;
					String stepConfigName = stepDetail.StepConfig.Name;
					String statusValue = status.State.Value;
					stepStatuses.Add(statusValue.ToLower());
					Console.WriteLine(string.Format("Step config name: {0}, step status: {1}", stepConfigName, statusValue));
				}
				oneStepNotFinished = stepStatuses.Contains("pending") || stepStatuses.Contains("running");
				Thread.Sleep(10000);
			}
			return jobFlowId;
		}
		catch (AmazonElasticMapReduceException emrException)
		{
			Console.WriteLine("Cluster creation has failed.");
			Console.WriteLine("Amazon error code: {0}",
				string.IsNullOrEmpty(emrException.ErrorCode) ? "None" : emrException.ErrorCode);
			Console.WriteLine("Exception message: {0}", emrException.Message);
		}
	}

	return string.Empty;
}

You should understand all of that from part 5. It is just a slightly reduced form of the StartCluster method: we have no logging, no tags and no termination protection, as we don’t need those right now.
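One thing to keep in mind: the loop only waits until no step is pending or running. A step that ends as failed also stops the loop, and the method still returns the job flow ID. Below is a minimal sketch of an extra check, assuming the lower-case state strings collected in stepStatuses. The class StepStatusChecks and its methods are hypothetical helpers, not part of the AWS SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class StepStatusChecks
{
	// True while at least one step is still waiting or executing.
	public static bool AnyStepInProgress(IEnumerable<string> stepStatuses)
	{
		return stepStatuses.Any(s => s == "pending" || s == "running");
	}

	// True only if there is at least one step and every step ended as "completed".
	public static bool AllStepsCompleted(IEnumerable<string> stepStatuses)
	{
		List<string> statuses = stepStatuses.ToList();
		return statuses.Count > 0 && statuses.All(s => s == "completed");
	}
}
```

With a helper like this, StartNewActiveWaitingJobFlow could return string.Empty instead of the job flow ID when AllStepsCompleted is false for the statuses of the last poll, so that no Hive step is added to a cluster whose installation steps have failed.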

So the method returns a job flow ID. We can use that ID to add new steps to be executed in the cluster. Adding extra steps is also performed by the StepFactory and StepConfig objects. StepFactory has a method called NewRunHiveScriptStep which accepts the S3 address of the Hive script you’d like to have executed.

The method below adds a step to the job flow, monitors its progress and periodically prints the step status, such as “pending” or “running”. When the step has finished, the method prints the step timeline to the console window. Finally, the cluster is terminated:

public void RunHiveScriptStep()
{
	using (IAmazonElasticMapReduce emrClient = GetEmrClient())
	{
		try
		{
			String activeWaitingJobFlowId = StartNewActiveWaitingJobFlow();
			if (!string.IsNullOrEmpty(activeWaitingJobFlowId))
			{
				StepFactory stepFactory = new StepFactory(RegionEndpoint.EUWest1);
				String scriptLocation = "s3://hive-aggregation-scripts/hive-aggregation-script.txt";
				StepConfig runHiveScript = new StepConfig()
				{
					Name = "Run Hive script",
					HadoopJarStep = stepFactory.NewRunHiveScriptStep(scriptLocation),
					ActionOnFailure = "TERMINATE_JOB_FLOW"
				};
				AddJobFlowStepsRequest addHiveRequest = new AddJobFlowStepsRequest(activeWaitingJobFlowId, new List<StepConfig>(){runHiveScript});
				AddJobFlowStepsResult addHiveResponse = emrClient.AddJobFlowSteps(addHiveRequest);
				List<string> stepIds = addHiveResponse.StepIds;
				String hiveStepId = stepIds[0];

				DescribeStepRequest describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
				DescribeStepResult describeHiveStepResult = emrClient.DescribeStep(describeHiveStepRequest);
				Step hiveStep = describeHiveStepResult.Step;
				StepStatus hiveStepStatus = hiveStep.Status;
				string hiveStepState = hiveStepStatus.State.Value.ToLower();
				bool failedState = false;
				StepTimeline finalTimeline = null;
				while (hiveStepState != "completed")
				{
					describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
					describeHiveStepResult = emrClient.DescribeStep(describeHiveStepRequest);
					hiveStep = describeHiveStepResult.Step;
					hiveStepStatus = hiveStep.Status;
					hiveStepState = hiveStepStatus.State.Value.ToLower();
					finalTimeline = hiveStepStatus.Timeline;
					Console.WriteLine(string.Format("Current state of Hive script execution: {0}", hiveStepState));
					switch (hiveStepState)
					{
						case "completed":
							break;
						case "cancelled":
						case "failed":
						case "interrupted":
							failedState = true;
							break;
						default:
							// "pending", "running" or any other non-terminal state: wait before polling again
							Thread.Sleep(10000);
							break;
					}
					if (failedState)
					{
						break;
					}
				}
				if (finalTimeline != null)
				{
					Console.WriteLine(string.Format("Hive script step {0} created at {1}, started at {2}, finished at {3}"
						, hiveStepId, finalTimeline.CreationDateTime, finalTimeline.StartDateTime, finalTimeline.EndDateTime));
				}
				TerminateJobFlowsRequest terminateRequest = new TerminateJobFlowsRequest(new List<string> { activeWaitingJobFlowId });
				TerminateJobFlowsResponse terminateResponse = emrClient.TerminateJobFlows(terminateRequest);
			}
			else
			{
				Console.WriteLine("No valid job flow could be created.");
			}
		}
		catch (AmazonElasticMapReduceException emrException)
		{
			Console.WriteLine("Hive script execution step has failed.");
			Console.WriteLine("Amazon error code: {0}",
				string.IsNullOrEmpty(emrException.ErrorCode) ? "None" : emrException.ErrorCode);
			Console.WriteLine("Exception message: {0}", emrException.Message);
		}
	}
}
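The polling loop in RunHiveScriptStep can also be separated from the EMR client, which makes it easier to test and to cap the number of polls. The following is only a sketch under some assumptions: StepPoller and WaitForTerminalState are hypothetical names, the set of terminal states is taken from the string comparisons in the method above, and the caller supplies the delegate that fetches the current state.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class StepPoller
{
	// Terminal states as lower-case strings, matching the comparisons in RunHiveScriptStep.
	private static readonly HashSet<string> TerminalStates =
		new HashSet<string> { "completed", "cancelled", "failed", "interrupted" };

	// Calls getState until it returns a terminal state or maxAttempts polls have been made.
	// Returns the last state seen in lower case, or null if maxAttempts is zero or less.
	public static string WaitForTerminalState(Func<string> getState, TimeSpan delay, int maxAttempts)
	{
		string state = null;
		for (int attempt = 0; attempt < maxAttempts; attempt++)
		{
			state = getState().ToLowerInvariant();
			if (TerminalStates.Contains(state))
			{
				return state;
			}
			// Not finished yet: wait before asking again.
			Thread.Sleep(delay);
		}
		return state;
	}
}
```

In RunHiveScriptStep the delegate would wrap the DescribeStep call, for example () => emrClient.DescribeStep(describeHiveStepRequest).Step.Status.State.Value, with a delay of TimeSpan.FromSeconds(10). The attempt limit is a design choice that keeps the program from waiting forever if the step never reaches a terminal state.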

Call the method from Main:

static void Main(string[] args)
{		

	EmrDemoService emrDemoService = new EmrDemoService();
	emrDemoService.RunHiveScriptStep();

	Console.WriteLine("Main done...");
	Console.ReadKey();
}

Here’s a sample output from the start of the program execution:

[Screenshot: program output in .NET during Hive query execution preparation]

Here’s a sample printout from the start of the Hive script execution:

[Screenshot: program output in .NET when the Hive script execution has started]

The step is also visible in the EMR GUI:

[Screenshot: the started Hive script step visible in the EMR GUI]

The step has completed:

[Screenshot: program output in .NET when the Hive query execution has completed]

Let’s check in DynamoDb if the aggregation script has actually worked. Indeed it has:

[Screenshot: aggregations visible in Amazon DynamoDb after the Hive query execution]

In the next part we’ll apply all we have learnt about EMR to our original overall Big Data demo we started building in the series on Amazon Kinesis.

View all posts related to Amazon Web Services and Big Data here.

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.
