Using Amazon Elastic MapReduce with the AWS .NET API Part 7: indirect Hive with .NET
March 9, 2015
Introduction
In the previous post we looked at the basics of how EMR can interact with Amazon S3 and DynamoDb using some additional commands and properties of Hive. In part 5 of this series we saw how to start a new cluster and assign a couple of steps to it using .NET. In this post we’ll marry the two: we’ll start up a new cluster and assign the full aggregation job flow to it in code.
It’s not possible to send a set of Hive commands in plain text to the cluster. The Hive script must be stored somewhere where EMR can find it. The .NET code can only point out the location. All this means that we’ll need to prepare the Hive script beforehand. An ideal place to store all types of files is of course Amazon S3 and we’ll follow that line here as well. This is the reason why I called this post “indirect Hive”.
Preparation
First delete all records from the aggregation-tests DynamoDb table from the previous post so that we won’t confuse the old results with the new ones.
Next, create a text file called hive-aggregation-script.txt and insert the following statements:
create external table if not exists raw_urls (url string, response_time int)
row format delimited fields terminated by '|'
location 's3://raw-urls-data/';

create external table if not exists aggregations (id string, url string, avg_response_time double)
stored by 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
tblproperties("dynamodb.table.name" = "aggregation-tests",
    "dynamodb.region" = "eu-west-1",
    "dynamodb.column.mapping" = "id:id,url:url_name,avg_response_time:average_response_time");

create table if not exists temporary_local_aggregation (id string, url string, avg_response_time double);

insert overwrite table temporary_local_aggregation
SELECT reflect("java.util.UUID", "randomUUID"), url, avg(response_time)
FROM raw_urls
GROUP BY url;

insert into table aggregations select * from temporary_local_aggregation;

drop table aggregations;
drop table raw_urls;
All of these statements are copied directly from the previous post. It’s just a step-by-step instruction to create the external tables, run the aggregations and store them in DynamoDb.
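To make the aggregation step concrete, here is a rough C# equivalent of the SELECT ... GROUP BY url statement, run locally over a few pipe-delimited lines instead of the S3 bucket. This is only a sketch for illustration: the class and method names are made up, and unlike Hive it simply skips lines that don't have exactly two fields and throws on a non-numeric response time.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical helper, not part of the EMR tester application.
public static class LocalAggregationSketch
{
    // Parses "url|response_time" lines and returns the average response time per URL,
    // mirroring: SELECT url, avg(response_time) FROM raw_urls GROUP BY url
    public static Dictionary<string, double> AverageResponseTimeByUrl(IEnumerable<string> lines)
    {
        return lines
            .Select(line => line.Split('|'))
            // Assumption: lines without exactly two fields are ignored.
            .Where(parts => parts.Length == 2)
            // Group by the URL and keep only the parsed response time as the element.
            .GroupBy(parts => parts[0], parts => int.Parse(parts[1], CultureInfo.InvariantCulture))
            .ToDictionary(group => group.Key, group => group.Average());
    }
}
```

Calling AverageResponseTimeByUrl with two rows for the same URL with response times 100 and 300 gives an average of 200 for that URL. The Hive script additionally generates a random UUID per row as the id, which is left out here.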
Upload hive-aggregation-script.txt to S3 in a bucket called “hive-aggregation-scripts”.
.NET
Open the EMR tester console application we started building in part 5 referenced above. We have a method called StartCluster in EmrDemoService.cs. We’ll first insert a similar method. It starts a cluster and returns the job flow id. Insert the following method in EmrDemoService:
public string StartNewActiveWaitingJobFlow()
{
    using (IAmazonElasticMapReduce emrClient = GetEmrClient())
    {
        try
        {
            StepFactory stepFactory = new StepFactory(RegionEndpoint.EUWest1);

            StepConfig installHive = new StepConfig()
            {
                Name = "Install Hive step",
                ActionOnFailure = "TERMINATE_JOB_FLOW",
                HadoopJarStep = stepFactory.NewInstallHiveStep(StepFactory.HiveVersion.Hive_Latest)
            };

            StepConfig installPig = new StepConfig()
            {
                Name = "Install Pig step",
                ActionOnFailure = "TERMINATE_JOB_FLOW",
                HadoopJarStep = stepFactory.NewInstallPigStep()
            };

            JobFlowInstancesConfig jobFlowInstancesConfig = new JobFlowInstancesConfig()
            {
                Ec2KeyName = "insert-your-ec2-key-name",
                HadoopVersion = "2.4.0",
                InstanceCount = 3,
                KeepJobFlowAliveWhenNoSteps = true,
                MasterInstanceType = "m3.xlarge",
                SlaveInstanceType = "m3.xlarge"
            };

            RunJobFlowRequest runJobFlowRequest = new RunJobFlowRequest()
            {
                Name = "EMR cluster",
                Steps = { installHive, installPig },
                Instances = jobFlowInstancesConfig
            };

            RunJobFlowResponse runJobFlowResponse = emrClient.RunJobFlow(runJobFlowRequest);
            String jobFlowId = runJobFlowResponse.JobFlowId;
            Console.WriteLine(string.Format("Job flow id: {0}", jobFlowId));

            bool oneStepNotFinished = true;
            while (oneStepNotFinished)
            {
                DescribeJobFlowsRequest describeJobFlowsRequest = new DescribeJobFlowsRequest(new List<string>() { jobFlowId });
                DescribeJobFlowsResponse describeJobFlowsResponse = emrClient.DescribeJobFlows(describeJobFlowsRequest);
                JobFlowDetail jobFlowDetail = describeJobFlowsResponse.JobFlows.First();
                List<StepDetail> stepDetailsOfJob = jobFlowDetail.Steps;
                List<string> stepStatuses = new List<string>();
                foreach (StepDetail stepDetail in stepDetailsOfJob)
                {
                    StepExecutionStatusDetail status = stepDetail.ExecutionStatusDetail;
                    String stepConfigName = stepDetail.StepConfig.Name;
                    String statusValue = status.State.Value;
                    stepStatuses.Add(statusValue.ToLower());
                    Console.WriteLine(string.Format("Step config name: {0}, step status: {1}", stepConfigName, statusValue));
                }
                oneStepNotFinished = stepStatuses.Contains("pending") || stepStatuses.Contains("running");
                Thread.Sleep(10000);
            }
            return jobFlowId;
        }
        catch (AmazonElasticMapReduceException emrException)
        {
            Console.WriteLine("Cluster list creation has failed.");
            Console.WriteLine("Amazon error code: {0}",
                string.IsNullOrEmpty(emrException.ErrorCode) ? "None" : emrException.ErrorCode);
            Console.WriteLine("Exception message: {0}", emrException.Message);
        }
    }
    return string.Empty;
}
You should understand all of that from part 5 referred to above. It is just a slightly reduced form of the StartCluster method: we have no logging, no tags and no termination protection, since we don’t need those right now.
So the method returns a job flow ID. We can use that ID to add new steps to be executed in the cluster. Adding extra steps is also performed by the StepFactory and StepConfig objects. StepFactory has a method called NewRunHiveScriptStep which accepts the S3 address of the Hive script you’d like to have executed.
The method below adds a step to the job flow, monitors its progress and periodically prints out the step status, such as “pending” or “running”. When the step has completed, the method prints the final status to the console window. Finally, the cluster is terminated:
public void RunHiveScriptStep()
{
    using (IAmazonElasticMapReduce emrClient = GetEmrClient())
    {
        try
        {
            String activeWaitingJobFlowId = StartNewActiveWaitingJobFlow();
            if (!string.IsNullOrEmpty(activeWaitingJobFlowId))
            {
                StepFactory stepFactory = new StepFactory(RegionEndpoint.EUWest1);
                String scriptLocation = "s3://hive-aggregation-scripts/hive-aggregation-script.txt";
                StepConfig runHiveScript = new StepConfig()
                {
                    Name = "Run Hive script",
                    HadoopJarStep = stepFactory.NewRunHiveScriptStep(scriptLocation),
                    ActionOnFailure = "TERMINATE_JOB_FLOW"
                };
                AddJobFlowStepsRequest addHiveRequest = new AddJobFlowStepsRequest(activeWaitingJobFlowId, new List<StepConfig>() { runHiveScript });
                AddJobFlowStepsResult addHiveResponse = emrClient.AddJobFlowSteps(addHiveRequest);
                List<string> stepIds = addHiveResponse.StepIds;
                String hiveStepId = stepIds[0];

                DescribeStepRequest describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
                DescribeStepResult describeHiveStepResult = emrClient.DescribeStep(describeHiveStepRequest);
                Step hiveStep = describeHiveStepResult.Step;
                StepStatus hiveStepStatus = hiveStep.Status;
                string hiveStepState = hiveStepStatus.State.Value.ToLower();
                bool failedState = false;
                StepTimeline finalTimeline = null;
                while (hiveStepState != "completed")
                {
                    describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
                    describeHiveStepResult = emrClient.DescribeStep(describeHiveStepRequest);
                    hiveStep = describeHiveStepResult.Step;
                    hiveStepStatus = hiveStep.Status;
                    hiveStepState = hiveStepStatus.State.Value.ToLower();
                    finalTimeline = hiveStepStatus.Timeline;
                    Console.WriteLine(string.Format("Current state of Hive script execution: {0}", hiveStepState));
                    switch (hiveStepState)
                    {
                        case "pending":
                        case "running":
                            Thread.Sleep(10000);
                            break;
                        case "cancelled":
                        case "failed":
                        case "interrupted":
                            failedState = true;
                            break;
                    }
                    if (failedState)
                    {
                        break;
                    }
                }

                if (finalTimeline != null)
                {
                    Console.WriteLine(string.Format("Hive script step {0} created at {1}, started at {2}, finished at {3}",
                        hiveStepId, finalTimeline.CreationDateTime, finalTimeline.StartDateTime, finalTimeline.EndDateTime));
                }

                TerminateJobFlowsRequest terminateRequest = new TerminateJobFlowsRequest(new List<string> { activeWaitingJobFlowId });
                TerminateJobFlowsResponse terminateResponse = emrClient.TerminateJobFlows(terminateRequest);
            }
            else
            {
                Console.WriteLine("No valid job flow could be created.");
            }
        }
        catch (AmazonElasticMapReduceException emrException)
        {
            Console.WriteLine("Hive script execution step has failed.");
            Console.WriteLine("Amazon error code: {0}",
                string.IsNullOrEmpty(emrException.ErrorCode) ? "None" : emrException.ErrorCode);
            Console.WriteLine("Exception message: {0}", emrException.Message);
        }
    }
}
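The switch statement in the polling loop only knows the state strings it lists. Any other value that isn't "completed" would make the loop call DescribeStep again immediately, without sleeping. One possible way to make that handling explicit is to move the decision into a small helper, sketched below. The enum and class names are hypothetical and not part of the AWS SDK; the state strings are only the ones used in the method above.

```csharp
using System;

// Hypothetical outcome categories for a step state string.
public enum StepOutcome { StillWorking, Succeeded, Failed, Unknown }

// Hypothetical helper, not part of the AWS SDK for .NET.
public static class StepStateSketch
{
    // Maps a step state string, as read from StepStatus.State.Value,
    // to one of the outcome categories. Comparison is case-insensitive.
    public static StepOutcome Classify(string state)
    {
        switch ((state ?? string.Empty).ToLowerInvariant())
        {
            case "pending":
            case "running":
                return StepOutcome.StillWorking;
            case "completed":
                return StepOutcome.Succeeded;
            case "cancelled":
            case "failed":
            case "interrupted":
                return StepOutcome.Failed;
            default:
                // Anything not listed above; the caller decides whether to wait or stop.
                return StepOutcome.Unknown;
        }
    }
}
```

With a helper like this the loop could sleep on both StillWorking and Unknown, and stop on Succeeded or Failed, so an unexpected state wouldn't turn into a tight loop of API calls.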
Call the method from Main:
static void Main(string[] args)
{
    EmrDemoService emrDemoService = new EmrDemoService();
    emrDemoService.RunHiveScriptStep();

    Console.WriteLine("Main done...");
    Console.ReadKey();
}
Here’s a sample output from the start of the program execution:
Here’s a sample printout from the start of the Hive script execution:
The step is also visible in the EMR GUI:
The step has completed:
Let’s check in DynamoDb if the aggregation script has actually worked. Indeed it has:
In the next part we’ll apply all we have learnt about EMR to our original overall Big Data demo we started building in the series on Amazon Kinesis.