步骤 4:验证应用程序输出 - 适用于 SQL 应用程序的 Amazon Kinesis Data Analytics 开发人员指南
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用适用于 SQL 应用程序的 Kinesis Data Analytics。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 4:验证应用程序输出

热点示例的此部分中,您将设置一个 Web 应用程序,该应用程序在可扩展矢量图形 (SVG) 控件中显示热点信息。

  1. 使用以下内容创建名为 index.html 的文件:

    <!doctype html> <html lang=en> <head> <meta charset=utf-8> <title>hotspots viewer</title> <style> #visualization { display: block; margin: auto; } .point { opacity: 0.2; } .hot { fill: red; } .cold { fill: blue; } .hotspot { stroke: black; stroke-opacity: 0.8; stroke-width: 1; fill: none; } </style> <script src="https://sdk.amazonaws.com/js/aws-sdk-2.202.0.min.js"></script> <script src="https://d3js.org/d3.v4.min.js"></script> </head> <body> <svg id="visualization" width="600" height="600"></svg> <script src="hotspots_viewer.js"></script> </body> </html>
  2. 在同一目录中创建一个名为 hotspots_viewer.js 的文件,该文件包含以下内容。在提供的变量中包含您的 、凭证和输出流名称。

    // Visualize example output from the Kinesis Analytics hotspot detection algorithm. // This script assumes that the output stream has a single shard. // Modify this section to reflect your Amazon configuration var awsRegion = "", // The where your Kinesis Analytics application is configured. accessKeyId = "", // Your Access Key ID secretAccessKey = "", // Your Secret Access Key outputStream = ""; // The name of the Kinesis Stream where the output from the HOTSPOTS function is being written // The variables in this section should reflect way input data was generated and the parameters that the HOTSPOTS // function was called with. var windowSize = 1000, // The window size used for hotspot detection minimumDensity = 40, // A filter applied to returned hotspots before visualization xRange = [0, 10], // The range of values to display on the x-axis yRange = [0, 10]; // The range of values to display on the y-axis //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // D3 setup //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// var svg = d3.select("svg"), margin = {"top": 20, "right": 20, "bottom": 20, "left": 20}, graphWidth = +svg.attr("width") - margin.left - margin.right, graphHeight = +svg.attr("height") - margin.top - margin.bottom; // Return the linear function that maps the segment [a, b] to the segment [c, d]. function linearScale(a, b, c, d) { var m = (d - c) / (b - a); return function(x) { return c + m * (x - a); }; } // helper functions to extract the x-value from a stream record and scale it for output var xValue = function(r) { return r.x; }, xScale = linearScale(xRange[0], xRange[1], 0, graphWidth), xMap = function(r) { return xScale(xValue(r)); }; // helper functions to extract the y-value from a stream record and scale it for output var yValue = function(r) { return r.y; }, yScale = linearScale(yRange[0], yRange[1], 0, graphHeight), yMap = function(r) { return yScale(yValue(r)); }; // a helper function that assigns a CSS class to a point based on whether it was generated as part of a hotspot var classMap = function(r) { return r.is_hot == "Y" ? "point hot" : "point cold"; }; var g = svg.append("g") .attr("transform", "translate(" + margin.left + "," + margin.top + ")"); function update(records, hotspots) { var points = g.selectAll("circle") .data(records, function(r) { return r.dataIndex; }); points.enter().append("circle") .attr("class", classMap) .attr("r", 3) .attr("cx", xMap) .attr("cy", yMap); points.exit().remove(); if (hotspots) { var boxes = g.selectAll("rect").data(hotspots); boxes.enter().append("rect") .merge(boxes) .attr("class", "hotspot") .attr("x", function(h) { return xScale(h.minValues[0]); }) .attr("y", function(h) { return yScale(h.minValues[1]); }) .attr("width", function(h) { return xScale(h.maxValues[0]) - xScale(h.minValues[0]); }) .attr("height", function(h) { return yScale(h.maxValues[1]) - yScale(h.minValues[1]); }); boxes.exit().remove(); } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Use the Amazon SDK to pull output records from Kinesis and update the visualization //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// var kinesis = new AWS.Kinesis({ "region": awsRegion, "accessKeyId": accessKeyId, "secretAccessKey": secretAccessKey }); var textDecoder = new TextDecoder("utf-8"); // Decode an output record into an object and assign it an index value function decodeRecord(record, recordIndex) { var record = JSON.parse(textDecoder.decode(record.Data)); var hotspots_result = JSON.parse(record.HOTSPOTS_RESULT); record.hotspots = hotspots_result.hotspots .filter(function(hotspot) { return hotspot.density >= minimumDensity}); record.index = recordIndex return record; } // Fetch a new records from the shard iterator, append them to records, and update the visualization function getRecordsAndUpdateVisualization(shardIterator, records, lastRecordIndex) { kinesis.getRecords({ "ShardIterator": shardIterator }, function(err, data) { if (err) { console.log(err, err.stack); return; } var newRecords = data.Records.map(function(raw) { return decodeRecord(raw, ++lastRecordIndex); }); newRecords.forEach(function(record) { records.push(record); }); var hotspots = null; if (newRecords.length > 0) { hotspots = newRecords[newRecords.length - 1].hotspots; } while (records.length > windowSize) { records.shift(); } update(records, hotspots); getRecordsAndUpdateVisualization(data.NextShardIterator, records, lastRecordIndex); }); } // Get a shard iterator for the output stream and begin updating the visualization. Note that this script will only // read records from the first shard in the stream. function init() { kinesis.describeStream({ "StreamName": outputStream }, function(err, data) { if (err) { console.log(err, err.stack); return; } var shardId = data.StreamDescription.Shards[0].ShardId; kinesis.getShardIterator({ "StreamName": outputStream, "ShardId": shardId, "ShardIteratorType": "LATEST" }, function(err, data) { if (err) { console.log(err, err.stack); return; } getRecordsAndUpdateVisualization(data.ShardIterator, [], 0); }) }); } // Start the visualization init();
  3. 在第一个部分中 Python 代码运行时,在 Web 浏览器中打开 index.html。热点信息显示在页面上,如下所示。

    
                            显示热点信息的可扩展矢量图形图表。