步骤 2:编写和检查代码 - Amazon Kinesis Video Streams
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 2:编写和检查代码

在此部分中,您将检查 Java 库和测试代码,并了解如何在您自己的代码中使用该库中的工具。

Kinesis 视频流解析器库包含以下工具:

StreamingMkvReader

此类以非阻止方式从流中读取指定的 MKV 元素。

以下代码示例(来自 FragmentMetadataVisitorTest)说明如何创建 Streaming MkvReader 并使用它从名为 inputStream 的输入流中检索 MkvElement 对象:

StreamingMkvReader mkvStreamReader = StreamingMkvReader.createDefault(new InputStreamParserByteSource(inputStream)); while (mkvStreamReader.mightHaveNext()) { Optional<MkvElement> mkvElement = mkvStreamReader.nextIfAvailable(); if (mkvElement.isPresent()) { mkvElement.get().accept(fragmentVisitor); ... } } }

FragmentMetadataVisitor

该类检索片段(媒体元素)的元数据,并跟踪包含媒体信息的单个数据流,例如编解码器私有数据、像素宽度或像素高度。

以下代码示例 (来自 FragmentMetadataVisitorTest 文件) 说明如何使用 FragmentMetadataVisitor 检索 MkvElement 对象中的数据:

FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create(); StreamingMkvReader mkvStreamReader = StreamingMkvReader.createDefault(new InputStreamParserByteSource(in)); int segmentCount = 0; while(mkvStreamReader.mightHaveNext()) { Optional<MkvElement> mkvElement = mkvStreamReader.nextIfAvailable(); if (mkvElement.isPresent()) { mkvElement.get().accept(fragmentVisitor); if (MkvTypeInfos.SIMPLEBLOCK.equals(mkvElement.get().getElementMetaData().getTypeInfo())) { MkvDataElement dataElement = (MkvDataElement) mkvElement.get(); Frame frame = ((MkvValue<Frame>)dataElement.getValueCopy()).getVal(); MkvTrackMetadata trackMetadata = fragmentVisitor.getMkvTrackMetadata(frame.getTrackNumber()); assertTrackAndFragmentInfo(fragmentVisitor, frame, trackMetadata); } if (MkvTypeInfos.SEGMENT.equals(mkvElement.get().getElementMetaData().getTypeInfo())) { if (mkvElement.get() instanceof MkvEndMasterElement) { if (segmentCount < continuationTokens.size()) { Optional<String> continuationToken = fragmentVisitor.getContinuationToken(); Assert.assertTrue(continuationToken.isPresent()); Assert.assertEquals(continuationTokens.get(segmentCount), continuationToken.get()); } segmentCount++; } } } }

上一个示例显示以下编码模式:

  • 创建一个 FragmentMetadataVisitor 来解析数据,并创建一个 StreamingMkvReader 来提供数据。

  • 对于流中的每个 MkvElement,测试其元数据的类型是否为 SIMPLEBLOCK

  • 如果是,则从 MkvElement 检索 MkvDataElement

  • MkvDataElement 检索 Frame(媒体数据)。

  • FragmentMetadataVisitor 检索 FrameMkvTrackMetadata

  • FrameMkvTrackMetadata 对象检索并验证以下数据:

    • 音轨编号。

    • 帧的像素高度。

    • 帧的像素宽度。

    • 用于对帧进行编码的编解码器的 ID。

    • 此帧的到达顺序。验证前一帧的轨道号(如果存在)是否小于当前帧的轨道号。

要在项目中使用 FragmentMetadataVisitor,请使用访客的 accept 方法将 MkvElement 对象传递给访客:

mkvElement.get().accept(fragmentVisitor);

OutputSegmentMerger

此类将来自流中不同音轨的元数据合并到带单个片段的流中。

以下代码示例 (来自 FragmentMetadataVisitorTest 文件) 说明如何使用 OutputSegmentMerger 合并来自名为 inputBytes 的字节数组的音轨元数据:

FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); OutputSegmentMerger outputSegmentMerger = OutputSegmentMerger.createDefault(outputStream); CompositeMkvElementVisitor compositeVisitor = new TestCompositeVisitor(fragmentVisitor, outputSegmentMerger); final InputStream in = TestResourceUtil.getTestInputStream("output_get_media.mkv"); StreamingMkvReader mkvStreamReader = StreamingMkvReader.createDefault(new InputStreamParserByteSource(in)); while (mkvStreamReader.mightHaveNext()) { Optional<MkvElement> mkvElement = mkvStreamReader.nextIfAvailable(); if (mkvElement.isPresent()) { mkvElement.get().accept(compositeVisitor); if (MkvTypeInfos.SIMPLEBLOCK.equals(mkvElement.get().getElementMetaData().getTypeInfo())) { MkvDataElement dataElement = (MkvDataElement) mkvElement.get(); Frame frame = ((MkvValue<Frame>) dataElement.getValueCopy()).getVal(); Assert.assertTrue(frame.getFrameData().limit() > 0); MkvTrackMetadata trackMetadata = fragmentVisitor.getMkvTrackMetadata(frame.getTrackNumber()); assertTrackAndFragmentInfo(fragmentVisitor, frame, trackMetadata); } }

上一个示例显示以下编码模式:

  • 创建 FragmentMetadataVisitor 以从流中检索元数据。

  • 创建一个输出流来接收合并的元数据。

  • 创建 OutputSegmentMerger,传入到 ByteArrayOutputStream 中。

  • 创建包含两个访问者的 CompositeMkvElementVisitor

  • 创建指向指定文件的 InputStream

  • 将输入数据中的每个元素合并到输出流中。

KinesisVideoExample

这是一个示例应用程序,显示了如何使用 Kinesis 视频流解析器库。

该类执行以下操作:

  • 创建 Kinesis 视频流。如果已存在具有给定名称的流,则删除流并重新创建。

  • 调用PutMedia将视频片段流式传输到 Kinesis 视频流。

  • 关于GetMedia从 Kinesis 视频流中流出视频片段的调用。

  • 使用 StreamingMkvReader 解析流中返回的片段,使用 FragmentMetadataVisitor 记录片段。

删除流并重新创建

以下代码示例(来自StreamOps.java文件)删除给定的 Kinesis 视频流:

//Delete the stream amazonKinesisVideo.deleteStream(new DeleteStreamRequest().withStreamARN(streamInfo.get().getStreamARN()));

以下代码示例(来自StreamOps.java文件)创建了具有指定名称的 Kinesis 视频流:

amazonKinesisVideo.createStream(new CreateStreamRequest().withStreamName(streamName) .withDataRetentionInHours(DATA_RETENTION_IN_HOURS) .withMediaType("video/h264"));

打电话 PutMedia

以下代码示例(来自PutMediaWorker.java文件)调用PutMedia流:

putMedia.putMedia(new PutMediaRequest().withStreamName(streamName) .withFragmentTimecodeType(FragmentTimecodeType.RELATIVE) .withProducerStartTimestamp(new Date()) .withPayload(inputStream), new PutMediaAckResponseHandler() { ... });

打电话 GetMedia

以下代码示例(来自GetMediaWorker.java文件)调用GetMedia流:

GetMediaResult result = videoMedia.getMedia(new GetMediaRequest().withStreamName(streamName).withStartSelector(startSelector));

解析结果 GetMedia

本节介绍如何使用 StreamingMkvReaderFragmentMetadataVisitorCompositeMkvElementVisitor 解析、保存到文件以及记录 GetMedia 返回的数据。

读取 with GetMedia 的输出 StreamingMkvReader

以下代码示例(来自GetMediaWorker.java文件)创建StreamingMkvReader并使用它来解析GetMedia操作的结果:

StreamingMkvReader mkvStreamReader = StreamingMkvReader.createDefault(new InputStreamParserByteSource(result.getPayload())); log.info("StreamingMkvReader created for stream {} ", streamName); try { mkvStreamReader.apply(this.elementVisitor); } catch (MkvElementVisitException e) { log.error("Exception while accepting visitor {}", e); }

在前面的代码示例中,StreamingMkvReaderGetMedia 结果的负载中检索 MKVElement 对象。在下一节中,将元素传递给 FragmentMetadataVisitor

使用以下方法检索片段 FragmentMetadataVisitor

下面的代码示例 (摘自 KinesisVideoExample.javaStreamingMkvReader.java 文件) 创建 FragmentMetadataVisitor。然后,将 StreamingMkvReader 迭代的 MkvElement 对象传递给使用 accept 方法的访问者。

摘自 KinesisVideoExample.java

FragmentMetadataVisitor fragmentMetadataVisitor = FragmentMetadataVisitor.create();

摘自 StreamingMkvReader.java

if (mkvElementOptional.isPresent()) { //Apply the MkvElement to the visitor mkvElementOptional.get().accept(elementVisitor); }

记录元素并将其写入文件

下面的代码示例 (摘自 KinesisVideoExample.java 文件) 创建以下对象,并将它们作为 GetMediaProcessingArguments 函数返回值的一部分返回:

  • 写入系统日志的 LogVisitor (MkvElementVisitor 的扩展)。

  • 将传入数据写入 MKV 文件的 OutputStream

  • 缓冲发往 OutputStream 的数据的 BufferedOutputStream

  • GetMedia 结果中的连续元素与相同音轨和 EBML 数据合并的 OutputSegmentMerger

  • A CompositeMkvElementVisitorFragmentMetadataVisitorOutputSegmentMerger、和组成LogVisitor为单个元素的访客。

//A visitor used to log as the GetMedia stream is processed. LogVisitor logVisitor = new LogVisitor(fragmentMetadataVisitor); //An OutputSegmentMerger to combine multiple segments that share track and ebml metadata into one //mkv segment. OutputStream fileOutputStream = Files.newOutputStream(Paths.get("kinesis_video_example_merged_output2.mkv"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); BufferedOutputStream outputStream = new BufferedOutputStream(fileOutputStream); OutputSegmentMerger outputSegmentMerger = OutputSegmentMerger.createDefault(outputStream); //A composite visitor to encapsulate the three visitors. CompositeMkvElementVisitor mkvElementVisitor = new CompositeMkvElementVisitor(fragmentMetadataVisitor, outputSegmentMerger, logVisitor); return new GetMediaProcessingArguments(outputStream, logVisitor, mkvElementVisitor);

然后将媒体处理参数传递到GetMediaWorker,然后传递给,后者在单独的ExecutorService线程上执行 worker:

GetMediaWorker getMediaWorker = GetMediaWorker.create(getRegion(), getCredentialsProvider(), getStreamName(), new StartSelector().withStartSelectorType(StartSelectorType.EARLIEST), amazonKinesisVideo, getMediaProcessingArgumentsLocal.getMkvElementVisitor()); executorService.submit(getMediaWorker);

下一步

步骤 3:运行并验证代码