Optimize write throughput for Amazon Kinesis Data Streams

Amazon Kinesis Data Streams is used by many customers to capture, process, and store data streams at any scale. This level of scale is enabled by dividing each data stream into multiple shards. Each shard in a stream has a write throughput limit of 1 MB per second or 1,000 records per second. Whether your streaming application is collecting clickstream data from a web application or recording telemetry from billions of Internet of Things (IoT) devices, streaming applications are highly susceptible to varying rates of data ingestion. Sometimes a large and sudden burst of data arrives when we least expect it. For instance, consider application logic with a retry mechanism for writing records to a Kinesis data stream. In case of a network failure, it's common to buffer records locally and write them when connectivity is restored. Depending on the rate at which records are buffered and the duration of the connectivity issue, the local buffer can accumulate enough records to saturate the available write throughput of the Kinesis data stream.

When an application attempts to write more data than is allowed, it receives write throughput exceeded errors. In some instances, failing to address these errors in a timely manner can result in data loss, unhappy customers, and other undesirable outcomes. In this post, we explore the typical causes behind write throughput exceeded errors, along with methods to identify them. We then guide you on responding swiftly to these events and provide several options for mitigation. Finally, we delve into how on-demand capacity mode can be helpful in addressing these errors.

Why do we get write throughput exceeded errors?

Write throughput exceeded errors are generally caused by three different scenarios:

  • The simplest is the case where the producer application is generating more data than the throughput available in the Kinesis data stream (the sum of all shards).
  • Next, we have the case where data distribution is not even across all shards, known as a hot shard issue.
  • Write throughput exceeded errors can also be caused by an application choosing a partition key and writing records at a rate that exceeds the throughput offered by a single shard. This situation is somewhat similar to a hot shard issue, but as we see later in this post, unlike a hot shard issue, you can't solve this problem by adding more shards to the data stream. This behavior is commonly known as a hot key issue.

Before we discuss how to diagnose these issues, let's look at how Kinesis data streams organize data and its relationship to write throughput exceeded errors.

A Kinesis data stream has one or more shards to store data. Each shard is assigned a key range in the 128-bit integer space. If you view the details of a data stream using the describe-stream operation in the AWS Command Line Interface (AWS CLI), you can actually see this key range assignment:

$ aws kinesis describe-stream --stream-name my-data-stream
{
  "StreamDescription": {
    "Shards": [
      {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
          "StartingHashKey": "0",
          "EndingHashKey": "85070591730234615865843651857942052863"
        }
      },
      {
        "ShardId": "shardId-000000000001",
        "HashKeyRange": {
          "StartingHashKey": "85070591730234615865843651857942052864",
          "EndingHashKey": "170141183460469231731687303715884105727"
        }
      }
    ]
  }
}

When a producer application invokes the PutRecord or PutRecords API, the service calculates an MD5 hash of the PartitionKey specified in the record. The resulting hash is used to determine which shard stores that record. You can take more control over this process by setting the ExplicitHashKey property in the PutRecord request to a hash key that falls within a specific shard's key range. For instance, setting ExplicitHashKey to 0 guarantees that the record is written to shard ID shardId-000000000000 in the stream described in the preceding code snippet.
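
To make this routing concrete, the following minimal Python (boto3) sketch reproduces the same MD5-based mapping on the client side and reports which shard a given partition key would land on. The stream name my-data-stream and the helper name shard_for_partition_key are just illustrative, and the sketch assumes the stream has few enough shards that a single describe-stream call returns them all.

import hashlib

import boto3

kinesis = boto3.client("kinesis")

def shard_for_partition_key(stream_name, partition_key):
    """Return the shard whose hash key range contains MD5(partition_key)."""
    # Kinesis hashes the partition key with MD5 and interprets the digest
    # as a 128-bit unsigned integer.
    hash_value = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    shards = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]["Shards"]
    for shard in shards:
        key_range = shard["HashKeyRange"]
        if int(key_range["StartingHashKey"]) <= hash_value <= int(key_range["EndingHashKey"]):
            return shard["ShardId"]
    return None

# For example, check which shard records for a given URL would land on.
print(shard_for_partition_key("my-data-stream", "/products/checkout"))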

How partition keys are distributed across the available shards plays an important role in maximizing the available throughput of a Kinesis data stream. When the partition key being used is repeated frequently in a way that some keys are more common than others, the shards storing those records are utilized more heavily. We get the same net effect if we use ExplicitHashKey and our logic for choosing the hash key is biased towards a subset of shards.

Imagine you have a fleet of web servers logging performance metrics for each web request served into a Kinesis data stream with two shards, and you use the request URL as the partition key. Each time a request is served, the application makes a call to the PutRecord API carrying a 10-byte record. Let's say that you have a total of 10 URLs and each receives 10 requests per second. Under these conditions, the total throughput required for the workload is 1,000 bytes per second and 100 requests per second. If we assume perfect distribution of the 10 URLs across the two shards, each shard receives 500 bytes per second and 50 requests per second.

Now imagine one of these URLs went viral and started receiving 1,000 requests per second. Although the situation is positive from a business standpoint, you're now on the verge of making users unhappy. After the page gained popularity, the shard storing the popular URL receives 1,040 requests per second (1,000 + 10 × 4, because four other URLs also map to that shard). At this point, you receive write throughput exceeded errors from that shard. You're throttled based on the requests-per-second quota because even with the increased request count, you're still producing only roughly 11 KB of data per second, well below the 1 MB per second limit.

You can resolve this problem either by using a UUID for each record as the partition key, so that you share the total load across both shards, or by adding more shards to the Kinesis data stream. The approach you choose depends on how you want to consume data. Changing the partition key to a UUID would be problematic if you want performance metrics from a given URL to always be processed by the same consumer instance or if you want to maintain the order of records on a per-URL basis.
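
As a rough illustration of the first option, the following sketch writes each metric record with a random UUID partition key so that records spread evenly across all shards. The stream name, the record format, and the helper name log_request_metric are assumptions made for this example; the trade-off, as noted above, is losing per-URL ordering and locality.

import json
import uuid

import boto3

kinesis = boto3.client("kinesis")

def log_request_metric(stream_name, url, latency_ms):
    # A random partition key spreads records evenly across shards, at the
    # cost of losing per-URL ordering and consumer locality.
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps({"url": url, "latency_ms": latency_ms}).encode("utf-8"),
        PartitionKey=str(uuid.uuid4()),
    )

log_request_metric("my-data-stream", "/products/checkout", 42)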

Knowing the exact cause of write throughput exceeded errors is an important step in remediating them. In the next sections, we discuss how to identify the root cause and remediate this problem.

Identifying the cause of write throughput exceeded errors

The first step in solving a problem is understanding that it exists. You can use the WriteProvisionedThroughputExceeded metric in Amazon CloudWatch in this case. You can correlate the spikes in the WriteProvisionedThroughputExceeded metric with the IncomingBytes and IncomingRecords metrics to identify whether an application is getting throttled due to the size of data or the number of records written.
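
If you prefer to pull these metrics programmatically rather than through the CloudWatch console, a sketch along the following lines retrieves the three stream-level metrics for a recent window. The function name and the 3-hour look-back are arbitrary choices for this example.

from datetime import datetime, timedelta, timezone

import boto3

cloudwatch = boto3.client("cloudwatch")

def stream_throttling_summary(stream_name, hours=3):
    """Fetch stream-level throttling and ingestion metrics for the last few hours."""
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=hours)
    results = {}
    for metric in ("WriteProvisionedThroughputExceeded", "IncomingBytes", "IncomingRecords"):
        response = cloudwatch.get_metric_statistics(
            Namespace="AWS/Kinesis",
            MetricName=metric,
            Dimensions=[{"Name": "StreamName", "Value": stream_name}],
            StartTime=start,
            EndTime=end,
            Period=60,  # Kinesis publishes these metrics at 1-minute granularity
            Statistics=["Sum"],
        )
        results[metric] = sorted(response["Datapoints"], key=lambda d: d["Timestamp"])
    return results

summary = stream_throttling_summary("my-data-stream")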

Let's look at a few tests we performed on a stream with two shards to illustrate various scenarios. In this instance, with two shards in our stream, the total throughput available to our producer application is either 2 MB per second or 2,000 records per second.

In the first test, we ran a producer that writes batches of 30 records, each being 100 KB, using the PutRecords API. As you can see in the graph on the left of the following figure, our WriteProvisionedThroughputExceeded error count went up. The graph on the right shows that we're reaching the 2 MB per second limit, but our incoming records rate is much lower than the 2,000 records per second limit (Kinesis metrics are published at 1-minute intervals, hence 125.8 MB and 120,000 records as the per-minute upper limits).

Record size based throttling example

The following figures show how the same three metrics changed when we modified the producer to write batches of 500 records, each being 50 bytes, in the second test. This time, we exceeded the 2,000 records per second throughput limit, but our incoming bytes rate is well below the limit.

Record count based throttling

Now that we know the problem exists, we should look for clues to see whether we're exceeding the overall throughput available in the stream or having a hot shard issue due to an imbalanced partition key distribution, as discussed earlier. One approach is to use enhanced shard-level metrics. Prior to our tests, we enabled enhanced shard-level metrics, and we can see in the following figure that both shards equally reached their quota in our first test.

Enhanced shard level metrics

We have seen Kinesis data streams containing thousands of shards harnessing the scale that Kinesis Data Streams offers. However, plotting enhanced shard-level metrics on such a large stream may not provide an easy way to find out which shards are over-utilized. In that instance, it's better to use CloudWatch Metrics Insights to run queries that show the top-n items, as shown in the following code (adjust the LIMIT 5 clause accordingly):

-- Show top 5 shards with highest incoming bytes
SELECT SUM(IncomingBytes)
FROM "AWS/Kinesis"
GROUP BY ShardId, StreamName
ORDER BY MAX() DESC
LIMIT 5

-- Show top 5 shards with highest incoming records
SELECT SUM(IncomingRecords)
FROM "AWS/Kinesis"
GROUP BY ShardId, StreamName
ORDER BY MAX() DESC
LIMIT 5

Enhanced shard-level metrics aren't enabled by default. If you didn't enable them and you want to perform root cause analysis after an incident, this option isn't very helpful. In addition, you can only query the most recent 3 hours of data. Enhanced shard-level metrics also incur additional CloudWatch costs, and it may be cost prohibitive to keep them always on for data streams with a large number of shards.
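
One way to keep that cost in check is to enable the shard-level metrics only for the duration of an investigation. The following sketch, assuming boto3 and the same stream name as before, enables just the three metrics discussed in this post and then disables them again afterwards.

import boto3

kinesis = boto3.client("kinesis")

SHARD_METRICS = ["IncomingBytes", "IncomingRecords", "WriteProvisionedThroughputExceeded"]

# Enable per-shard metrics only while investigating, then turn them off again
# to avoid the additional CloudWatch cost on streams with many shards.
kinesis.enable_enhanced_monitoring(
    StreamName="my-data-stream",
    ShardLevelMetrics=SHARD_METRICS,
)

# ... run the shard-level graphs or Metrics Insights queries shown earlier ...

kinesis.disable_enhanced_monitoring(
    StreamName="my-data-stream",
    ShardLevelMetrics=SHARD_METRICS,
)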

One interesting scenario is when the workload is bursty, which can make the resulting CloudWatch metrics graphs rather baffling. This is because Kinesis publishes CloudWatch metric data aggregated at 1-minute intervals. Consequently, although you can see write throughput exceeded errors, your incoming bytes/records graphs may still appear to be within the limits. To illustrate this scenario, we changed our test to create a burst of writes exceeding the limits and then sleep for a few seconds. We repeated this cycle for several minutes to yield the graphs in the following figure, which show write throughput exceeded errors on the left, while the IncomingBytes and IncomingRecords graphs on the right seem fine.

Effect of data aggregated at 1-minute intervals

To enhance the process of identifying write throughput exceeded errors, we developed a CLI tool called Kinesis Hot Shard Advisor (KHS). With KHS, you can view shard utilization even when shard-level metrics aren't enabled. This is particularly useful for investigating an issue retrospectively. It can also show the most frequently written keys for a particular shard. KHS reports shard utilization by reading records and aggregating them at per-second intervals based on the ApproximateArrivalTimestamp in the record. Because of this, you can also understand shard utilization drivers during bursty write workloads.

By running the following command, we can get KHS to inspect the records that arrived in 1 minute during our first test and generate a report:

khs -stream my-data-stream -from "2023-06-22 17:35:00" -to "2023-06-22 17:36:00"

For the given time window, the summary section in the generated report shows the maximum bytes per second rate observed, total bytes ingested, maximum records per second rate observed, and the total number of records ingested for each shard.

KHS report summary

Choosing a shard ID in the first column displays a graph of incoming bytes and records for that shard. This is similar to the graph you get in CloudWatch metrics, except that the KHS graph reports on a per-second basis. For instance, in the following figure, we can see how the producer was going through a series of bursty writes followed by a throttling event during our test case.

KHS shard level metrics display

Running the same command with the -aggregate-key option enables partition key distribution analysis. It generates an additional graph for each shard showing the key distribution, as shown in the following figure. For our test scenario, we can only see each key being used one time because we used a new UUID for each record.

KHS key distribution graph

Because KHS reports based on data stored in streams, it creates an enhanced fan-out consumer at startup to avoid using the read throughput quota available to other consumers. When the analysis is complete, it deletes that enhanced fan-out consumer.

Due to its nature of reading data streams, KHS can transfer a lot of data during analysis. For instance, assume you have a stream with 100 shards. If all of them are fully utilized during a 1-minute window specified using the -from and -to arguments, the host running KHS will receive at least 1 MB × 100 × 60 = 6,000 MB, or roughly 6 GB of data. To avoid this kind of excessive data transfer and speed up the analysis process, we recommend first using the WriteProvisionedThroughputExceeded CloudWatch metric to identify a time period when you experienced throttling and then using a small window (such as 10 seconds) with KHS. You can also run KHS on an Amazon Elastic Compute Cloud (Amazon EC2) instance in the same AWS Region as your Kinesis data stream to minimize network latency during reads.

KHS is designed to run on a single machine to diagnose large-scale workloads. Using a naive in-memory counting algorithm (such as a hash map storing the partition key and count) for partition key distribution analysis could easily exhaust the available memory on the host system. Therefore, we use a probabilistic data structure called a count-min sketch to estimate the number of times a key has been used. As a result, the number you see in the report should be taken as an approximate value rather than an absolute value. After all, with this report, we just want to find out whether there's an imbalance in the keys written to a shard.
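
For readers unfamiliar with the data structure, the following standalone Python sketch shows the idea behind a count-min sketch: a fixed amount of memory, and counts that can only be over-estimated, never under-estimated. It is a simplified illustration, not the implementation used in KHS, and the width and depth values are arbitrary.

import hashlib

class CountMinSketch:
    """Minimal count-min sketch: approximate key counts in bounded memory."""

    def __init__(self, width=2048, depth=4):
        self.width = width
        self.depth = depth
        self.tables = [[0] * width for _ in range(depth)]

    def _positions(self, key):
        # Derive one position per row from slices of a single MD5 digest.
        digest = hashlib.md5(key.encode("utf-8")).digest()
        for row in range(self.depth):
            chunk = digest[row * 4:(row + 1) * 4]
            yield row, int.from_bytes(chunk, "big") % self.width

    def add(self, key):
        for row, col in self._positions(key):
            self.tables[row][col] += 1

    def estimate(self, key):
        # Collisions can only inflate a count, so the minimum across rows
        # is the tightest (over-)estimate.
        return min(self.tables[row][col] for row, col in self._positions(key))

sketch = CountMinSketch()
for partition_key in ["/home", "/home", "/checkout"]:
    sketch.add(partition_key)
print(sketch.estimate("/home"))  # approximately 2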

Now that we understand what causes hot shards and how to identify them, let's look at how to deal with them in producer applications and what remediation steps to take.

Remediation steps

Having producers retry writes is a step towards making our producers resilient to write throughput exceeded errors. Consider our earlier sample application that logs performance metrics for each web request served by a fleet of web servers. When implementing this retry mechanism, you should remember that records not yet written to the Kinesis data stream are held in the host system's memory. The first issue with this is that if the host crashes before the records can be written, you'll experience data loss. Scenarios such as tracking web request performance data might be more forgiving of this type of data loss than scenarios like financial transactions. You should evaluate the durability guarantees required for your application and employ techniques to achieve them.

The second issue is that records waiting to be written to the Kinesis data stream consume the host system's memory. When you start getting throttled and have some retry logic in place, you should expect your memory utilization to go up. A retry mechanism should have a way to avoid exhausting the host system's memory.
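
A minimal sketch of such a retry loop is shown below, assuming boto3. PutRecords can be partially successful under throttling, so the sketch re-sends only the records whose per-record result carries an error code, backs off exponentially, and after a bounded number of attempts hands the still-failed records back to the caller, which can then decide how much to keep buffered in memory. The function name and backoff constants are arbitrary choices for this example.

import time

import boto3

kinesis = boto3.client("kinesis")

MAX_ATTEMPTS = 5

def put_records_with_backoff(stream_name, records):
    """Write a batch, retrying only throttled records with exponential backoff."""
    pending = records
    for attempt in range(MAX_ATTEMPTS):
        response = kinesis.put_records(StreamName=stream_name, Records=pending)
        if response["FailedRecordCount"] == 0:
            return []
        # The response lists one result per input record, in order; entries
        # with an ErrorCode were not written and need to be retried.
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        time.sleep(min(0.1 * (2 ** attempt), 5.0))  # capped exponential backoff
    return pending  # caller decides whether dropping these records is acceptable

failed = put_records_with_backoff(
    "my-data-stream",
    [{"Data": b'{"url": "/home", "latency_ms": 42}', "PartitionKey": "/home"}],
)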

With appropriate retry logic in place, if you receive write throughput exceeded errors, you can use the techniques we discussed earlier to identify the cause. After you identify the root cause, you can choose the appropriate remediation step:

  • If the producer application is exceeding the overall stream's throughput, you can add more shards to the stream to increase its write throughput capacity. When adding shards, the Kinesis data stream makes the new shards available incrementally, minimizing the time that producers experience write throughput exceeded errors. To add shards to a stream, you can use the Kinesis console, the update-shard-count operation in the AWS CLI, the UpdateShardCount API through the AWS SDK, or the ShardCount property in the AWS CloudFormation template used to create the stream.
  • If the producer application is exceeding the throughput limit of some shards (hot shard issue), select one of the following options based on consumer requirements:
    • If locality of data is required (records with the same partition key are always processed by the same consumer) or an order based on the partition key is required, use the split-shard operation in the AWS CLI or the SplitShard API in the AWS SDK to split those shards, as shown in the sketch after this list.
    • If locality or order based on the current partition key is not required, change the partition key scheme to increase its distribution.
  • If the producer application is exceeding the throughput limit of a shard due to a single partition key (hot key issue), change the partition key scheme to increase its distribution.
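
As a sketch of the shard-splitting option, the following boto3 snippet splits a hot shard at the midpoint of its hash key range, which yields an even split. The stream and shard names reuse the earlier describe-stream example; on streams with many shards you would paginate describe-stream or use list-shards instead.

import boto3

kinesis = boto3.client("kinesis")

def split_shard_in_half(stream_name, shard_id):
    """Split a shard evenly at the midpoint of its hash key range."""
    shards = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]["Shards"]
    shard = next(s for s in shards if s["ShardId"] == shard_id)
    start = int(shard["HashKeyRange"]["StartingHashKey"])
    end = int(shard["HashKeyRange"]["EndingHashKey"])
    kinesis.split_shard(
        StreamName=stream_name,
        ShardToSplit=shard_id,
        NewStartingHashKey=str((start + end) // 2),
    )

# Split the shard identified as hot in the earlier analysis.
split_shard_in_half("my-data-stream", "shardId-000000000000")

# For the first bullet (raising overall capacity), UpdateShardCount scales the
# whole stream uniformly instead:
# kinesis.update_shard_count(
#     StreamName="my-data-stream", TargetShardCount=4, ScalingType="UNIFORM_SCALING"
# )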

Kinesis Data Streams also has an on-demand capacity mode. In on-demand capacity mode, Kinesis Data Streams automatically scales streams when needed. Additionally, you can switch between on-demand and provisioned capacity modes without causing an outage. This could be particularly useful when you're experiencing write throughput exceeded errors but require an immediate response to keep your application available to your users. In such instances, you can switch a provisioned-mode data stream to an on-demand data stream and let Kinesis Data Streams handle the required scale appropriately. You can then perform root cause analysis in the background and take corrective actions. Finally, if necessary, you can change the capacity mode back to provisioned.
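
A minimal sketch of that switch with boto3 follows. UpdateStreamMode takes the stream ARN rather than the name, so the sketch looks it up first; switching back to provisioned uses the same call with StreamMode set to PROVISIONED.

import boto3

kinesis = boto3.client("kinesis")

# UpdateStreamMode takes the stream ARN rather than the stream name.
stream_arn = kinesis.describe_stream_summary(StreamName="my-data-stream")[
    "StreamDescriptionSummary"
]["StreamARN"]

# Switch to on-demand while investigating the throttling...
kinesis.update_stream_mode(
    StreamARN=stream_arn,
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)

# ...and back to provisioned once the root cause is addressed, if desired:
# kinesis.update_stream_mode(
#     StreamARN=stream_arn,
#     StreamModeDetails={"StreamMode": "PROVISIONED"},
# )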

Conclusion

You should now have a solid understanding of the common causes of write throughput exceeded errors in Kinesis data streams, how to diagnose them, and what actions to take to deal with them appropriately. We hope that this post helps you make your Kinesis Data Streams applications more robust. If you are just starting with Kinesis Data Streams, we recommend referring to the Developer Guide.

If you have any questions or feedback, please leave them in the comments section.


About the Authors

Buddhike de Silva is a Senior Specialist Solutions Architect at Amazon Web Services. Buddhike helps customers run large scale streaming analytics workloads on AWS and make the best out of their cloud journey.

Nihar Sheth is a Senior Product Manager at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals.
