Amazon Transcribe
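Developer Guide
AWS services or features described in AWS documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with AWS Services in China.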

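Streaming retry client

You can use the following code in your application to handle retry logic for Amazon Transcribe streaming transcription. The code makes your application tolerant of intermittent failures in its connection to Amazon Transcribe. The client has two parts: an interface that you implement for your application, and the retry client itself.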

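Streaming retry client code

This code implements the streaming retry client. It manages the connection to Amazon Transcribe and retries sending the data when the connection fails with an error. For example, if a transient network error occurs, the client resends the failed request. Errors that a retry cannot fix, such as a BadRequestException, are not retried.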
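The retry pattern the client follows can be sketched with the JDK alone. The client starts an attempt. On a retriable failure it waits, then starts another attempt, up to a limit. It reports a single final future. This is a minimal sketch under stated assumptions, not part of the Amazon Transcribe SDK, and the names RetrySketch and withRetries are hypothetical. The client calls Thread.sleep inside the completion callback. This sketch instead schedules the next attempt with CompletableFuture.delayedExecutor (Java 9 or later), so no thread is blocked while waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical JDK-only sketch of the retry pattern; not part of the AWS SDK.
final class RetrySketch {

    private RetrySketch() {
    }

    /**
     * Runs attempt, retrying retriable failures up to maxRetries times with a
     * fixed delay, and returns one future for the final outcome.
     */
    static <T> CompletableFuture<T> withRetries(Supplier<CompletableFuture<T>> attempt,
                                                int maxRetries,
                                                long sleepMillis,
                                                Predicate<Throwable> isRetriable) {
        CompletableFuture<T> finalFuture = new CompletableFuture<>();
        run(attempt, maxRetries, sleepMillis, isRetriable, finalFuture, 0);
        return finalFuture;
    }

    private static <T> void run(Supplier<CompletableFuture<T>> attempt,
                                int maxRetries,
                                long sleepMillis,
                                Predicate<Throwable> isRetriable,
                                CompletableFuture<T> finalFuture,
                                int retryAttempt) {
        CompletableFuture<T> current;
        try {
            current = attempt.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like a failed attempt.
            current = CompletableFuture.failedFuture(e);
        }
        current.whenComplete((result, error) -> {
            if (error == null) {
                finalFuture.complete(result);
            } else if (retryAttempt < maxRetries && isRetriable.test(error)) {
                // Wait without blocking a thread, then start the next attempt.
                CompletableFuture.delayedExecutor(sleepMillis, TimeUnit.MILLISECONDS)
                        .execute(() -> run(attempt, maxRetries, sleepMillis, isRetriable,
                                finalFuture, retryAttempt + 1));
            } else {
                // Out of retries, or not retriable: report the failure once.
                finalFuture.completeExceptionally(error);
            }
        });
    }
}
```

Callers only see finalFuture, so success or final failure is reported exactly once. This is also why the retry client calls onComplete and onError from its own completion logic rather than from each attempt's SDK response handler.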

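The retry client has two properties that control its behavior. You can set:

  • The maximum number of times the client retries before it fails (setMaxRetries). Decrease this value to make your application stop retrying sooner when there are network problems. The default is 10.

  • The time, in milliseconds, that the client waits between retries (setSleepTime). A longer wait increases the risk of losing data; a shorter wait increases the risk that your application is throttled. The default is 100 milliseconds.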
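The two settings together bound how long the client keeps trying. As a rough worked example, assume the client sleeps once for the configured time before each retry, and ignore how long each failed attempt itself takes. Under those assumptions the worst-case total waiting time is maxRetries × sleepTime. With the defaults that is 10 × 100 ms = 1,000 ms. The class below is a hypothetical sizing helper, not part of the client.

```java
// Hypothetical sizing helper; the defaults mirror the retry client's defaults.
final class RetryBudget {
    static final int DEFAULT_MAX_RETRIES = 10;
    static final int DEFAULT_SLEEP_TIME_MILLIS = 100;

    private RetryBudget() {
    }

    /**
     * Worst-case total sleep in milliseconds, assuming one fixed sleep before
     * each of at most maxRetries retries. Time spent in the attempts is ignored.
     */
    static long worstCaseSleepMillis(int maxRetries, int sleepTimeMillis) {
        if (maxRetries < 0 || sleepTimeMillis < 0) {
            throw new IllegalArgumentException("retry settings must be non-negative");
        }
        return (long) maxRetries * sleepTimeMillis;
    }

    /** Largest retry count whose worst-case sleep fits within budgetMillis. */
    static int maxRetriesWithin(long budgetMillis, int sleepTimeMillis) {
        if (budgetMillis < 0 || sleepTimeMillis <= 0) {
            throw new IllegalArgumentException("budget must be >= 0 and sleep time > 0");
        }
        return (int) Math.min(Integer.MAX_VALUE, budgetMillis / sleepTimeMillis);
    }

    public static void main(String[] args) {
        // Defaults: 10 retries x 100 ms.
        System.out.println(worstCaseSleepMillis(DEFAULT_MAX_RETRIES, DEFAULT_SLEEP_TIME_MILLIS)); // prints 1000
        // Keeping total waits within 500 ms at 100 ms per sleep allows 5 retries.
        System.out.println(maxRetriesWithin(500, 100)); // prints 5
    }
}
```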

The client follows. You can copy this code into your application, or use it as a starting point for your own client.

/**
 * COPYRIGHT:
 * <p>
 * Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package com.amazonaws.transcribestreaming.retryclient;

import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.signer.EventStreamAws4Signer;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.transcribestreaming.TranscribeStreamingAsyncClient;
import software.amazon.awssdk.services.transcribestreaming.model.AudioStream;
import software.amazon.awssdk.services.transcribestreaming.model.BadRequestException;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/**
 * Build a client wrapper around the Amazon Transcribe client to retry
 * on an exception that can be retried.
 */
public class TranscribeStreamingRetryClient {

    private static final int DEFAULT_MAX_RETRIES = 10;
    private static final int DEFAULT_SLEEP_TIME_MILLIS = 100;
    private static final Logger log = LoggerFactory.getLogger(TranscribeStreamingRetryClient.class);

    private final TranscribeStreamingAsyncClient client;
    // Exception types (including subclasses) that a retry cannot fix.
    private final List<Class<? extends Throwable>> nonRetriableExceptions =
            Arrays.asList(BadRequestException.class);
    private int maxRetries = DEFAULT_MAX_RETRIES;
    private int sleepTime = DEFAULT_SLEEP_TIME_MILLIS;

    /**
     * Create a TranscribeStreamingRetryClient with given credential and configuration
     *
     * @param creds    Creds to use for transcription
     * @param endpoint Endpoint to use for transcription
     * @param region   Region to use for transcriptions
     * @throws URISyntaxException if the endpoint is not a URI
     */
    public TranscribeStreamingRetryClient(AwsCredentialsProvider creds,
                                          String endpoint, Region region) throws URISyntaxException {
        this(TranscribeStreamingAsyncClient.builder()
                .overrideConfiguration(
                        c -> c.putAdvancedOption(
                                SdkAdvancedClientOption.SIGNER,
                                EventStreamAws4Signer.create()))
                .credentialsProvider(creds)
                .endpointOverride(new URI(endpoint))
                .region(region)
                .build());
    }

    /**
     * Initiate TranscribeStreamingRetryClient with TranscribeStreamingAsyncClient
     *
     * @param client TranscribeStreamingAsyncClient
     */
    public TranscribeStreamingRetryClient(TranscribeStreamingAsyncClient client) {
        this.client = client;
    }

    /**
     * Get Max retries
     *
     * @return Max retries
     */
    public int getMaxRetries() {
        return maxRetries;
    }

    /**
     * Set Max retries
     *
     * @param maxRetries Max retries
     */
    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * Get sleep time
     *
     * @return sleep time between retries, in milliseconds
     */
    public int getSleepTime() {
        return sleepTime;
    }

    /**
     * Set sleep time between retries
     *
     * @param sleepTime sleep time, in milliseconds
     */
    public void setSleepTime(int sleepTime) {
        this.sleepTime = sleepTime;
    }

    /**
     * Initiate a Stream Transcription with retry.
     *
     * @param request         StartStreamTranscriptionRequest to use to start transcription
     * @param publisher       The source audio stream as Publisher
     * @param responseHandler StreamTranscriptionBehavior object that defines how the response needs to be handled.
     * @return Completable future to handle stream response.
     */
    public CompletableFuture<Void> startStreamTranscription(final StartStreamTranscriptionRequest request,
                                                            final Publisher<AudioStream> publisher,
                                                            final StreamTranscriptionBehavior responseHandler) {
        CompletableFuture<Void> finalFuture = new CompletableFuture<>();
        recursiveStartStream(rebuildRequestWithSession(request), publisher, responseHandler, finalFuture, 0);
        return finalFuture;
    }

    /**
     * Recursively call startStreamTranscription() until the request completes or the retries run out.
     *
     * @param request         StartStreamTranscriptionRequest
     * @param publisher       The source audio stream as Publisher
     * @param responseHandler StreamTranscriptionBehavior object that defines how the response needs to be handled.
     * @param finalFuture     final future to finish on completing the chained futures.
     * @param retryAttempt    Current attempt number
     */
    private void recursiveStartStream(final StartStreamTranscriptionRequest request,
                                      final Publisher<AudioStream> publisher,
                                      final StreamTranscriptionBehavior responseHandler,
                                      final CompletableFuture<Void> finalFuture,
                                      final int retryAttempt) {
        CompletableFuture<Void> result = client.startStreamTranscription(request, publisher,
                getResponseHandler(responseHandler));
        result.whenComplete((r, e) -> {
            if (e != null) {
                log.debug("Error occurred:", e);

                // retryAttempt starts at 0, so at most maxRetries retries are made.
                if (retryAttempt < maxRetries && isExceptionRetriable(e)) {
                    log.debug("Retriable error occurred and will be retried.");
                    log.debug("Sleeping for {} ms before retrying...", sleepTime);
                    try {
                        Thread.sleep(sleepTime);
                    } catch (InterruptedException e1) {
                        // Interrupted while waiting: restore the flag and stop retrying.
                        Thread.currentThread().interrupt();
                        log.debug("Unable to sleep. Failed with exception: ", e1);
                        responseHandler.onError(e);
                        finalFuture.completeExceptionally(e);
                        return;
                    }
                    log.debug("Making retry attempt: {}", retryAttempt + 1);
                    recursiveStartStream(request, publisher, responseHandler, finalFuture, retryAttempt + 1);
                } else {
                    log.error("Encountered unretriable exception or ran out of retries.");
                    responseHandler.onError(e);
                    finalFuture.completeExceptionally(e);
                }
            } else {
                responseHandler.onComplete();
                finalFuture.complete(null);
            }
        });
    }

    /**
     * Copy the request, keeping all of its settings, and attach a new session ID.
     */
    private StartStreamTranscriptionRequest rebuildRequestWithSession(StartStreamTranscriptionRequest request) {
        return request.toBuilder()
                .sessionId(UUID.randomUUID().toString())
                .build();
    }

    /**
     * Wrap a StreamTranscriptionBehavior in a StartStreamTranscriptionResponseHandler.
     * Responses and stream events are forwarded for every attempt; onError and onComplete
     * are reported once, by recursiveStartStream, after the final attempt.
     */
    private StartStreamTranscriptionResponseHandler getResponseHandler(
            StreamTranscriptionBehavior transcriptionBehavior) {
        return StartStreamTranscriptionResponseHandler.builder()
                .onResponse(r -> transcriptionBehavior.onResponse(r))
                .onError(e -> {
                    // Do nothing here. Don't close any streams that shouldn't be cleaned up yet.
                })
                .onComplete(() -> {
                    // Do nothing here. Don't close any streams that shouldn't be cleaned up yet.
                })
                .subscriber(event -> transcriptionBehavior.onStream(event))
                .build();
    }

    /**
     * Check if the exception can be retried.
     *
     * @param e Exception that occurred
     * @return True if the exception is retriable
     */
    private boolean isExceptionRetriable(Throwable e) {
        // Futures may deliver the failure wrapped in CompletionException or ExecutionException.
        Throwable cause = e;
        while ((cause instanceof CompletionException || cause instanceof ExecutionException)
                && cause.getCause() != null) {
            cause = cause.getCause();
        }
        // Retriable unless the cause is one of the known non-retriable types.
        for (Class<? extends Throwable> type : nonRetriableExceptions) {
            if (type.isInstance(cause)) {
                return false;
            }
        }
        return true;
    }

    public void close() {
        this.client.close();
    }
}
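Two details of the retriability check are easy to get wrong. First, an exception delivered to whenComplete can be a CompletionException that wraps the real cause, when the failure propagated through a dependent stage. In that case, comparing e.getClass() directly against BadRequestException.class does not match. Second, an exact class comparison ignores subclasses. The following is a JDK-only illustration, not the client's code. RetriabilityCheck is a hypothetical name, and IllegalArgumentException stands in for BadRequestException so that the example runs without the AWS SDK.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical JDK-only sketch; IllegalArgumentException plays the role of BadRequestException.
final class RetriabilityCheck {
    private final List<Class<? extends Throwable>> nonRetriable;

    RetriabilityCheck(List<Class<? extends Throwable>> nonRetriable) {
        this.nonRetriable = nonRetriable;
    }

    /** Strips CompletionException/ExecutionException layers to reach the underlying cause. */
    static Throwable unwrap(Throwable e) {
        Throwable current = e;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Retriable unless the cause is an instance of a deny-listed type (subclasses included). */
    boolean isRetriable(Throwable e) {
        Throwable cause = unwrap(e);
        return nonRetriable.stream().noneMatch(type -> type.isInstance(cause));
    }

    public static void main(String[] args) {
        RetriabilityCheck check = new RetriabilityCheck(
                List.<Class<? extends Throwable>>of(IllegalArgumentException.class));
        // A wrapped deny-listed exception is still recognized as non-retriable.
        System.out.println(check.isRetriable(
                new CompletionException(new IllegalArgumentException("bad request")))); // prints false
        // Other failures, such as I/O errors, are treated as retriable.
        System.out.println(check.isRetriable(
                new CompletionException(new IOException("connection reset")))); // prints true
    }
}
```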

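Streaming retry client interface code

This interface is similar to the response handler used in the getting-started example and defines the same event handlers. Implement this interface to use the streaming retry client.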

/**
 * COPYRIGHT:
 * <p>
 * Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package com.amazonaws.transcribestreaming.retryclient;

import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponse;
import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream;

/**
 * Defines how a stream response should be handled.
 * You should build a class implementing this interface to define the behavior.
 */
public interface StreamTranscriptionBehavior {
    /**
     * Defines how to respond when encountering an error on the stream transcription.
     *
     * @param e The exception
     */
    void onError(Throwable e);

    /**
     * Defines how to respond to the Transcript result stream.
     *
     * @param e The TranscriptResultStream event
     */
    void onStream(TranscriptResultStream e);

    /**
     * Defines what to do on initiating a stream connection with the service.
     *
     * @param r StartStreamTranscriptionResponse
     */
    void onResponse(StartStreamTranscriptionResponse r);

    /**
     * Defines what to do on stream completion
     */
    void onComplete();
}

To use the streaming retry client, provide a class that implements the StreamTranscriptionBehavior interface. You can write your own implementation, or adapt the response-handling code from the getting-started example as a starting point.

Next step

Using the retry client