Examples and use cases
Lambda durable functions let you build fault-tolerant, multi-step applications using durable operations such as steps and waits. With automatic checkpointing and a replay model in which, after a failure, the program re-executes from the beginning but skips steps that have already been checkpointed, your function can recover from failures and resume execution without losing prior progress.
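As a rough illustration of this model, the minimal TypeScript sketch below wraps two operations in steps; the chargePayment and sendReceipt calls are hypothetical placeholders, not real services. If the function fails after the first step, replay re-runs the handler from the top, the first step returns its checkpointed result without calling chargePayment again, and execution continues with the second step.

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    // Step 1: checkpointed once it succeeds; on replay the stored result is
    // returned instead of calling chargePayment again (hypothetical call)
    const payment = await context.step("charge-payment", async () => {
      return await chargePayment(event.orderId, event.amount);
    });

    // Step 2: only this step re-runs if the function failed between the two steps
    const receipt = await context.step("send-receipt", async () => {
      return await sendReceipt(event.orderId, payment);
    });

    return { orderId: event.orderId, receiptId: receipt.id };
  }
);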
Short-lived, fault-tolerant processes
Use durable functions to build reliable operations that typically complete within minutes. Although these processes run for less time than long-running workflows, they still benefit from automatic checkpointing and fault tolerance in distributed systems. Durable functions ensure that your multi-step process completes even when individual service calls fail, without complex error handling or state management code.
Common scenarios include hotel booking systems, restaurant reservation platforms, ride-share trip requests, event ticket purchases, and SaaS subscription upgrades. These scenarios share common characteristics: multiple service calls that must complete together, automatic retries on transient failures, and consistent state across distributed systems.
Distributed transactions across microservices
Coordinate payment, inventory, and shipping across multiple services, with automatic rollback when failures occur. Each service operation is wrapped in a step, so even if a service fails, the overall transaction can recover from any point.
TypeScript

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { orderId, amount, items } = event;

    // Reserve inventory across multiple warehouses
    const inventory = await context.step("reserve-inventory", async () => {
      return await inventoryService.reserve(items);
    });

    // Process payment
    const payment = await context.step("process-payment", async () => {
      return await paymentService.charge(amount);
    });

    // Create shipment
    const shipment = await context.step("create-shipment", async () => {
      return await shippingService.createShipment(orderId, inventory);
    });

    return { orderId, status: 'completed', shipment };
  }
);

Python

from aws_durable_execution_sdk import DurableContext, durable_execution

@durable_execution
def lambda_handler(event, context: DurableContext):
    order_id = event['orderId']
    amount = event['amount']
    items = event['items']

    # Reserve inventory across multiple warehouses
    inventory = context.step(
        lambda _: inventory_service.reserve(items),
        name='reserve-inventory'
    )

    # Process payment
    payment = context.step(
        lambda _: payment_service.charge(amount),
        name='process-payment'
    )

    # Create shipment
    shipment = context.step(
        lambda _: shipping_service.create_shipment(order_id, inventory),
        name='create-shipment'
    )

    return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
If any step fails, the function automatically retries from the last successful checkpoint. If payment processing fails temporarily, the inventory reservation remains intact. When the function retries, it skips the completed inventory step and goes straight to payment processing. This eliminates duplicate reservations and keeps state consistent across the distributed system.
Multi-step order processing
Process orders through validation, payment authorization, inventory allocation, and fulfillment, with automatic retry and recovery. Each step is checkpointed, so the order keeps progressing even if an individual step fails and retries.
TypeScript

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { orderId, customerId, items } = event;

    // Validate order details
    const validation = await context.step("validate-order", async () => {
      const customer = await customerService.validate(customerId);
      const itemsValid = await inventoryService.validateItems(items);
      return { customer, itemsValid };
    });

    if (!validation.itemsValid) {
      return { orderId, status: 'rejected', reason: 'invalid_items' };
    }

    // Authorize payment
    const authorization = await context.step("authorize-payment", async () => {
      return await paymentService.authorize(
        validation.customer.paymentMethod,
        calculateTotal(items)
      );
    });

    // Allocate inventory
    const allocation = await context.step("allocate-inventory", async () => {
      return await inventoryService.allocate(items);
    });

    // Fulfill order
    const fulfillment = await context.step("fulfill-order", async () => {
      return await fulfillmentService.createShipment({
        orderId,
        items: allocation.allocatedItems,
        address: validation.customer.shippingAddress
      });
    });

    return { orderId, status: 'completed', trackingNumber: fulfillment.trackingNumber };
  }
);

Python

from aws_durable_execution_sdk import DurableContext, durable_execution

@durable_execution
def lambda_handler(event, context: DurableContext):
    order_id = event['orderId']
    customer_id = event['customerId']
    items = event['items']

    # Validate order details
    def validate_order(_):
        customer = customer_service.validate(customer_id)
        items_valid = inventory_service.validate_items(items)
        return {'customer': customer, 'itemsValid': items_valid}

    validation = context.step(validate_order, name='validate-order')

    if not validation['itemsValid']:
        return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}

    # Authorize payment
    authorization = context.step(
        lambda _: payment_service.authorize(
            validation['customer']['paymentMethod'],
            calculate_total(items)
        ),
        name='authorize-payment'
    )

    # Allocate inventory
    allocation = context.step(
        lambda _: inventory_service.allocate(items),
        name='allocate-inventory'
    )

    # Fulfill order
    fulfillment = context.step(
        lambda _: fulfillment_service.create_shipment({
            'orderId': order_id,
            'items': allocation['allocatedItems'],
            'address': validation['customer']['shippingAddress']
        }),
        name='fulfill-order'
    )

    return {
        'orderId': order_id,
        'status': 'completed',
        'trackingNumber': fulfillment['trackingNumber']
    }
This pattern ensures that orders never get stuck in an intermediate state. If validation fails, the order is rejected before payment authorization. If payment authorization fails, inventory is not allocated. Each step builds on the previous one, with automatic retry and recovery.
Note
The conditional check if (!validation.itemsValid) sits outside a step and re-executes during replay. This is safe because it is deterministic: given the same checkpointed validation object, it always produces the same result.
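A rough sketch of this rule, assuming a hypothetical pricingService whose result can change between calls: deterministic branching on checkpointed step results is safe outside a step, while anything non-deterministic should be read inside a step so the value itself is checkpointed and replayed.

// Safe outside a step: depends only on the checkpointed validation result,
// so replay always takes the same branch
if (!validation.itemsValid) {
  return { orderId, status: 'rejected', reason: 'invalid_items' };
}

// Not safe outside a step: the price could differ on replay. Wrap it in a
// step so the first result is checkpointed and reused. (pricingService is
// a hypothetical service used only for illustration.)
const quotedPrice = await context.step("quote-price", async () => {
  return await pricingService.currentPrice(items);
});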
Long-running processes
Use durable functions for processes that last hours, days, or weeks. Wait operations suspend execution without incurring compute charges, which makes long-running processes cost-effective. During a wait, your function stops running and Lambda reclaims the execution environment. When it is time to resume, Lambda invokes your function again and it replays from the last checkpoint.
This execution model makes durable functions well suited for processes that need long pauses, whether waiting for a human decision, an external system response, a scheduled processing window, or a time-based delay. You pay only for the compute time you actually use, not for the time spent waiting.
Common scenarios include document approval workflows, scheduled batch processing, multi-day onboarding flows, subscription trial flows, and delayed notification systems. These scenarios share common characteristics: extended wait times measured in hours or days, the need to preserve execution state across those waits, and cost sensitivity that rules out paying for idle compute time.
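As a minimal sketch of a time-based delay, a delayed notification can be one step, a wait, and a second step; no compute is billed while the wait is pending. The notificationService method names and the three-day delay here are illustrative assumptions, not part of the examples below.

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    // Record that the reminder was scheduled (checkpointed)
    await context.step("schedule-reminder", async () => {
      return await notificationService.recordScheduled(event.userId);
    });

    // Suspend for three days without consuming compute; Lambda re-invokes
    // the function and replays past this point when the wait elapses
    await context.wait({ seconds: 3 * 24 * 60 * 60 });

    // Send the reminder after the delay
    await context.step("send-reminder", async () => {
      return await notificationService.sendReminder(event.userId);
    });

    return { userId: event.userId, status: "reminder_sent" };
  }
);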
Human-in-the-loop approval
Pause execution for document review, approval, or decisions while preserving execution state. The function waits for an external callback request without consuming resources and automatically resumes once the approval is received.
This pattern is essential for processes that require human judgment or external validation. The function pauses at the callback point and incurs no compute charges while it waits. When someone submits their decision through your API, Lambda invokes your function again and it replays from the last saved checkpoint to continue processing the approval result.
TypeScript

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { documentId, reviewers } = event;

    // Step 1: Prepare document for review
    const prepared = await context.step("prepare-document", async () => {
      return await documentService.prepare(documentId);
    });

    // Step 2: Request approval with callback
    const approval = await context.waitForCallback(
      "approval-callback",
      async (callbackId) => {
        await notificationService.sendApprovalRequest({
          documentId,
          reviewers,
          callbackId,
          expiresIn: 86400
        });
      },
      { timeout: { seconds: 86400 } }
    );

    // Function resumes here when approval is received
    if (approval?.approved) {
      const finalized = await context.step("finalize-document", async () => {
        return await documentService.finalize(documentId, approval.comments);
      });

      return { status: 'approved', documentId, finalizedAt: finalized.timestamp };
    }

    // Handle rejection
    await context.step("archive-rejected", async () => {
      await documentService.archive(documentId, approval?.reason);
    });

    return { status: 'rejected', documentId, reason: approval?.reason };
  }
);

Python

from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig

@durable_execution
def lambda_handler(event, context: DurableContext):
    document_id = event['documentId']
    reviewers = event['reviewers']

    # Step 1: Prepare document for review
    prepared = context.step(
        lambda _: document_service.prepare(document_id),
        name='prepare-document'
    )

    # Step 2: Request approval with callback
    def send_approval_request(callback_id):
        notification_service.send_approval_request({
            'documentId': document_id,
            'reviewers': reviewers,
            'callbackId': callback_id,
            'expiresIn': 86400
        })

    approval = context.wait_for_callback(
        send_approval_request,
        name='approval-callback',
        config=WaitConfig(timeout=86400)
    )

    # Function resumes here when approval is received
    if approval and approval.get('approved'):
        finalized = context.step(
            lambda _: document_service.finalize(document_id, approval.get('comments')),
            name='finalize-document'
        )

        return {
            'status': 'approved',
            'documentId': document_id,
            'finalizedAt': finalized['timestamp']
        }

    # Handle rejection
    context.step(
        lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
        name='archive-rejected'
    )

    return {
        'status': 'rejected',
        'documentId': document_id,
        'reason': approval.get('reason') if approval else None
    }
When the callback is received and your function resumes, it replays from the beginning. The document preparation step returns its checkpointed result immediately. The waitForCallback operation likewise returns the stored approval result immediately instead of waiting again. Execution then continues to the final finalize or archive step.
Multi-stage data pipelines
Process large datasets through extract, transform, and load stages, with checkpoints between stages. Each stage can take hours to complete, and the checkpoints ensure the pipeline can resume from whichever stage was interrupted.
This pattern works well for ETL workflows, data migrations, or batch jobs where you process data in stages and need recovery points between them. If a stage fails, the pipeline resumes from the last completed stage instead of restarting from the beginning. You can also use wait operations to pause between stages; for example, to respect rate limits while a downstream system gets ready, or to schedule processing during off-peak hours.
TypeScript

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { datasetId, batchSize } = event;

    // Stage 1: Extract data from source
    const extracted = await context.step("extract-data", async () => {
      const records = await sourceDatabase.extractRecords(datasetId);
      return { recordCount: records.length, records };
    });

    // Wait 5 minutes to respect source system rate limits
    await context.wait({ seconds: 300 });

    // Stage 2: Transform data in batches
    const transformed = await context.step("transform-data", async () => {
      const batches = chunkArray(extracted.records, batchSize);
      const results = [];
      for (const batch of batches) {
        const transformed = await transformService.processBatch(batch);
        results.push(transformed);
      }
      return { batchCount: batches.length, results };
    });

    // Wait until off-peak hours (e.g., 2 AM)
    const now = new Date();
    const targetHour = 2;
    const msUntilTarget = calculateMsUntilHour(now, targetHour);
    await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });

    // Stage 3: Load data to destination
    const loaded = await context.step("load-data", async () => {
      let loadedCount = 0;
      for (const result of transformed.results) {
        await destinationDatabase.loadBatch(result);
        loadedCount += result.length;
      }
      return { loadedCount };
    });

    // Stage 4: Verify and finalize
    const verified = await context.step("verify-pipeline", async () => {
      const verification = await destinationDatabase.verifyRecords(datasetId);
      await pipelineService.markComplete(datasetId, verification);
      return verification;
    });

    return {
      datasetId,
      recordsProcessed: extracted.recordCount,
      batchesProcessed: transformed.batchCount,
      recordsLoaded: loaded.loadedCount,
      verified: verified.success
    };
  }
);

Python

from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime

@durable_execution
def lambda_handler(event, context: DurableContext):
    dataset_id = event['datasetId']
    batch_size = event['batchSize']

    # Stage 1: Extract data from source
    def extract_data(_):
        records = source_database.extract_records(dataset_id)
        return {'recordCount': len(records), 'records': records}

    extracted = context.step(extract_data, name='extract-data')

    # Wait 5 minutes to respect source system rate limits
    context.wait(300)

    # Stage 2: Transform data in batches
    def transform_data(_):
        batches = chunk_array(extracted['records'], batch_size)
        results = []
        for batch in batches:
            transformed = transform_service.process_batch(batch)
            results.append(transformed)
        return {'batchCount': len(batches), 'results': results}

    transformed = context.step(transform_data, name='transform-data')

    # Wait until off-peak hours (e.g., 2 AM)
    now = datetime.now()
    target_hour = 2
    ms_until_target = calculate_ms_until_hour(now, target_hour)
    context.wait(ms_until_target // 1000)

    # Stage 3: Load data to destination
    def load_data(_):
        loaded_count = 0
        for result in transformed['results']:
            destination_database.load_batch(result)
            loaded_count += len(result)
        return {'loadedCount': loaded_count}

    loaded = context.step(load_data, name='load-data')

    # Stage 4: Verify and finalize
    def verify_pipeline(_):
        verification = destination_database.verify_records(dataset_id)
        pipeline_service.mark_complete(dataset_id, verification)
        return verification

    verified = context.step(verify_pipeline, name='verify-pipeline')

    return {
        'datasetId': dataset_id,
        'recordsProcessed': extracted['recordCount'],
        'batchesProcessed': transformed['batchCount'],
        'recordsLoaded': loaded['loadedCount'],
        'verified': verified['success']
    }
Each stage is wrapped in a step that creates a checkpoint, so the pipeline can resume from any stage if the process is interrupted. The five-minute wait between extract and transform respects the source system's rate limits without consuming compute resources, and waiting until 2 AM schedules the expensive load operation during off-peak hours.
Note
The new Date() call and the calculateMsUntilHour() function are outside a step and re-execute during replay. For time-based operations that must stay consistent across replays, compute the timestamp inside a step, or use it only for wait durations, which are checkpointed.
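For example, the off-peak delay above could be made replay-safe by moving the time calculation into its own step. This is a sketch only; it reuses the calculateMsUntilHour helper that the example already assumes.

// Compute the delay inside a step so the value is checkpointed and the
// same duration is used on every replay
const msUntilTarget = await context.step("compute-off-peak-delay", async () => {
  return calculateMsUntilHour(new Date(), 2);
});

await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });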
Chaining calls across functions
Use context.invoke() to call other Lambda functions from within a durable function. The calling function suspends while it waits for the invoked function to complete, creating a checkpoint that stores the result. If the calling function is interrupted after the invoked function finishes, it resumes with the stored result instead of invoking the function again.
Use this pattern when you have specialized functions that handle specific domains (customer validation, payment processing, inventory management) and need to coordinate them in a workflow. Each function owns its own logic and can be invoked by multiple orchestrator functions, which avoids duplicating code.
TypeScript

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

// Main orchestrator function
export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { orderId, customerId } = event;

    // Step 1: Validate customer by invoking customer service function
    const customer = await context.invoke(
      "validate-customer",
      "arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
      { customerId }
    );

    if (!customer.isValid) {
      return { orderId, status: "rejected", reason: "invalid_customer" };
    }

    // Step 2: Check inventory by invoking inventory service function
    const inventory = await context.invoke(
      "check-inventory",
      "arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
      { orderId, items: event.items }
    );

    if (!inventory.available) {
      return { orderId, status: "rejected", reason: "insufficient_inventory" };
    }

    // Step 3: Process payment by invoking payment service function
    const payment = await context.invoke(
      "process-payment",
      "arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
      {
        customerId,
        amount: inventory.totalAmount,
        paymentMethod: customer.paymentMethod
      }
    );

    // Step 4: Create shipment by invoking fulfillment service function
    const shipment = await context.invoke(
      "create-shipment",
      "arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
      {
        orderId,
        items: inventory.allocatedItems,
        address: customer.shippingAddress
      }
    );

    return {
      orderId,
      status: "completed",
      trackingNumber: shipment.trackingNumber,
      estimatedDelivery: shipment.estimatedDelivery
    };
  }
);

Python

from aws_durable_execution_sdk_python import DurableContext, durable_execution

# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
    order_id = event['orderId']
    customer_id = event['customerId']

    # Step 1: Validate customer by invoking customer service function
    customer = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
        {'customerId': customer_id},
        name='validate-customer'
    )

    if not customer['isValid']:
        return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}

    # Step 2: Check inventory by invoking inventory service function
    inventory = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
        {'orderId': order_id, 'items': event['items']},
        name='check-inventory'
    )

    if not inventory['available']:
        return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}

    # Step 3: Process payment by invoking payment service function
    payment = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
        {
            'customerId': customer_id,
            'amount': inventory['totalAmount'],
            'paymentMethod': customer['paymentMethod']
        },
        name='process-payment'
    )

    # Step 4: Create shipment by invoking fulfillment service function
    shipment = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
        {
            'orderId': order_id,
            'items': inventory['allocatedItems'],
            'address': customer['shippingAddress']
        },
        name='create-shipment'
    )

    return {
        'orderId': order_id,
        'status': 'completed',
        'trackingNumber': shipment['trackingNumber'],
        'estimatedDelivery': shipment['estimatedDelivery']
    }
Each invocation creates a checkpoint in the orchestrator function. If the orchestrator is interrupted after customer validation completes, it resumes from that checkpoint, uses the stored customer data, and skips the validation call. This avoids duplicate calls to downstream services and keeps execution consistent across interruptions.
The invoked functions can be durable Lambda functions or standard Lambda functions. If you invoke a durable function, it can have its own multi-step workflow with waits and checkpoints, as in the sketch below. The orchestrator simply waits for the entire durable execution to complete and then receives the final result.
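For instance, the payment-service function referenced by the ARN above could itself be durable and run its own steps; the orchestrator's invoke checkpoint stores only its final return value. This is a minimal sketch, and the fraudService and paymentGateway calls are hypothetical.

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

// Durable callee: runs its own multi-step workflow; the orchestrator's
// context.invoke() waits for the whole execution and checkpoints the result
export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const screening = await context.step("screen-for-fraud", async () => {
      return await fraudService.screen(event.customerId, event.amount);
    });

    if (!screening.approved) {
      return { status: "declined", reason: "fraud_screening" };
    }

    const charge = await context.step("charge-card", async () => {
      return await paymentGateway.charge(event.paymentMethod, event.amount);
    });

    return { status: "charged", chargeId: charge.id };
  }
);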
Note
Cross-account invocation is not supported. All invoked functions must be in the same AWS account as the calling function.
Advanced patterns
Use durable functions to build complex multi-step applications that combine multiple durable operations, parallel execution, array processing, conditional logic, and polling. These patterns let you build sophisticated applications that coordinate many tasks with fault tolerance and automatic recovery.
Advanced patterns go beyond simple sequential steps. You can run operations concurrently with parallel(), process arrays with map(), wait for external conditions with waitForCondition(), and combine these primitives to build reliable applications. Each durable operation creates its own checkpoint, so the application can recover from any point if it is interrupted.
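As an illustration of the polling pattern, the sketch below approximates it with the step and wait primitives shown elsewhere in this section, checking a hypothetical jobService every five minutes. waitForCondition() can express the same idea more directly, but its signature is not shown here, so consult the SDK reference before using it.

// Poll an external job until it leaves the RUNNING state. Each status check
// is its own step (named by attempt so every check gets its own checkpoint),
// and the waits between checks do not consume compute.
let attempt = 0;
let status = "RUNNING";

while (status === "RUNNING") {
  status = await context.step(`check-job-status-${attempt}`, async () => {
    const job = await jobService.getStatus(event.jobId); // hypothetical service
    return job.state;
  });

  if (status === "RUNNING") {
    await context.wait({ seconds: 300 });
    attempt += 1;
  }
}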
User onboarding flow
Guide users through registration, email verification, profile setup, and initial configuration, with retry handling. This example combines sequential steps, callbacks, and conditional logic to build a complete onboarding flow.
TypeScript

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { userId, email } = event;

    // Step 1: Create user account
    const user = await context.step("create-account", async () => {
      return await userService.createAccount(userId, email);
    });

    // Step 2: Send verification email
    await context.step("send-verification", async () => {
      return await emailService.sendVerification(email);
    });

    // Step 3: Wait for email verification (up to 48 hours)
    const verified = await context.waitForCallback(
      "email-verification",
      async (callbackId) => {
        await notificationService.sendVerificationLink({
          email,
          callbackId,
          expiresIn: 172800
        });
      },
      { timeout: { seconds: 172800 } }
    );

    if (!verified) {
      await context.step("send-reminder", async () => {
        await emailService.sendReminder(email);
      });

      return {
        status: "verification_timeout",
        userId,
        message: "Email verification not completed within 48 hours"
      };
    }

    // Step 4: Initialize user profile in parallel
    const setupResults = await context.parallel("profile-setup", [
      async (ctx: DurableContext) => {
        return await ctx.step("create-preferences", async () => {
          return await preferencesService.createDefaults(userId);
        });
      },
      async (ctx: DurableContext) => {
        return await ctx.step("setup-notifications", async () => {
          return await notificationService.setupDefaults(userId);
        });
      },
      async (ctx: DurableContext) => {
        return await ctx.step("create-welcome-content", async () => {
          return await contentService.createWelcome(userId);
        });
      }
    ]);

    // Step 5: Send welcome email
    await context.step("send-welcome", async () => {
      const [preferences, notifications, content] = setupResults.getResults();
      return await emailService.sendWelcome({ email, preferences, notifications, content });
    });

    return {
      status: "onboarding_complete",
      userId,
      completedAt: new Date().toISOString()
    };
  }
);

Python

from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
from datetime import datetime

@durable_execution
def lambda_handler(event, context: DurableContext):
    user_id = event['userId']
    email = event['email']

    # Step 1: Create user account
    user = context.step(
        lambda _: user_service.create_account(user_id, email),
        name='create-account'
    )

    # Step 2: Send verification email
    context.step(
        lambda _: email_service.send_verification(email),
        name='send-verification'
    )

    # Step 3: Wait for email verification (up to 48 hours)
    def send_verification_link(callback_id):
        notification_service.send_verification_link({
            'email': email,
            'callbackId': callback_id,
            'expiresIn': 172800
        })

    verified = context.wait_for_callback(
        send_verification_link,
        name='email-verification',
        config=WaitConfig(timeout=172800)
    )

    if not verified:
        context.step(
            lambda _: email_service.send_reminder(email),
            name='send-reminder'
        )

        return {
            'status': 'verification_timeout',
            'userId': user_id,
            'message': 'Email verification not completed within 48 hours'
        }

    # Step 4: Initialize user profile in parallel
    def create_preferences(ctx: DurableContext):
        return ctx.step(
            lambda _: preferences_service.create_defaults(user_id),
            name='create-preferences'
        )

    def setup_notifications(ctx: DurableContext):
        return ctx.step(
            lambda _: notification_service.setup_defaults(user_id),
            name='setup-notifications'
        )

    def create_welcome_content(ctx: DurableContext):
        return ctx.step(
            lambda _: content_service.create_welcome(user_id),
            name='create-welcome-content'
        )

    setup_results = context.parallel(
        [create_preferences, setup_notifications, create_welcome_content],
        name='profile-setup'
    )

    # Step 5: Send welcome email
    def send_welcome(_):
        results = setup_results.get_results()
        preferences, notifications, content = results[0], results[1], results[2]
        return email_service.send_welcome({
            'email': email,
            'preferences': preferences,
            'notifications': notifications,
            'content': content
        })

    context.step(send_welcome, name='send-welcome')

    return {
        'status': 'onboarding_complete',
        'userId': user_id,
        'completedAt': datetime.now().isoformat()
    }
The flow combines sequential steps that checkpoint account creation and the verification email, then pauses for up to 48 hours waiting for email verification without consuming resources. Conditional logic handles the different paths depending on whether verification completes or times out. The profile setup tasks run concurrently through a parallel operation to reduce total execution time, and each step retries automatically on transient failures so onboarding completes reliably.
Batch processing with checkpoints
Process millions of records, resuming automatically from the last successful checkpoint after a failure. This example shows how durable functions combine the map() operation with chunking and concurrency limits for large-scale data processing.
TypeScript

import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

interface Batch {
  batchIndex: number;
  recordIds: string[];
}

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { datasetId, batchSize = 1000 } = event;

    // Step 1: Get all record IDs to process
    const recordIds = await context.step("fetch-record-ids", async () => {
      return await dataService.getRecordIds(datasetId);
    });

    // Step 2: Split into batches
    const batches: Batch[] = [];
    for (let i = 0; i < recordIds.length; i += batchSize) {
      batches.push({
        batchIndex: Math.floor(i / batchSize),
        recordIds: recordIds.slice(i, i + batchSize)
      });
    }

    // Step 3: Process batches with controlled concurrency
    const batchResults = await context.map(
      "process-batches",
      batches,
      async (ctx: DurableContext, batch: Batch, index: number) => {
        const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
          const results = [];
          for (const recordId of batch.recordIds) {
            const result = await recordService.process(recordId);
            results.push(result);
          }
          return results;
        });

        const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
          return await validationService.validateBatch(processed);
        });

        return {
          batchIndex: batch.batchIndex,
          recordCount: batch.recordIds.length,
          successCount: validated.successCount,
          failureCount: validated.failureCount
        };
      },
      { maxConcurrency: 5 }
    );

    // Step 4: Aggregate results
    const summary = await context.step("aggregate-results", async () => {
      const results = batchResults.getResults();
      const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
      const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);

      return {
        datasetId,
        totalRecords: recordIds.length,
        batchesProcessed: batches.length,
        successCount: totalSuccess,
        failureCount: totalFailure,
        completedAt: new Date().toISOString()
      };
    });

    return summary;
  }
);

Python

from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict

@durable_execution
def lambda_handler(event, context: DurableContext):
    dataset_id = event['datasetId']
    batch_size = event.get('batchSize', 1000)

    # Step 1: Get all record IDs to process
    record_ids = context.step(
        lambda _: data_service.get_record_ids(dataset_id),
        name='fetch-record-ids'
    )

    # Step 2: Split into batches
    batches = []
    for i in range(0, len(record_ids), batch_size):
        batches.append({
            'batchIndex': i // batch_size,
            'recordIds': record_ids[i:i + batch_size]
        })

    # Step 3: Process batches with controlled concurrency
    def process_batch(ctx: DurableContext, batch: Dict, index: int):
        batch_index = batch['batchIndex']

        def process_records(_):
            results = []
            for record_id in batch['recordIds']:
                result = record_service.process(record_id)
                results.append(result)
            return results

        processed = ctx.step(process_records, name=f'batch-{batch_index}')

        validated = ctx.step(
            lambda _: validation_service.validate_batch(processed),
            name=f'validate-{batch_index}'
        )

        return {
            'batchIndex': batch_index,
            'recordCount': len(batch['recordIds']),
            'successCount': validated['successCount'],
            'failureCount': validated['failureCount']
        }

    batch_results = context.map(
        process_batch,
        batches,
        name='process-batches',
        config=MapConfig(max_concurrency=5)
    )

    # Step 4: Aggregate results
    def aggregate_results(_):
        results = batch_results.get_results()
        total_success = sum(r['successCount'] for r in results)
        total_failure = sum(r['failureCount'] for r in results)

        return {
            'datasetId': dataset_id,
            'totalRecords': len(record_ids),
            'batchesProcessed': len(batches),
            'successCount': total_success,
            'failureCount': total_failure,
            'completedAt': datetime.now().isoformat()
        }

    summary = context.step(aggregate_results, name='aggregate-results')

    return summary
Records are split into manageable batches to avoid excessive memory use or overloading downstream services, and multiple batches are then processed in parallel, with maxConcurrency controlling the degree of parallelism. Each batch has its own checkpoint, so a failure only reprocesses the failed batch rather than every record. This pattern works well for ETL jobs, data migrations, or bulk operations where processing can take hours.