Using Amazon RedShift with the AWS .NET API Part 5: connecting to master node using ODBC

Introduction

In the previous post we went through some basic C# code to communicate with Amazon RedShift. We saw how to get a list of clusters, start a new cluster and terminate one using the .NET AWS SDK.

We haven’t yet seen how to execute Postgresql commands on RedShift remotely from code. That is the main goal of this post.

Installing the ODBC driver

In this section we’ll prepare our Windows environment to be able to connect to RedShift using ODBC. At times this can be a frustrating experience so I’ll try to give you as much detail as I can.

Using Amazon RedShift with the AWS .NET API Part 4: code beginnings

Introduction

In the previous post we looked into how to connect to the Amazon RedShift master node using a tool called WorkBenchJ. We also went through some very basic Postgresql statements and tested an equally basic aggregation script.

In this post we’ll install the .NET SDK and start building some test code.

Note that we’ll be concentrating on showing and explaining the technical code examples related to AWS. We’ll ignore software principles like SOLID and layering so that we can stay focused. It’s your responsibility to organise your code properly. There are numerous posts on this blog that take up topics related to software architecture.

Installing the SDK

Using Amazon RedShift with the AWS .NET API Part 3: connecting to the master node

Introduction

In the previous post of this series we quickly looked at what a massively parallel processing database is. We also launched our first Amazon RedShift cluster.

In this post we’ll connect to the master node and start issuing Postgresql commands.

If you don’t have any RedShift cluster available at this point then you can follow the steps in the previous post so that you can try the example code.

Connecting to RedShift

Using Amazon RedShift with the AWS .NET API Part 2: MPP definition and first cluster

Introduction

In the previous post we went through Amazon RedShift at an introductory level. In general we can say that it is a highly efficient data storage and data mining tool especially suited for Big Data scenarios. However, it also comes with serious limitations regarding the available Postgresql language features.

In this post we’ll first summarise an important term in conjunction with RedShift: MPP. We’ll then go on and create our first database in the RedShift GUI.

Using Amazon RedShift with the AWS .NET API Part 1: introduction

Introduction

Amazon RedShift is Amazon’s data warehousing solution and is especially well-suited for Big Data scenarios where petabytes of data must be stored and analysed. It follows a columnar DBMS architecture and it was designed especially for heavy data mining requests.

This is the fifth – and probably for a while the last – installment in our series of posts dedicated to out-of-the-box components built and powered by Amazon Web Services (AWS) that enable Big Data handling. The components we have looked at so far are Amazon Kinesis, S3, DynamoDb and Elastic MapReduce.

Using Amazon Elastic MapReduce with the AWS .NET API Part 8: connection to our Big Data demo

Introduction

In the previous post we saw how to start an Amazon EMR cluster and have it execute a Hive script which performs a basic aggregation step.

This post will take up the Big Data thread where we left off at the end of the previous two series on the blob storage component Amazon S3 and the NoSql data store DynamoDb. Therefore a prerequisite for following the code examples in this post is familiarity with what we discussed in those series.

Using Amazon Elastic MapReduce with the AWS .NET API Part 7: indirect Hive with .NET

Introduction

In the previous post we looked at the basics of how EMR can interact with Amazon S3 and DynamoDb using some additional commands and properties of Hive. In part 5 of this series we saw how to start a new cluster and assign a couple of steps to it using .NET. In this post we’ll marry the two: we’ll start up a new cluster and assign the full aggregation job flow to it in code.

It’s not possible to send a set of Hive commands as plain text to the cluster. The Hive script must be stored somewhere EMR can find it; the .NET code can only point to its location. All this means that we’ll need to prepare the Hive script beforehand. An ideal place to store all types of files is of course Amazon S3, and we’ll follow that route here as well. This is the reason why I called this post “indirect Hive”.

Preparation

First delete all records from the aggregation-tests DynamoDb table from the previous post so that we won’t confuse the old results with the new ones.
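
If you prefer to clear the table from code rather than through the DynamoDb console, a minimal sketch could look like the one below. It assumes the table has a single hash key called “id”, as in the previous posts, that your AWS credentials are configured as in the earlier series, and it ignores scan pagination, which is acceptable for a small test table. It requires the Amazon.DynamoDBv2 and Amazon.DynamoDBv2.Model namespaces:

private void ClearAggregationTestsTable()
{
	//a minimal sketch: scan the table and delete every item by its "id" hash key
	//scan pagination is ignored, which is fine for a small test table
	using (IAmazonDynamoDB dynamoDbClient = new AmazonDynamoDBClient(RegionEndpoint.EUWest1))
	{
		ScanResponse scanResponse = dynamoDbClient.Scan(new ScanRequest() { TableName = "aggregation-tests" });
		foreach (Dictionary<string, AttributeValue> item in scanResponse.Items)
		{
			DeleteItemRequest deleteItemRequest = new DeleteItemRequest()
			{
				TableName = "aggregation-tests",
				Key = new Dictionary<string, AttributeValue>() { { "id", item["id"] } }
			};
			dynamoDbClient.DeleteItem(deleteItemRequest);
		}
	}
}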

Next, create a text file called hive-aggregation-script.txt and insert the following statements:

create external table if not exists raw_urls (url string, response_time int) row format delimited fields terminated by '|' location 's3://raw-urls-data/';
create external table if not exists aggregations (id string, url string, avg_response_time double) stored by 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' tblproperties("dynamodb.table.name" = "aggregation-tests", "dynamodb.region" = "eu-west-1", "dynamodb.column.mapping" = "id:id,url:url_name,avg_response_time:average_response_time");
create table if not exists temporary_local_aggregation (id string, url string, avg_response_time double);
insert overwrite table temporary_local_aggregation SELECT reflect("java.util.UUID", "randomUUID"), url, avg(response_time) FROM raw_urls GROUP BY url;
insert into table aggregations select * from temporary_local_aggregation;
drop table aggregations;
drop table raw_urls;

All of these statements are copied directly from the previous post. It’s just a step-by-step instruction to create the external tables, run the aggregations and store them in DynamoDb.

Upload hive-aggregation-script.txt to S3 in a bucket called “hive-aggregation-scripts”.
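
You can do the upload in the S3 console, but if you’d rather do it from code then here’s a minimal sketch using the S3 client we met in the S3 series. The bucket must exist beforehand and the local file path is only an example; the Amazon.S3 and Amazon.S3.Model namespaces are required:

private void UploadHiveScriptToS3()
{
	//a minimal sketch: upload the Hive script file to the hive-aggregation-scripts bucket
	using (IAmazonS3 s3Client = new AmazonS3Client(RegionEndpoint.EUWest1))
	{
		PutObjectRequest putObjectRequest = new PutObjectRequest()
		{
			BucketName = "hive-aggregation-scripts",
			Key = "hive-aggregation-script.txt",
			FilePath = @"c:\tmp\hive-aggregation-script.txt" //example path, change it to your own
		};
		PutObjectResponse putObjectResponse = s3Client.PutObject(putObjectRequest);
		Console.WriteLine(string.Format("Upload finished with status code {0}", putObjectResponse.HttpStatusCode));
	}
}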

.NET

Open the EMR tester console application we started building in part 5, referenced above. We already have a method called StartCluster in EmrDemoService.cs. We’ll first insert a similar method that starts a cluster and returns the job flow ID. Insert the following method in EmrDemoService:

public string StartNewActiveWaitingJobFlow()
{
	using (IAmazonElasticMapReduce emrClient = GetEmrClient())
	{
		try
		{
			StepFactory stepFactory = new StepFactory(RegionEndpoint.EUWest1);

			StepConfig installHive = new StepConfig()
			{
				Name = "Install Hive step",
				ActionOnFailure = "TERMINATE_JOB_FLOW",
				HadoopJarStep = stepFactory.NewInstallHiveStep(StepFactory.HiveVersion.Hive_Latest)
			};

			StepConfig installPig = new StepConfig()
			{
				Name = "Install Pig step",
				ActionOnFailure = "TERMINATE_JOB_FLOW",
				HadoopJarStep = stepFactory.NewInstallPigStep()
			};

			JobFlowInstancesConfig jobFlowInstancesConfig = new JobFlowInstancesConfig()
			{
				Ec2KeyName = "insert-your-ec2-key-name",
				HadoopVersion = "2.4.0",
				InstanceCount = 3,
				KeepJobFlowAliveWhenNoSteps = true,
				MasterInstanceType = "m3.xlarge",
				SlaveInstanceType = "m3.xlarge"
			};

			RunJobFlowRequest runJobFlowRequest = new RunJobFlowRequest()
			{
				Name = "EMR cluster",
				Steps = { installHive, installPig },
				Instances = jobFlowInstancesConfig
			};

			RunJobFlowResponse runJobFlowResponse = emrClient.RunJobFlow(runJobFlowRequest);
			String jobFlowId = runJobFlowResponse.JobFlowId;
			Console.WriteLine(string.Format("Job flow id: {0}", jobFlowId));
			bool oneStepNotFinished = true;
			while (oneStepNotFinished)
			{
				DescribeJobFlowsRequest describeJobFlowsRequest = new DescribeJobFlowsRequest(new List<string>() { jobFlowId });
				DescribeJobFlowsResponse describeJobFlowsResponse = emrClient.DescribeJobFlows(describeJobFlowsRequest);
				JobFlowDetail jobFlowDetail = describeJobFlowsResponse.JobFlows.First();
				List<StepDetail> stepDetailsOfJob = jobFlowDetail.Steps;
				List<string> stepStatuses = new List<string>();
				foreach (StepDetail stepDetail in stepDetailsOfJob)
				{
					StepExecutionStatusDetail status = stepDetail.ExecutionStatusDetail;
					String stepConfigName = stepDetail.StepConfig.Name;
					String statusValue = status.State.Value;
					stepStatuses.Add(statusValue.ToLower());
					Console.WriteLine(string.Format("Step config name: {0}, step status: {1}", stepConfigName, statusValue));
				}
				oneStepNotFinished = stepStatuses.Contains("pending") || stepStatuses.Contains("running");
				Thread.Sleep(10000);
			}
			return jobFlowId;
		}
		catch (AmazonElasticMapReduceException emrException)
		{
			Console.WriteLine("Cluster list creation has failed.");
			Console.WriteLine("Amazon error code: {0}",
				string.IsNullOrEmpty(emrException.ErrorCode) ? "None" : emrException.ErrorCode);
			Console.WriteLine("Exception message: {0}", emrException.Message);
		}
	}

	return string.Empty;
}

You should understand all of that from part 5 referred to above. It is just a slightly reduced form of the StartCluster method: there is no logging, no tags and no termination protection, as we don’t need those right now.
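
The GetEmrClient helper also comes from part 5. In case you don’t have it at hand, a minimal version could look like the one below; it assumes your AWS credentials are available to the SDK, e.g. through the configuration described in the earlier posts, and it needs the Amazon and Amazon.ElasticMapReduce namespaces:

private IAmazonElasticMapReduce GetEmrClient()
{
	//a minimal sketch: the client picks up the AWS credentials configured for the application
	return new AmazonElasticMapReduceClient(RegionEndpoint.EUWest1);
}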

So the method returns a job flow ID. We can use that ID to add new steps to be executed in the cluster. Adding extra steps is also performed by the StepFactory and StepConfig objects. StepFactory has a method called NewRunHiveScriptStep which accepts the S3 address of the Hive script you’d like to have executed.

The method below adds a step to the job flow, monitors its progress and periodically prints the step status, such as “pending” or “running”. When the step has completed, the method prints the final step timeline to the console window. Finally, the cluster is terminated:

public void RunHiveScriptStep()
{
	using (IAmazonElasticMapReduce emrClient = GetEmrClient())
	{
		try
		{
			String activeWaitingJobFlowId = StartNewActiveWaitingJobFlow();
			if (!string.IsNullOrEmpty(activeWaitingJobFlowId))
			{
				StepFactory stepFactory = new StepFactory(RegionEndpoint.EUWest1);
				String scriptLocation = "s3://hive-aggregation-scripts/hive-aggregation-script.txt";
				StepConfig runHiveScript = new StepConfig()
				{
					Name = "Run Hive script",
					HadoopJarStep = stepFactory.NewRunHiveScriptStep(scriptLocation),
					ActionOnFailure = "TERMINATE_JOB_FLOW"
				};
				AddJobFlowStepsRequest addHiveRequest = new AddJobFlowStepsRequest(activeWaitingJobFlowId, new List<StepConfig>(){runHiveScript});
				AddJobFlowStepsResult addHiveResponse = emrClient.AddJobFlowSteps(addHiveRequest);
				List<string> stepIds = addHiveResponse.StepIds;
				String hiveStepId = stepIds[0];

				DescribeStepRequest describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
				DescribeStepResult describeHiveStepResult = emrClient.DescribeStep(describeHiveStepRequest);
				Step hiveStep = describeHiveStepResult.Step;
				StepStatus hiveStepStatus = hiveStep.Status;
				string hiveStepState = hiveStepStatus.State.Value.ToLower();
				bool failedState = false;
				StepTimeline finalTimeline = null;
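				//poll the Hive script step until it completes or reaches a failure state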
				while (hiveStepState != "completed")
				{
					describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
					describeHiveStepResult = emrClient.DescribeStep(describeHiveStepRequest);
					hiveStep = describeHiveStepResult.Step;
					hiveStepStatus = hiveStep.Status;
					hiveStepState = hiveStepStatus.State.Value.ToLower();
					finalTimeline = hiveStepStatus.Timeline;
					Console.WriteLine(string.Format("Current state of Hive script execution: {0}", hiveStepState));
					switch (hiveStepState)
					{
						case "pending":
						case "running":
							Thread.Sleep(10000);
							break;
						case "cancelled":
						case "failed":
						case "interrupted":
							failedState = true;
							break;
					}
					if (failedState)
					{
						break;
					}
				}
				if (finalTimeline != null)
				{
					Console.WriteLine(string.Format("Hive script step {0} created at {1}, started at {2}, finished at {3}"
						, hiveStepId, finalTimeline.CreationDateTime, finalTimeline.StartDateTime, finalTimeline.EndDateTime));
				}
				TerminateJobFlowsRequest terminateRequest = new TerminateJobFlowsRequest(new List<string> { activeWaitingJobFlowId });
				TerminateJobFlowsResponse terminateResponse = emrClient.TerminateJobFlows(terminateRequest);
			}
			else
			{
				Console.WriteLine("No valid job flow could be created.");
			}
		}
		catch (AmazonElasticMapReduceException emrException)
		{
			Console.WriteLine("Hive script execution step has failed.");
			Console.WriteLine("Amazon error code: {0}",
				string.IsNullOrEmpty(emrException.ErrorCode) ? "None" : emrException.ErrorCode);
			Console.WriteLine("Exception message: {0}", emrException.Message);
		}
	}
}

Call the method from Main:

static void Main(string[] args)
{		

	EmrDemoService emrDemoService = new EmrDemoService();
	emrDemoService.RunHiveScriptStep();

	Console.WriteLine("Main done...");
	Console.ReadKey();
}

Here’s a sample output from the start of the program execution:

Hive query execution preparation, program output in .NET

Here’s a sample printout from the start of the Hive script execution:

Hive script execution started, program output in .NET

The step is also visible in the EMR GUI:

Hive script execution started visible in EMR GUI

The step has completed:

Hive query execution completed, program output in .NET

Let’s check in DynamoDb if the aggregation script has actually worked. Indeed it has:

Aggregations visible in Amazon DynamoDb after Hive query execution
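
If you’d rather verify the results from code than in the DynamoDb console, a simple scan of the aggregation-tests table will show the aggregated rows. The attribute names below follow the dynamodb.column.mapping property in the Hive script above; the Amazon.DynamoDBv2 and Amazon.DynamoDBv2.Model namespaces are required:

private void PrintAggregationResults()
{
	//a minimal sketch: scan the aggregation-tests table and print the aggregated values
	using (IAmazonDynamoDB dynamoDbClient = new AmazonDynamoDBClient(RegionEndpoint.EUWest1))
	{
		ScanResponse scanResponse = dynamoDbClient.Scan(new ScanRequest() { TableName = "aggregation-tests" });
		foreach (Dictionary<string, AttributeValue> item in scanResponse.Items)
		{
			//url_name is stored as a string, average_response_time as a number
			Console.WriteLine(string.Format("Url: {0}, average response time: {1}",
				item["url_name"].S, item["average_response_time"].N));
		}
	}
}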

In the next part we’ll apply all we have learnt about EMR to the overall Big Data demo we started building in the series on Amazon Kinesis.

View all posts related to Amazon Web Services and Big Data here.
