Big Data: using Amazon Kinesis with the AWS.NET API Part 3: sending to the stream

Introduction

In the previous post of this series we set up the Kinesis stream, installed the .NET SDK and inserted a very simple domain object into a Kinesis producer console application.

In this post we’ll start posting to our Kinesis stream.

Open the AmazonKinesisProducer demo application and let’s get to it.

Preparations

We cannot just call the services within the AWS SDK without proper authentication. The AWS SDK for .NET documentation has an important reference page on handling your credentials in a safe way. We’ll take the recommended approach and create a profile in the SDK Store and reference it from app.config.

This series is not about AWS authentication, so we won’t go into temporary credentials, but later on you may be interested in that option too. Since we’re programmers and it takes a single line of code to set up a profile, we’ll go with the programmatic option. Add the following line to Main:

Amazon.Util.ProfileManager.RegisterProfile("demo-aws-profile", "your access key id", "your secret access key");

Run the application and it should execute without exceptions. The profile is now registered in the SDK Store, so I suggest you remove this line from the application afterwards, especially if you want to distribute the code, as it contains your secret access key. Next open app.config and add the appSettings section with the following elements:

<appSettings>
	<add key="AWSProfileName" value="demo-aws-profile"/>
	<add key="KinesisStreamName" value="test-stream"/>
</appSettings>

Generating web transactions

We’ll create web transaction objects using the console. Add the following private methods to Program.cs:

private static List<WebTransaction> GetTransactions()
{
	List<WebTransaction> webTransactions = new List<WebTransaction>();
	Console.WriteLine("Enter your web transactions. ");
	Console.Write("URL - type 'x' and press Enter to exit: ");
	string url = Console.ReadLine();
	while (url != "x")
	{
		WebTransaction wt = new WebTransaction();
		wt.Url = url;
		wt.UtcDateUnixMs = ConvertToUnixMillis(DateTime.UtcNow);

		Console.Write("Customer name: ");
		string customerName = Console.ReadLine();
		wt.CustomerName = customerName;

		Console.Write("Response time (ms): ");
		int responseTime = Convert.ToInt32(Console.ReadLine());
		wt.ResponseTimeMs = responseTime;

		Console.Write("Web method: ");
		string method = Console.ReadLine();
		wt.WebMethod = method;

		webTransactions.Add(wt);

		Console.Write("URL - enter 'x' and press enter to exit: ");
		url = Console.ReadLine();
	}
	return webTransactions;
}

private static long ConvertToUnixMillis(DateTime dateToConvert)
{
	return Convert.ToInt64(dateToConvert.Subtract(new DateTime(1970,1,1,0,0,0,0)).TotalMilliseconds);
}

GetTransactions() is a simple input loop of the kind you’ll have written early on in any C# course. Note that I haven’t added any validation, such as checking that the web method is feasible or that the response time is a number. So be gentle and enter “correct” values later on during the tests. ConvertToUnixMillis simply converts a date to a UNIX timestamp in milliseconds. .NET 4.5 doesn’t natively support UNIX timestamps. Built-in support arrives with .NET Framework 4.6 in the form of DateTimeOffset.ToUnixTimeMilliseconds, so it is a framework feature rather than part of the C# 6 language.
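If you target .NET Framework 4.6 or later, the hand-written conversion can probably be replaced by the built-in DateTimeOffset.ToUnixTimeMilliseconds method. The following is only a sketch: the UnixTime class and the ToUnixMillis method are names I made up for illustration, and the code assumes that the incoming date is already in UTC, as it is in GetTransactions(). Note that the built-in method truncates fractions of a millisecond whereas Convert.ToInt64 rounds, so the two versions may differ by 1 ms.

```csharp
using System;

public static class UnixTime
{
    // Hypothetical helper: converts a UTC date to UNIX time in milliseconds
    // using the API built into .NET Framework 4.6 and later.
    public static long ToUnixMillis(DateTime utcDate)
    {
        // Assumption: the caller passes a UTC value. Mark it as UTC so that
        // DateTimeOffset does not apply the local time zone offset.
        DateTime asUtc = DateTime.SpecifyKind(utcDate, DateTimeKind.Utc);
        return new DateTimeOffset(asUtc).ToUnixTimeMilliseconds();
    }
}
```

In GetTransactions() the call would then read wt.UtcDateUnixMs = UnixTime.ToUnixMillis(DateTime.UtcNow);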

Sending the transactions to the stream

We’ll send each message one by one in the following method which you can add to Program.cs:

private static void SendWebTransactionsToQueue(List<WebTransaction> transactions)
{
	AmazonKinesisConfig config = new AmazonKinesisConfig();
	config.RegionEndpoint = Amazon.RegionEndpoint.EUWest1;
	AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config);
	String kinesisStreamName = ConfigurationManager.AppSettings["KinesisStreamName"];

	foreach (WebTransaction wt in transactions)
	{
		string dataAsJson = JsonConvert.SerializeObject(wt);
		byte[] dataAsBytes = Encoding.UTF8.GetBytes(dataAsJson);
		using (MemoryStream memoryStream = new MemoryStream(dataAsBytes))
		{
			try
			{
				PutRecordRequest requestRecord = new PutRecordRequest();
				requestRecord.StreamName = kinesisStreamName;
				requestRecord.PartitionKey = "url-response-times";
				requestRecord.Data = memoryStream;

				PutRecordResponse responseRecord = kinesisClient.PutRecord(requestRecord);
				Console.WriteLine("Successfully sent record {0} to Kinesis. Sequence number: {1}", wt.Url, responseRecord.SequenceNumber);
			}
			catch (Exception ex)
			{
				Console.WriteLine("Failed to send record {0} to Kinesis. Exception: {1}", wt.Url, ex.Message);
			}
		}
	}
}

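You’ll need to reference the System.Configuration library to make this work. The code also needs using directives for Amazon.Kinesis, Amazon.Kinesis.Model, Newtonsoft.Json, System.Configuration, System.IO and System.Text. If the compiler complains that AmazonKinesisConfig cannot be found then a missing using directive or a missing reference to the Kinesis part of the SDK is the likely cause.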

We first configure our access to Kinesis using the AmazonKinesisConfig object. We set the region to the one where we set up the stream. In my case it’s eu-west-1, but you may need to provide something else. We also read the stream name from app.config.

Then for each of the WebTransaction objects we go through the following process:

  • Get the JSON representation of the object
  • Convert the JSON to a byte array
  • Put the byte array into a MemoryStream
  • Set up the PutRecordRequest object with the stream name, the partition key and the data we want to publish
  • Send the record to Kinesis using the PutRecord method
  • If it’s successful then print the sequence number of the message
  • Otherwise print the exception message

What is a partition key? It is a key that Kinesis uses to group the data within a stream into shards. And a sequence number? It is a unique ID that each message gets upon insertion into the stream. The key concepts page in the Kinesis documentation will be a good friend of yours while working with Kinesis.
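To make the partition key a bit more concrete: according to the Kinesis documentation the service takes the MD5 hash of the partition key, treats it as a 128-bit integer and puts the record into the shard whose hash key range contains that number. The sketch below reproduces that first step locally. The class and method names are my own invention, and reading the hash bytes as an unsigned big-endian number is an assumption on my part, so treat it as an illustration rather than as the service’s exact implementation.

```csharp
using System;
using System.Numerics;
using System.Security.Cryptography;
using System.Text;

public static class PartitionKeyHash
{
    // Hypothetical helper: maps a partition key to a non-negative 128-bit integer.
    public static BigInteger ToHashKey(string partitionKey)
    {
        using (MD5 md5 = MD5.Create())
        {
            byte[] hash = md5.ComputeHash(Encoding.UTF8.GetBytes(partitionKey));

            // BigInteger expects little-endian bytes, so reverse the hash.
            // The extra trailing zero byte keeps the value non-negative.
            byte[] littleEndian = new byte[hash.Length + 1];
            for (int i = 0; i < hash.Length; i++)
            {
                littleEndian[i] = hash[hash.Length - 1 - i];
            }
            return new BigInteger(littleEndian);
        }
    }
}
```

Since SendWebTransactionsToQueue uses the same partition key, “url-response-times”, for every record, all of our messages get the same hash and therefore end up in the same shard. That’s fine for a demo stream with a single shard. With several shards you would probably want a key that varies between records.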

Test

We can call these functions from Main as follows:

List<WebTransaction> webTransactions = GetTransactions();
SendWebTransactionsToQueue(webTransactions);

Console.WriteLine("Main done...");
Console.ReadKey();

Start the application and create a couple of WebTransaction objects using the console. Then if all goes well you should see a printout similar to the following in the console window:

[Screenshot: messages sent to Kinesis stream, console output]

Let’s see what the Kinesis dashboard is telling us:

[Screenshot: PutRequest count on the AWS Kinesis dashboard]

The PutRequest graph rose to 5. I then put one more message to the stream, and the number dropped to 1:

[Screenshot: PutRequest count on the AWS Kinesis dashboard after one more message]

In the next post we’ll see how to read the messages from the stream.

View all posts related to Amazon Web Services and Big Data here.

About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

3 Responses to Big Data: using Amazon Kinesis with the AWS.NET API Part 3: sending to the stream

  1. jdupourque says:

    Very good work! until now everything works perfectly…

  2. Marco Lanza says:

    Hi,

    I have the Visual Studio Community 2015 and for some reason the following line of code does not compile:

    AmazonKinesisConfig config = new AmazonKinesisConfig();
    config.RegionEndpoint = Amazon.RegionEndpoint.USWest2;
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config);

    It complains the AmazonKinesisConfig “could not be found (are you missing a using directive or an assembly reference)”.

    I have the using.system.configuration in my code but still does not work. I am about to google it and see if the name were updated or something else.

    Thanks in advance.

  3. Marco Lanza says:

    I solved the problem for my previous questions.

    Thanks.
