Big Data: using Amazon Kinesis with the AWS.NET API Part 4: reading from the stream

Introduction

In the previous post of this series on Amazon Kinesis we looked at how to publish messages to a Kinesis stream. In this post we’ll see how to extract them. We’ll create a Kinesis Client application.

It’s necessary to extract the messages from the stream because, by default, Kinesis only stores them for 24 hours. A client application can also filter, sort and validate the incoming messages according to some predefined rules.

Our demo client will be a completely separate application. This means some duplicated code, but there is a good reason for it: we want to simulate a scenario where the producers are completely different applications, such as a bit of JavaScript on a web page, a Java web service, an iOS app or some other smart device. Our Kinesis producer is fine for demo purposes, but in reality the producer can be any software that can send HTTP requests. However, if your producer and client apps run on the same platform, then by all means introduce a shared layer in the project.

Open the demo app we’ve been working on and let’s get to it.

The Kinesis client

Add a new C# console application called AmazonKinesisConsumer. Add the same NuGet packages as before:

  • the AWS SDK NuGet package
  • the Json.NET NuGet package

Add a reference to the System.Configuration assembly right away, and add the same settings to app.config:

<appSettings>
	<add key="AWSProfileName" value="demo-aws-profile"/>
	<add key="KinesisStreamName" value="test-stream"/>
</appSettings>

Insert the same WebTransaction class again:

public class WebTransaction
{
	public long UtcDateUnixMs { get; set; }
	public string CustomerName { get; set; }
	public string Url { get; set; }
	public string WebMethod { get; set; }
	public int ResponseTimeMs { get; set; }
}

We’ll keep things simple here and reuse the same WebTransaction class, since we know it matches the incoming JSON string. However, as mentioned in the first post of this series, be prepared for different message formats and property names. If you can, aim for a widely accepted format such as JSON or XML, as they are easy to handle in code. For example, if the incoming JSON uses different property names, you can use Json.NET's JsonProperty attribute to map them to your C# properties (differences in casing alone are already tolerated by Json.NET when deserializing):

using Newtonsoft.Json;

public class WebTransaction
{
	[JsonProperty(PropertyName = "dateUtc")]
	public long UtcDateUnixMs { get; set; }
	[JsonProperty(PropertyName = "cust")]
	public string CustomerName { get; set; }
	[JsonProperty(PropertyName = "url")]
	public string Url { get; set; }
	[JsonProperty(PropertyName = "method")]
	public string WebMethod { get; set; }
	[JsonProperty(PropertyName = "responseTime")]
	public int ResponseTimeMs { get; set; }
}
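To see the mapping at work, here is a minimal sketch that deserializes a JSON string using the short property names into the attributed class. It assumes the Json.NET package added earlier; the class is repeated so the snippet compiles on its own (keep only one copy in the project), and WebTransactionParser and the sample values are hypothetical.

```csharp
using Newtonsoft.Json;

// Same attributed class as above, repeated so this sketch is self-contained.
public class WebTransaction
{
    [JsonProperty(PropertyName = "dateUtc")]
    public long UtcDateUnixMs { get; set; }
    [JsonProperty(PropertyName = "cust")]
    public string CustomerName { get; set; }
    [JsonProperty(PropertyName = "url")]
    public string Url { get; set; }
    [JsonProperty(PropertyName = "method")]
    public string WebMethod { get; set; }
    [JsonProperty(PropertyName = "responseTime")]
    public int ResponseTimeMs { get; set; }
}

// Hypothetical helper name.
public static class WebTransactionParser
{
    // Json.NET uses the [JsonProperty] names ("cust", "responseTime", ...)
    // to decide which C# property each JSON value goes into.
    public static WebTransaction Parse(string json)
    {
        return JsonConvert.DeserializeObject<WebTransaction>(json);
    }
}
```

For example, parsing `{"dateUtc":1400000000000,"cust":"Nice customer","url":"http://www.mysite.com","method":"GET","responseTime":250}` fills CustomerName with "Nice customer" and ResponseTimeMs with 250.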

In any case, you can assume that the messages arrive as strings or, to be exact, as bytes that can be converted to strings.
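Since producers can be any software that sends HTTP requests, it may be worth checking that the bytes really are valid text before parsing them. `Encoding.UTF8` silently replaces invalid byte sequences with U+FFFD, whereas a `UTF8Encoding` created with `throwOnInvalidBytes: true` throws a `DecoderFallbackException`. The sketch below assumes producers send UTF-8 and that the payload arrives as a `MemoryStream` (as the SDK's `Record.Data` does); `RecordDecoding` and `TryDecode` are hypothetical names.

```csharp
using System.IO;
using System.Text;

public static class RecordDecoding
{
    // Strict UTF-8: no byte order mark on encode, throw on invalid bytes on decode.
    private static readonly Encoding StrictUtf8 = new UTF8Encoding(false, true);

    // Returns false if the payload is missing or not valid UTF-8, so the caller
    // can log or discard the record instead of parsing garbage.
    public static bool TryDecode(MemoryStream data, out string text)
    {
        text = null;
        if (data == null)
        {
            return false;
        }
        try
        {
            // ToArray copies the whole buffer regardless of the stream position.
            text = StrictUtf8.GetString(data.ToArray());
            return true;
        }
        catch (DecoderFallbackException)
        {
            return false;
        }
    }
}
```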

Do not assume anything about the ordering of the messages across the stream. Records are only ordered within a single shard, shards are read in parallel, and a Kinesis client extracts the messages in batches. So for best performance and consistency, aim for short, independent and self-contained messages. If ordering matters, or if the total message is too large for Kinesis, you can send extra properties with the messages such as “Index” and “Total” to indicate the order, like “1 of 10”, “2 of 10” etc., so that the client can collect and sort them.
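The “Index” and “Total” idea above could look something like this on the client side: a minimal sketch that buffers parts until all of them have arrived and then joins them in Index order. MessagePart, MessageAssembler and the extra MessageId property (used so parts of different messages are not mixed up) are assumptions, not part of the demo project.

```csharp
using System.Collections.Generic;

// Hypothetical envelope for one piece of a larger message.
public class MessagePart
{
    public string MessageId { get; set; } // groups the parts of one message
    public int Index { get; set; }        // 1-based position, e.g. 2 in "2 of 10"
    public int Total { get; set; }        // number of parts, e.g. 10 in "2 of 10"
    public string Payload { get; set; }
}

public class MessageAssembler
{
    // Parts received so far, per message, kept sorted by Index.
    private readonly Dictionary<string, SortedDictionary<int, string>> pending =
        new Dictionary<string, SortedDictionary<int, string>>();

    // Adds a part and returns the complete message once all parts are in,
    // otherwise null. Assumes well-formed parts with 1 <= Index <= Total.
    public string Add(MessagePart part)
    {
        SortedDictionary<int, string> parts;
        if (!pending.TryGetValue(part.MessageId, out parts))
        {
            parts = new SortedDictionary<int, string>();
            pending[part.MessageId] = parts;
        }

        // A part that is read twice simply overwrites itself.
        parts[part.Index] = part.Payload;

        if (parts.Count < part.Total)
        {
            return null;
        }

        pending.Remove(part.MessageId);
        // SortedDictionary enumerates its values in ascending Index order.
        return string.Concat(parts.Values);
    }
}
```

Parts may arrive in any order, e.g. 2, 3, 1; only the last call returns the joined payload.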

The shard iterator

Add the following private method to Program.cs:

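// At the top of Program.cs:
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Text;
using System.Threading;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

// Inside the Program class:
private static void ReadFromStream()
{
	// Credentials are resolved from the AWSProfileName profile set in app.config.
	AmazonKinesisConfig config = new AmazonKinesisConfig();
	config.RegionEndpoint = Amazon.RegionEndpoint.EUWest1;
	AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config);
	string kinesisStreamName = ConfigurationManager.AppSettings["KinesisStreamName"];

	// Ask Kinesis which shards the stream consists of.
	DescribeStreamRequest describeRequest = new DescribeStreamRequest();
	describeRequest.StreamName = kinesisStreamName;

	DescribeStreamResponse describeResponse = kinesisClient.DescribeStream(describeRequest);
	List<Shard> shards = describeResponse.StreamDescription.Shards;

	// Shards are read one after the other. An open shard never returns a null
	// iterator, so with more than one shard the loop below would never get past
	// the first one. This is fine for our single-shard demo stream.
	foreach (Shard shard in shards)
	{
		// Start with the oldest record still kept in the shard.
		GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
		iteratorRequest.StreamName = kinesisStreamName;
		iteratorRequest.ShardId = shard.ShardId;
		iteratorRequest.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;

		GetShardIteratorResponse iteratorResponse = kinesisClient.GetShardIterator(iteratorRequest);
		string iteratorId = iteratorResponse.ShardIterator;

		// NextShardIterator only becomes null once the shard is closed,
		// so for an open shard this is an endless polling loop.
		while (!string.IsNullOrEmpty(iteratorId))
		{
			GetRecordsRequest getRequest = new GetRecordsRequest();
			getRequest.Limit = 1000;
			getRequest.ShardIterator = iteratorId;

			GetRecordsResponse getResponse = kinesisClient.GetRecords(getRequest);
			string nextIterator = getResponse.NextShardIterator;
			List<Record> records = getResponse.Records;

			if (records.Count > 0)
			{
				Console.WriteLine("Received {0} records. ", records.Count);
				foreach (Record record in records)
				{
					// Record.Data is a MemoryStream with the raw bytes sent by the producer.
					string json = Encoding.UTF8.GetString(record.Data.ToArray());
					Console.WriteLine("Json string: " + json);
				}
			}
			iteratorId = nextIterator;

			// Kinesis allows at most 5 GetRecords calls per second per shard;
			// pausing avoids ProvisionedThroughputExceededException.
			Thread.Sleep(1000);
		}
	}
}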

Let’s see what’s going on here. The first four statements are identical to what we had in the Kinesis producer: they configure access to Kinesis and read the stream name from app.config. We then use the Kinesis client to describe the stream whose name we set in the DescribeStreamRequest object, and extract the available shards from the response.

We then iterate through the shards. For each shard (our demo stream has only one) we need to request a shard iterator, which lets us iterate through the messages in the shard. Note that the shards are read one after the other, which only works for a single shard: an open shard keeps returning new iterators, so the while loop would never move on to a second shard. We specify where we want to start reading using the ShardIteratorType enumeration. TRIM_HORIZON means that we start with the oldest message still kept in the shard and work our way up from there. This is like a first-in-first-out collection and is probably the most common way to extract the messages. The other enumeration values are the following:

  • AT_SEQUENCE_NUMBER: read from the position indicated by a sequence number
  • AFTER_SEQUENCE_NUMBER: start right after the sequence number
  • LATEST: start just after the most recent record in the shard, i.e. only read messages added after the iterator was created

As you may recall from the previous post, a sequence number is an ID that Kinesis attaches to each message.
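The sequence-number variants are handy when a client restarts and should not re-read everything. A minimal sketch, assuming you store the SequenceNumber of the last record you processed somewhere (a file or database; that part is up to you): build the iterator request with AFTER_SEQUENCE_NUMBER, or fall back to TRIM_HORIZON if nothing has been processed yet. IteratorRequests and ResumeAfter are hypothetical names; the request properties are those of the SDK's GetShardIteratorRequest.

```csharp
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

public static class IteratorRequests
{
    // Resume right after the last processed record (its Record.SequenceNumber),
    // or start from the oldest record if there is no checkpoint yet.
    public static GetShardIteratorRequest ResumeAfter(string streamName, string shardId, string lastSequenceNumber)
    {
        GetShardIteratorRequest request = new GetShardIteratorRequest();
        request.StreamName = streamName;
        request.ShardId = shardId;

        if (string.IsNullOrEmpty(lastSequenceNumber))
        {
            request.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;
        }
        else
        {
            request.ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
            request.StartingSequenceNumber = lastSequenceNumber;
        }
        return request;
    }
}
```

The resulting request is passed to kinesisClient.GetShardIterator exactly like the TRIM_HORIZON request in ReadFromStream.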

Once we have the iterator, we extract its ID, which is used in the GetRecordsRequest object. Note that we then enter a while loop that checks whether the iterator ID is null or empty. The GetRecordsResponse also includes the next iterator ID (NextShardIterator), which is a handle to read any subsequent messages. This ID is only null once the shard has been closed, so for an open shard this is an endless loop that keeps listening for messages from the stream. Keep in mind that Kinesis allows at most five GetRecords calls per second per shard, so a production loop should pause briefly between calls. If the iterator returns any records, we print the number of records and the raw string data of each record. We expect to see some JSON messages. We don’t parse them into WebTransaction objects yet; we’ll continue processing the raw data in the next post.
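Because ReadFromStream reads the shards one after another, a stream with several open shards would never get past the first one. One way around this, sketched under the assumption that you use the same synchronous SDK calls as in this post (available in the .NET Framework build of the AWS SDK), is to start one reader task per shard. ParallelShardReader and its methods are hypothetical names; resharding and DescribeStream paging (StreamDescription.HasMoreShards) are ignored. This only parallelizes reading inside one process; it does not distribute shards between several consumer instances.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

public static class ParallelShardReader
{
    // Starts one reader task per shard so that one shard's endless polling
    // loop cannot block the others.
    public static Task ReadAllShards(AmazonKinesisClient client, string streamName, CancellationToken token)
    {
        DescribeStreamRequest describeRequest = new DescribeStreamRequest();
        describeRequest.StreamName = streamName;
        List<Shard> shards = client.DescribeStream(describeRequest).StreamDescription.Shards;

        List<Task> readers = shards
            .Select(shard => Task.Run(() => ReadShard(client, streamName, shard.ShardId, token)))
            .ToList();
        return Task.WhenAll(readers);
    }

    private static void ReadShard(AmazonKinesisClient client, string streamName, string shardId, CancellationToken token)
    {
        GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
        iteratorRequest.StreamName = streamName;
        iteratorRequest.ShardId = shardId;
        iteratorRequest.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;
        string iteratorId = client.GetShardIterator(iteratorRequest).ShardIterator;

        // Stops when the shard is closed (null iterator) or cancellation is requested.
        while (!string.IsNullOrEmpty(iteratorId) && !token.IsCancellationRequested)
        {
            GetRecordsRequest getRequest = new GetRecordsRequest();
            getRequest.ShardIterator = iteratorId;
            getRequest.Limit = 1000;
            GetRecordsResponse getResponse = client.GetRecords(getRequest);

            foreach (Record record in getResponse.Records)
            {
                string json = Encoding.UTF8.GetString(record.Data.ToArray());
                Console.WriteLine("[{0}] Json string: {1}", shardId, json);
            }

            iteratorId = getResponse.NextShardIterator;
            // Stay under the limit of 5 GetRecords calls per second per shard.
            Thread.Sleep(1000);
        }
    }
}
```

In Main, `ParallelShardReader.ReadAllShards(kinesisClient, kinesisStreamName, CancellationToken.None).Wait();` would take the place of the ReadFromStream() call.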

Call this method from Main:

static void Main(string[] args)
{
	ReadFromStream();

	Console.WriteLine("Main done...");
	Console.ReadKey();
}

Test

Let’s see this in action. Make AmazonKinesisConsumer the start-up project of the solution and start the application. If you completed the previous post of this series less than 24 hours ago, you should see the messages you sent to the Kinesis stream back then; recall that Kinesis keeps the messages for 24 hours. I can see the following JSON messages:

Messages extracted from Kinesis

Keep the application running. You’ll see that the loop just keeps running and the application doesn’t stop: we’re effectively waiting for new messages from the stream. Back in Visual Studio, right-click AmazonKinesisProducer and select Debug, Start new instance. You’ll have two console windows up and running:

Kinesis producer and client running in parallel

Enter a couple of new web transactions into the producer and send them to Kinesis. The client should fetch them within a couple of seconds:

New records extracted from Kinesis

Great, we now have a highly efficient cloud-based message handler in the form of Amazon Kinesis, a Kinesis client and a Kinesis producer. We’ve also seen that although the stream is located in the cloud, the producers and clients can run on virtually any platform that is able to send and receive HTTP messages. So don’t get bogged down by the thought that you have to use Amazon components with Kinesis.

In the next post we’ll add some validation to the incoming messages.

View all posts related to Amazon Web Services and Big Data here.


About Andras Nemes
I'm a .NET/Java developer living and working in Stockholm, Sweden.

7 Responses to Big Data: using Amazon Kinesis with the AWS.NET API Part 4: reading from the stream

  1. Luke Sigler says:

    What is the best practice for scaling the shard iterators? Is there an equivalent to EventProcessorHost for Azure EventHubs in the AWS.NET Api?

    • Andras Nemes says:

      Hi Luke,

      Do you mean how we can have several Kinesis client apps among which Kinesis can distribute the messages? I’m not actually sure. However, we have good contact with Amazon, I’ll send a question to our contact tomorrow and let you know once I have something.
      I’m not familiar with Azure so I cannot comment on that.
      //Andras

      • lukesigler says:

        Thank you. I appreciate it.

      • Andras Nemes says:

        OK, I’ve got a response.

        Just some background information: we’re developing our message processing app using the Java SDK, not .NET. The Java SDK has something called a Kinesis Client Library (KCL) which is not available in .NET. Our own solution is based on that technology. So the response is also based on that, hopefully it’s still helpful in your case.

        There seems to be no easy way to duplicate the listeners and have them listen to the same shard. The default behaviour is that if you have 2 listeners then both will get the same messages. This is true of the example in the post – if AmazonKinesisConsumer is started twice then both instances will get the messages, there’s no automatic load distribution. You can have 2 or more different shards in the stream and have each instance listen to each. You could potentially post the same type of messages to each shard but then it is the responsibility of the message producer to send the messages to the shards in a balanced way. If your Kinesis client cannot handle the traffic from a single shard then a direct solution is to deploy it on a larger EC2 or Beanstalk instance. It is in general problematic to set up 2 listeners for the same shard and this is not the recommended approach even with the Java SDK. Though I cannot give you any details on what problems may arise.

        //Andras

      • lukesigler says:

        Thank you. That makes more sense. The line: foreach (Shard shard in shards)
        was throwing me off.

  2. Steve says:

    If you had multiple shards, how would that work? The inner while (!string.IsNullOrEmpty(iteratorId)) would run eternally unless the first shard is closed, meaning the second shard would never be read?
