Configuring partial batch response with Kinesis Data Streams and Lambda
When consuming and processing streaming data from an event source, by default Lambda checkpoints to the highest
sequence number of a batch only when the batch is a complete success. Lambda treats all other results as a complete
failure and retries processing the batch up to the retry limit. To allow for partial successes while processing
batches from a stream, turn on ReportBatchItemFailures. Allowing partial successes can help to reduce
the number of retries on a record, though it doesn’t entirely prevent the possibility of retrying a record that was already processed successfully.
To turn on ReportBatchItemFailures, include the enum value ReportBatchItemFailures in the FunctionResponseTypes list. This list indicates
which response types are enabled for your function. You can configure this list when you create or update an event source mapping.
Even when your function code returns partial batch failure responses, these responses will not be processed by Lambda unless the
ReportBatchItemFailures feature is explicitly turned on for your event source mapping.
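As a minimal sketch of this setting, assuming you configure the event source mapping with the SDK for Python (Boto3), the following builds the parameters for the CreateEventSourceMapping call. The helper names, the stream ARN, and the function name are hypothetical placeholders; only the FunctionResponseTypes value comes from the text above.

```python
def build_mapping_params(stream_arn, function_name):
    """Build keyword arguments for Lambda's CreateEventSourceMapping API."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "StartingPosition": "LATEST",
        # Enables partial batch responses for this event source mapping.
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
    }


def create_mapping(stream_arn, function_name):
    """Create the mapping. Assumes boto3 is installed and credentials are configured."""
    import boto3  # imported lazily so build_mapping_params works without the SDK

    client = boto3.client("lambda")
    return client.create_event_source_mapping(
        **build_mapping_params(stream_arn, function_name)
    )


params = build_mapping_params(
    "arn:aws-cn:kinesis:cn-north-1:111122223333:stream/example-stream",  # placeholder ARN
    "example-function",  # placeholder function name
)
print(params["FunctionResponseTypes"])
```

The same FunctionResponseTypes parameter can be passed when updating an existing mapping instead of creating a new one.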
Report syntax
When you configure reporting on batch item failures, your function returns a StreamsEventResponse object that contains a
list of batch item failures. You can use a StreamsEventResponse object to return the sequence number
of the first failed record in the batch. You can also create your own custom class using the correct response
syntax. The following JSON structure shows the required response syntax:
{
  "batchItemFailures": [
    {
      "itemIdentifier": "<SequenceNumber>"
    }
  ]
}
If the batchItemFailures array contains multiple items, Lambda uses the record with the lowest
sequence number as the checkpoint. Lambda then retries all records starting from that checkpoint.
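The checkpoint rule can be illustrated with a short sketch. It only mirrors the behavior described above and is not Lambda's own code; the function names and sequence numbers are made up for illustration.

```python
def build_response(failed_sequence_numbers):
    """Return a partial batch response for the given failed sequence numbers."""
    return {
        "batchItemFailures": [
            {"itemIdentifier": seq} for seq in failed_sequence_numbers
        ]
    }


def expected_checkpoint(response):
    """Return the lowest reported sequence number, or None if none is reported."""
    failures = response.get("batchItemFailures") or []
    identifiers = [item["itemIdentifier"] for item in failures]
    if not identifiers:
        return None
    # Sequence numbers are decimal strings, so compare them as integers.
    return min(identifiers, key=int)


response = build_response(["300", "120", "250"])  # made-up sequence numbers
print(expected_checkpoint(response))  # prints 120
```

Because only the lowest sequence number matters, a handler that stops at the first failed record and reports just that record produces the same checkpoint as one that reports every failure.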
Success and failure conditions
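Lambda treats a batch as a complete success if you return any of the following:
- An empty batchItemFailures list
- A null batchItemFailures list
- An empty EventResponse
- A null EventResponse
Lambda treats a batch as a complete failure if you return any of the following:
- An empty string itemIdentifier
- A null itemIdentifier
- An itemIdentifier with a bad key name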
Lambda retries failures based on your retry strategy.
Bisecting a batch
If your invocation fails and BisectBatchOnFunctionError is turned on, the batch is bisected
regardless of your ReportBatchItemFailures setting.
When a partial batch success response is received and both BisectBatchOnFunctionError and
ReportBatchItemFailures are turned on, the batch is bisected at the returned sequence number and
Lambda retries only the remaining records.
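The partial-success case can be pictured with a small simulation. This is a sketch under the assumption that the reported record itself belongs to the retried portion, as the checkpoint rule in the Report syntax section suggests. The function name and sequence numbers are hypothetical.

```python
def split_at_failure(sequence_numbers, failed_sequence_number):
    """Split an ordered batch into (checkpointed, retried) parts.

    Records before the reported sequence number count as processed.
    The reported record and everything after it are retried.
    """
    index = sequence_numbers.index(failed_sequence_number)
    return sequence_numbers[:index], sequence_numbers[index:]


batch = ["101", "102", "103", "104", "105"]  # made-up sequence numbers
done, retry = split_at_failure(batch, "103")
print(done)   # ['101', '102']
print(retry)  # ['103', '104', '105']
```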
To simplify the implementation of partial batch response logic, consider using the Batch Processor utility
from Powertools for Amazon Lambda, which automatically handles these complexities for you.
Here are some examples of function code that return the sequence numbers of the failed records in the batch:
- .NET (Amazon SDK for .NET)
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting Kinesis batch item failures with Lambda using .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegration;

public class Function
{
    // Powertools Logger requires the POWERTOOLS_SERVICE_NAME environment variable
    // to be set on your function.
    [Logging(LogEvent = true)]
    public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return new StreamsEventResponse();
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                return new StreamsEventResponse
                {
                    BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
                    {
                        new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
                    }
                };
            }
        }

        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
        return new StreamsEventResponse();
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; // Placeholder for actual async work
        return data;
    }
}

public class StreamsEventResponse
{
    [JsonPropertyName("batchItemFailures")]
    public IList<BatchItemFailure> BatchItemFailures { get; set; }

    public class BatchItemFailure
    {
        [JsonPropertyName("itemIdentifier")]
        public string ItemIdentifier { get; set; }
    }
}
- Go (SDK for Go V2)
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting Kinesis batch item failures with Lambda using Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

// processRecord is a placeholder for your record processing logic.
// Return a non-nil error to report the record as failed.
func processRecord(ctx context.Context, record events.KinesisEventRecord) error {
	// TODO: Process your record
	return nil
}

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
	batchItemFailures := []map[string]interface{}{}
	for _, record := range kinesisEvent.Records {
		// Report the sequence number of every record whose processing failed
		if err := processRecord(ctx, record); err != nil {
			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": record.Kinesis.SequenceNumber})
		}
	}

	kinesisBatchResponse := map[string]interface{}{
		"batchItemFailures": batchItemFailures,
	}
	return kinesisBatchResponse, nil
}

func main() {
	lambda.Start(handler)
}
- Java (SDK for Java 2.x)
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting Kinesis batch item failures with Lambda using Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.util.ArrayList;
import java.util.List;

public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();

        for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
            KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
            // Capture the sequence number before processing so that a failure
            // is always attributed to the current record.
            String curRecordSequenceNumber = kinesisRecord.getSequenceNumber();
            try {
                // Process your record
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }

        return new StreamsEventResponse(batchItemFailures);
    }
}
- JavaScript (SDK for JavaScript (v3))
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting Kinesis batch item failures with Lambda using JavaScript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
         Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(payload) {
  const data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); // Placeholder for actual async work
  return data;
}
Reporting Kinesis batch item failures with Lambda using TypeScript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
  KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
         Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  const data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); // Placeholder for actual async work
  return data;
}
- PHP (SDK for PHP)
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting Kinesis batch item failures with Lambda using PHP.
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $kinesisEvent = new KinesisEvent($event);
        $this->logger->info("Processing records");
        $records = $kinesisEvent->getRecords();

        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $failedCount = count($failedRecords);
        $this->logger->info("Processed $totalRecords records, $failedCount failed");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
- Python (SDK for Python (Boto3))
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting Kinesis batch item failures with Lambda using Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records", [])

    for record in records:
        # Capture the sequence number before processing so that a failure
        # is always attributed to the current record
        curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        try:
            # Process your record
            pass
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures": [{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures": []}
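To try a handler of this shape locally before deploying, one option is to feed it a hand-built event. The following is a minimal sketch: the event contains only the fields the handler reads, the sequence numbers and payloads are made up, and the JSON check stands in for real processing logic. The helper name make_kinesis_event is hypothetical.

```python
import base64
import json


def make_kinesis_event(payloads):
    """Build a minimal Kinesis-style event. Sequence numbers are made up."""
    return {
        "Records": [
            {
                "eventID": f"shardId-000000000000:{index}",
                "kinesis": {
                    "sequenceNumber": str(index),
                    # Kinesis record data arrives base64-encoded.
                    "data": base64.b64encode(payload).decode("ascii"),
                },
            }
            for index, payload in enumerate(payloads, start=1)
        ]
    }


def handler(event, context):
    for record in event.get("Records", []):
        sequence_number = record["kinesis"]["sequenceNumber"]
        try:
            # Example processing step: the payload must be valid JSON.
            json.loads(base64.b64decode(record["kinesis"]["data"]))
        except Exception:
            # Stop at the first failure and report its sequence number.
            return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
    return {"batchItemFailures": []}


event = make_kinesis_event([b'{"ok": 1}', b"not json", b'{"ok": 2}'])
result = handler(event, None)
print(result)  # {'batchItemFailures': [{'itemIdentifier': '2'}]}
```

The second record fails to parse, so the handler reports sequence number 2 and never reaches the third record, which Lambda would deliver again on retry.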
- Ruby (SDK for Ruby)
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting Kinesis batch item failures with Lambda using Ruby.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'base64' # needed for Base64.decode64 below

def lambda_handler(event:, context:)
  batch_item_failures = []

  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue StandardError => err
      puts "An error occurred #{err}"
      # Since we are working with streams, we can return the failed item immediately.
      # Lambda will immediately begin to retry processing from this failed item onwards.
      return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
    end
  end

  puts "Successfully processed #{event['Records'].length} records."
  { batchItemFailures: batch_item_failures }
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('utf-8')
  # Placeholder for actual async work
  sleep(1)
  data
end
- Rust (SDK for Rust)
There's more on GitHub. Find the complete example and learn how to set up and run in the Serverless examples repository.
Reporting Kinesis batch item failures with Lambda using Rust.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::kinesis::KinesisEvent,
    kinesis::KinesisEventRecord,
    streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
    let mut response = KinesisEventResponse {
        batch_item_failures: vec![],
    };

    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in &event.payload.records {
        tracing::info!(
            "EventId: {}",
            record.event_id.as_deref().unwrap_or_default()
        );

        let record_processing_result = process_record(record);

        if record_processing_result.is_err() {
            response.batch_item_failures.push(KinesisBatchItemFailure {
                item_identifier: record.kinesis.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
               Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(response)
}

fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
    let record_data = std::str::from_utf8(record.kinesis.data.as_slice());

    if let Some(err) = record_data.err() {
        tracing::error!("Error: {}", err);
        return Err(Error::from(err));
    }

    let record_data = record_data.unwrap_or_default();

    // do something interesting with the data
    tracing::info!("Data: {}", record_data);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
The batch processor utility from Powertools for Amazon Lambda automatically handles partial batch response logic, reducing the complexity of
implementing batch failure reporting. Here are examples using the batch processor:
- Python
Processing Kinesis Data Streams stream records with Amazon Lambda batch processor.
import json

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import KinesisEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
logger = Logger()


def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed


def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context
    )
- TypeScript
Processing Kinesis Data Streams stream records with Amazon Lambda batch processor.
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { KinesisStreamEvent, KinesisStreamRecord, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();

const recordHandler = async (record: KinesisStreamRecord): Promise<void> => {
  logger.info('Processing record', { record });
  // Your business logic here
  // Throw an error to mark this record as failed
};

export const handler = async (event: KinesisStreamEvent, context: Context) => {
  return processPartialResponse(event, recordHandler, processor, {
    context,
  });
};
- Java
Processing Kinesis Data Streams stream records with Amazon Lambda batch processor.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class KinesisStreamBatchHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;

    public KinesisStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withKinesisBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
        return handler.processBatch(kinesisEvent, context);
    }

    private void processMessage(KinesisEvent.KinesisEventRecord kinesisEventRecord, Context context) {
        // Process the stream record
    }
}
- .NET
Processing Kinesis Data Streams stream records with Amazon Lambda batch processor.
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class OrderEvent
{
    public string? OrderId { get; set; }
    public string? CustomerId { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderDate { get; set; }
}

internal class TypedKinesisRecordHandler : ITypedRecordHandler<OrderEvent>
{
    public async Task<RecordHandlerResult> HandleAsync(OrderEvent orderEvent, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(orderEvent.OrderId))
        {
            throw new ArgumentException("Order ID is required");
        }

        return await Task.FromResult(RecordHandlerResult.None);
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
    {
        return TypedKinesisStreamBatchProcessor.Result.BatchItemFailuresResponse;
    }
}