写入数据(插入和向上移动) - Amazon Timestream
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

从2025年6月20日起,亚马逊Timestream版 LiveAnalytics 将不再向新客户开放。如果您想使用亚马逊 Timestream LiveAnalytics,请在该日期之前注册。现有客户可以继续照常使用该服务。有关更多信息,请参阅 Amazon Timestream 以了解 LiveAnalytics 可用性变更。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

写入数据(插入和向上移动)

写入成批的记录

您可以使用以下代码片段将数据写入 Amazon Timestream 表。批量写入数据有助于优化写入成本。请参阅计算写入次数了解更多信息。

注意

这些代码片段基于上的完整示例应用程序。GitHub有关如何开始使用示例应用程序的更多信息,请参阅示例应用程序

Java
public void writeRecords() { System.out.println("Writing records"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record cpuUtilization = new Record() .withDimensions(dimensions) .withMeasureName("cpu_utilization") .withMeasureValue("13.5") .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); Record memoryUtilization = new Record() .withDimensions(dimensions) .withMeasureName("memory_utilization") .withMeasureValue("40") .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withRecords(records); try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
Java v2
public void writeRecords() { System.out.println("Writing records"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record cpuUtilization = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .measureName("cpu_utilization") .measureValue("13.5") .time(String.valueOf(time)).build(); Record memoryUtilization = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .measureName("memory_utilization") .measureValue("40") .time(String.valueOf(time)).build(); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME).tableName(TABLE_NAME).records(records).build(); try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.rejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.recordIndex() + ": " + rejectedRecord.reason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
Go
now := time.Now() currentTimeInSeconds := now.Unix() writeRecordsInput := &timestreamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), Records: []*timestreamwrite.Record{ &timestreamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ &timestreamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, &timestreamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, &timestreamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("13.5"), MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), }, &timestreamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ &timestreamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, &timestreamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, &timestreamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("40"), MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), }, }, } _, err = writeSvc.WriteRecords(writeRecordsInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records is successful") }
Python
def write_records(self): print("Writing records") current_time = self._current_milli_time() dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] cpu_utilization = { 'Dimensions': dimensions, 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5', 'MeasureValueType': 'DOUBLE', 'Time': current_time } memory_utilization = { 'Dimensions': dimensions, 'MeasureName': 'memory_utilization', 'MeasureValue': '40', 'MeasureValueType': 'DOUBLE', 'Time': current_time } records = [cpu_utilization, memory_utilization] try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes={}) print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) @staticmethod def _print_rejected_records_exceptions(err): print("RejectedRecords: ", err) for rr in err.response["RejectedRecords"]: print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"]) if "ExistingVersion" in rr: print("Rejected record existing version: ", rr["ExistingVersion"]) @staticmethod def _current_milli_time(): return str(int(round(time.time() * 1000)))
Node.js

以下代码段使用适用于 JavaScript V2 的 Amazon SDK 风格。它基于 Node.js 示例 Amazon Timestream 中的示例 LiveAnalytics 应用程序,供其上使用。 GitHub

async function writeRecords() { console.log("Writing records"); const currentTime = Date.now().toString(); // Unix time in milliseconds const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const cpuUtilization = { 'Dimensions': dimensions, 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5', 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString() }; const memoryUtilization = { 'Dimensions': dimensions, 'MeasureName': 'memory_utilization', 'MeasureValue': '40', 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString() }; const records = [cpuUtilization, memoryUtilization]; const params = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: records }; const request = writeClient.writeRecords(params); await request.promise().then( (data) => { console.log("Write records successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { const responsePayload = JSON.parse(request.response.httpResponse.body.toString()); console.log("RejectedRecords: ", responsePayload.RejectedRecords); console.log("Other records were written successfully. "); } } ); }
.NET
public async Task WriteRecords() { Console.WriteLine("Writing records"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var cpuUtilization = new Record { Dimensions = dimensions, MeasureName = "cpu_utilization", MeasureValue = "13.6", MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString }; var memoryUtilization = new Record { Dimensions = dimensions, MeasureName = "memory_utilization", MeasureValue = "40", MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString }; List<Record> records = new List<Record> { cpuUtilization, memoryUtilization }; try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { Console.WriteLine("RejectedRecordsException:" + e.ToString()); foreach (RejectedRecord rr in e.RejectedRecords) { Console.WriteLine("RecordIndex " + rr.RecordIndex + " : " + rr.Reason); } Console.WriteLine("Other records were written successfully. "); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } }

批量写入具有共同属性的记录

如果您的时间序列数据具有在许多数据点中通用的度量和/或维度,则还可以使用以下经过优化的 WriteRecords API 版本将数据插入到 Timestream 中。 LiveAnalytics在批处理中使用常用属性可以进一步优化写入成本,如中所述。计算写入次数

注意

这些代码片段基于上的完整示例应用程序。GitHub有关如何开始使用示例应用程序的更多信息,请参阅示例应用程序

Java
public void writeRecordsWithCommonAttributes() { System.out.println("Writing records with extracting common attributes"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)); Record cpuUtilization = new Record() .withMeasureName("cpu_utilization") .withMeasureValue("13.5"); Record memoryUtilization = new Record() .withMeasureName("memory_utilization") .withMeasureValue("40"); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsRequest.setRecords(records); try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("writeRecordsWithCommonAttributes Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
Java v2
public void writeRecordsWithCommonAttributes() { System.out.println("Writing records with extracting common attributes"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)).build(); Record cpuUtilization = Record.builder() .measureName("cpu_utilization") .measureValue("13.5").build(); Record memoryUtilization = Record.builder() .measureName("memory_utilization") .measureValue("40").build(); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("writeRecordsWithCommonAttributes Status: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.rejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.recordIndex() + ": " + rejectedRecord.reason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); } }
Go
now = time.Now() currentTimeInSeconds = now.Unix() writeRecordsCommonAttributesInput := &timestreamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), CommonAttributes: &timestreamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ &timestreamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, &timestreamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, &timestreamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), }, Records: []*timestreamwrite.Record{ &timestreamwrite.Record{ MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("13.5"), }, &timestreamwrite.Record{ MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("40"), }, }, } _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Ingest records is successful") }
Python
def write_records_with_common_attributes(self): print("Writing records extracting common attributes") current_time = self._current_milli_time() dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] common_attributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': current_time } cpu_utilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' } memory_utilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' } records = [cpu_utilization, memory_utilization] try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) @staticmethod def _print_rejected_records_exceptions(err): print("RejectedRecords: ", err) for rr in err.response["RejectedRecords"]: print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"]) if "ExistingVersion" in rr: print("Rejected record existing version: ", rr["ExistingVersion"]) @staticmethod def _current_milli_time(): return str(int(round(time.time() * 1000)))
Node.js

以下代码段使用适用于 JavaScript V2 的 Amazon SDK 风格。它基于 Node.js 示例 Amazon Timestream 中的示例 LiveAnalytics 应用程序,供其上使用。 GitHub

async function writeRecordsWithCommonAttributes() { console.log("Writing records with common attributes"); const currentTime = Date.now().toString(); // Unix time in milliseconds const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const commonAttributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString() }; const cpuUtilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' }; const memoryUtilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' }; const records = [cpuUtilization, memoryUtilization]; const params = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: records, CommonAttributes: commonAttributes }; const request = writeClient.writeRecords(params); await request.promise().then( (data) => { console.log("Write records successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { const responsePayload = JSON.parse(request.response.httpResponse.body.toString()); console.log("RejectedRecords: ", responsePayload.RejectedRecords); console.log("Other records were written successfully. "); } } ); }
.NET
public async Task WriteRecordsWithCommonAttributes() { Console.WriteLine("Writing records with common attributes"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString }; var cpuUtilization = new Record { MeasureName = "cpu_utilization", MeasureValue = "13.6" }; var memoryUtilization = new Record { MeasureName = "memory_utilization", MeasureValue = "40" }; List<Record> records = new List<Record>(); records.Add(cpuUtilization); records.Add(memoryUtilization); try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { Console.WriteLine("RejectedRecordsException:" + e.ToString()); foreach (RejectedRecord rr in e.RejectedRecords) { Console.WriteLine("RecordIndex " + rr.RecordIndex + " : " + rr.Reason); } Console.WriteLine("Other records were written successfully. "); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } }

颠覆记录

虽然 Amazon Timestream 中的默认写入遵循第一个写入者获胜的语义,即数据仅作为附加存储并拒绝重复记录,但有些应用程序要求能够使用最后一个写入者获胜语义将数据写入 Amazon Timestream,即版本最高的记录存储在系统中。还有一些应用程序需要能够更新现有记录。为了解决这些情况,Amazon Timestream 提供了更新数据的功能。Upsert 是一种在记录不存在时将记录插入系统的操作,或者在记录存在时更新记录的操作。

您可以通过在发送WriteRecords请求时包含记录Version中的定义来更新记录。亚马逊 Timestream 将存储记录最高的记录。Version下面的代码示例显示了如何更新数据:

注意

这些代码片段基于上的完整示例应用程序。GitHub有关如何开始使用示例应用程序的更多信息,请参阅示例应用程序

Java
public void writeRecordsWithUpsert() { System.out.println("Writing records with upsert"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue("us-east-1"); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withMeasureValueType(MeasureValueType.DOUBLE) .withTime(String.valueOf(time)) .withVersion(version); Record cpuUtilization = new Record() .withMeasureName("cpu_utilization") .withMeasureValue("13.5"); Record memoryUtilization = new Record() .withMeasureName("memory_utilization") .withMeasureValue("40"); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsRequest.setRecords(records); // write records for first time try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for first time: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for retry: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with lower version, this would fail because a higher version is required to update the measure value. version -= 1; commonAttributes.setVersion(version); cpuUtilization.setMeasureValue("14.5"); memoryUtilization.setMeasureValue("50"); List<Record> upsertedRecords = new ArrayList<>(); upsertedRecords.add(cpuUtilization); upsertedRecords.add(memoryUtilization); WriteRecordsRequest writeRecordsUpsertRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsUpsertRequest.setRecords(upsertedRecords); try { WriteRecordsResult writeRecordsUpsertResult = amazonTimestreamWrite.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with lower version: " + writeRecordsUpsertResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("WriteRecords Status for upsert with lower version: "); printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with higher version as new data in generated version = System.currentTimeMillis(); commonAttributes.setVersion(version); writeRecordsUpsertRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes); writeRecordsUpsertRequest.setRecords(upsertedRecords); try { WriteRecordsResult writeRecordsUpsertResult = amazonTimestreamWrite.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with higher version: " + writeRecordsUpsertResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } }
Java v2
public void writeRecordsWithUpsert() { System.out.println("Writing records with upsert"); // Specify repeated values for all records List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)) .version(version) .build(); Record cpuUtilization = Record.builder() .measureName("cpu_utilization") .measureValue("13.5").build(); Record memoryUtilization = Record.builder() .measureName("memory_utilization") .measureValue("40").build(); records.add(cpuUtilization); records.add(memoryUtilization); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); // write records for first time try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for first time: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status for retry: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with lower version, this would fail because a higher version is required to update the measure value. version -= 1; commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)) .version(version) .build(); cpuUtilization = Record.builder() .measureName("cpu_utilization") .measureValue("14.5").build(); memoryUtilization = Record.builder() .measureName("memory_utilization") .measureValue("50").build(); List<Record> upsertedRecords = new ArrayList<>(); upsertedRecords.add(cpuUtilization); upsertedRecords.add(memoryUtilization); WriteRecordsRequest writeRecordsUpsertRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(upsertedRecords).build(); try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with lower version: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("WriteRecords Status for upsert with lower version: "); printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } // upsert with higher version as new data in generated version = System.currentTimeMillis(); commonAttributes = Record.builder() .dimensions(dimensions) .measureValueType(MeasureValueType.DOUBLE) .time(String.valueOf(time)) .version(version) .build(); writeRecordsUpsertRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(upsertedRecords).build(); try { WriteRecordsResponse writeRecordsUpsertResponse = timestreamWriteClient.writeRecords(writeRecordsUpsertRequest); System.out.println("WriteRecords Status for upsert with higher version: " + writeRecordsUpsertResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } }
Go
// Below code will ingest and upsert cpu_utilization and memory_utilization metric for a host on // region=us-east-1, az=az1, and hostname=host1 fmt.Println("Ingesting records and set version as currentTimeInMills, hit enter to continue") reader.ReadString('\n') // Get current time in seconds. now = time.Now() currentTimeInSeconds = now.Unix() // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source version := time.Now().Round(time.Millisecond).UnixNano() / 1e6 // set version as currentTimeInMills writeRecordsCommonAttributesUpsertInput := &timestreamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), CommonAttributes: &timestreamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ &timestreamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, &timestreamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, &timestreamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureValueType: aws.String("DOUBLE"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), Version: &version, }, Records: []*timestreamwrite.Record{ &timestreamwrite.Record{ MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("13.5"), }, &timestreamwrite.Record{ MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("40"), }, }, } // write records for first time _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Frist-time write records is successful") } fmt.Println("Retry same writeRecordsRequest with same records and versions. Because writeRecords API is idempotent, this will success. hit enter to continue") reader.ReadString('\n') _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Retry write records for same request is successful") } fmt.Println("Upsert with lower version, this would fail because a higher version is required to update the measure value. hit enter to continue") reader.ReadString('\n') version -= 1 writeRecordsCommonAttributesUpsertInput.CommonAttributes.Version = &version updated_cpu_utilization := &timestreamwrite.Record{ MeasureName: aws.String("cpu_utilization"), MeasureValue: aws.String("14.5"), } updated_memory_utilization := &timestreamwrite.Record{ MeasureName: aws.String("memory_utilization"), MeasureValue: aws.String("50"), } writeRecordsCommonAttributesUpsertInput.Records = []*timestreamwrite.Record{ updated_cpu_utilization, updated_memory_utilization, } _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records with lower version is successful") } fmt.Println("Upsert with higher version as new data in generated, this would success. hit enter to continue") reader.ReadString('\n') version = time.Now().Round(time.Millisecond).UnixNano() / 1e6 // set version as currentTimeInMills writeRecordsCommonAttributesUpsertInput.CommonAttributes.Version = &version _, err = writeSvc.WriteRecords(writeRecordsCommonAttributesUpsertInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records with higher version is successful") }
Python
def write_records_with_upsert(self): print("Writing records with upsert") current_time = self._current_milli_time() # To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source version = int(self._current_milli_time()) dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] common_attributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': current_time, 'Version': version } cpu_utilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' } memory_utilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' } records = [cpu_utilization, memory_utilization] # write records for first time try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status for first time: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) # Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status for retry: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) # upsert with lower version, this would fail because a higher version is required to update the measure value. version -= 1 common_attributes["Version"] = version cpu_utilization["MeasureValue"] = '14.5' memory_utilization["MeasureValue"] = '50' upsertedRecords = [cpu_utilization, memory_utilization] try: upsertedResult = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=upsertedRecords, CommonAttributes=common_attributes) print("WriteRecords Status for upsert with lower version: [%s]" % upsertedResult['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) # upsert with higher version as new data is generated version = int(self._current_milli_time()) common_attributes["Version"] = version try: upsertedResult = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=upsertedRecords, CommonAttributes=common_attributes) print("WriteRecords Upsert Status: [%s]" % upsertedResult['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: self._print_rejected_records_exceptions(err) except Exception as err: print("Error:", err) @staticmethod def _current_milli_time(): return str(int(round(time.time() * 1000)))
Node.js

以下代码段使用适用于 JavaScript V2 的 Amazon SDK 风格。它基于 Node.js 示例 Amazon Timestream 中的示例 LiveAnalytics 应用程序,供其上使用。 GitHub

async function writeRecordsWithUpsert() { console.log("Writing records with upsert"); const currentTime = Date.now().toString(); // Unix time in milliseconds // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source let version = Date.now(); const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const commonAttributes = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString(), 'Version': version }; const cpuUtilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '13.5' }; const memoryUtilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '40' }; const records = [cpuUtilization, memoryUtilization]; const params = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: records, CommonAttributes: commonAttributes }; const request = writeClient.writeRecords(params); // write records for first time await request.promise().then( (data) => { console.log("Write records successful for first time."); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(request); } } ); // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. await request.promise().then( (data) => { console.log("Write records successful for retry."); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(request); } } ); // upsert with lower version, this would fail because a higher version is required to update the measure value. version--; const commonAttributesWithLowerVersion = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString(), 'Version': version }; const updatedCpuUtilization = { 'MeasureName': 'cpu_utilization', 'MeasureValue': '14.5' }; const updatedMemoryUtilization = { 'MeasureName': 'memory_utilization', 'MeasureValue': '50' }; const upsertedRecords = [updatedCpuUtilization, updatedMemoryUtilization]; const upsertedParamsWithLowerVersion = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: upsertedRecords, CommonAttributes: commonAttributesWithLowerVersion }; const upsertRequestWithLowerVersion = writeClient.writeRecords(upsertedParamsWithLowerVersion); await upsertRequestWithLowerVersion.promise().then( (data) => { console.log("Write records for upsert with lower version successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(upsertRequestWithLowerVersion); } } ); // upsert with higher version as new data in generated version = Date.now(); const commonAttributesWithHigherVersion = { 'Dimensions': dimensions, 'MeasureValueType': 'DOUBLE', 'Time': currentTime.toString(), 'Version': version }; const upsertedParamsWithHigherVerion = { DatabaseName: constants.DATABASE_NAME, TableName: constants.TABLE_NAME, Records: upsertedRecords, CommonAttributes: commonAttributesWithHigherVersion }; const upsertRequestWithHigherVersion = writeClient.writeRecords(upsertedParamsWithHigherVerion); await upsertRequestWithHigherVersion.promise().then( (data) => { console.log("Write records upsert successful with higher version"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { printRejectedRecordsException(upsertedParamsWithHigherVerion); } } ); }
.NET
public async Task WriteRecordsWithUpsert() { Console.WriteLine("Writing records with upsert"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source long version = now.ToUnixTimeMilliseconds(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, MeasureValueType = MeasureValueType.DOUBLE, Time = currentTimeString, Version = version }; var cpuUtilization = new Record { MeasureName = "cpu_utilization", MeasureValue = "13.6" }; var memoryUtilization = new Record { MeasureName = "memory_utilization", MeasureValue = "40" }; List<Record> records = new List<Record>(); records.Add(cpuUtilization); records.Add(memoryUtilization); // write records for first time try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"WriteRecords Status for first time: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } // Successfully retry same writeRecordsRequest with same records and versions, because writeRecords API is idempotent. try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"WriteRecords Status for retry: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } // upsert with lower version, this would fail because a higher version is required to update the measure value. version--; Type recordType = typeof(Record); recordType.GetProperty("Version").SetValue(commonAttributes, version); recordType.GetProperty("MeasureValue").SetValue(cpuUtilization, "14.6"); recordType.GetProperty("MeasureValue").SetValue(memoryUtilization, "50"); List<Record> upsertedRecords = new List<Record> { cpuUtilization, memoryUtilization }; try { var writeRecordsUpsertRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = upsertedRecords, CommonAttributes = commonAttributes }; WriteRecordsResponse upsertResponse = await writeClient.WriteRecordsAsync(writeRecordsUpsertRequest); Console.WriteLine($"WriteRecords Status for upsert with lower version: {upsertResponse.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } // upsert with higher version as new data in generated now = DateTimeOffset.UtcNow; version = now.ToUnixTimeMilliseconds(); recordType.GetProperty("Version").SetValue(commonAttributes, version); try { var writeRecordsUpsertRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = upsertedRecords, CommonAttributes = commonAttributes }; WriteRecordsResponse upsertResponse = await writeClient.WriteRecordsAsync(writeRecordsUpsertRequest); Console.WriteLine($"WriteRecords Status for upsert with higher version: {upsertResponse.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { PrintRejectedRecordsException(e); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } }

多度量属性示例

此示例说明如何编写多度量属性。当您正在跟踪的设备或应用程序在同一时间戳发出多个指标或事件时,多指标@@ 属性非常有用。

注意

这些代码片段基于上的完整示例应用程序。GitHub有关如何开始使用示例应用程序的更多信息,请参阅示例应用程序

Java
package com.amazonaws.services.timestream; import static com.amazonaws.services.timestream.Main.DATABASE_NAME; import static com.amazonaws.services.timestream.Main.REGION; import static com.amazonaws.services.timestream.Main.TABLE_NAME; import java.util.ArrayList; import java.util.List; import com.amazonaws.services.timestreamwrite.AmazonTimestreamWrite; import com.amazonaws.services.timestreamwrite.model.Dimension; import com.amazonaws.services.timestreamwrite.model.MeasureValue; import com.amazonaws.services.timestreamwrite.model.MeasureValueType; import com.amazonaws.services.timestreamwrite.model.Record; import com.amazonaws.services.timestreamwrite.model.RejectedRecordsException; import com.amazonaws.services.timestreamwrite.model.WriteRecordsRequest; import com.amazonaws.services.timestreamwrite.model.WriteRecordsResult; public class multimeasureAttributeExample { AmazonTimestreamWrite timestreamWriteClient; public multimeasureAttributeExample(AmazonTimestreamWrite client) { this.timestreamWriteClient = client; } public void writeRecordsMultiMeasureValueSingleRecord() { System.out.println("Writing records with multi value attributes"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue(REGION); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withTime(String.valueOf(time)) .withVersion(version); MeasureValue cpuUtilization = new MeasureValue() .withName("cpu_utilization") .withType(MeasureValueType.DOUBLE) .withValue("13.5"); MeasureValue memoryUtilization = new MeasureValue() .withName("memory_utilization") .withType(MeasureValueType.DOUBLE) .withValue("40"); Record computationalResources = new Record() .withMeasureName("cpu_memory") .withMeasureValues(cpuUtilization, memoryUtilization) .withMeasureValueType(MeasureValueType.MULTI); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes) .withRecords(records); // write records for first time try { WriteRecordsResult writeRecordResult = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordResult .getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } public void writeRecordsMultiMeasureValueMultipleRecords() { System.out.println( "Writing records with multi value attributes mixture type"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = new Dimension().withName("region").withValue(REGION); final Dimension az = new Dimension().withName("az").withValue("az1"); final Dimension hostname = new Dimension().withName("hostname").withValue("host1"); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = new Record() .withDimensions(dimensions) .withTime(String.valueOf(time)) .withVersion(version); MeasureValue cpuUtilization = new MeasureValue() .withName("cpu_utilization") .withType(MeasureValueType.DOUBLE) .withValue("13"); MeasureValue memoryUtilization =new MeasureValue() .withName("memory_utilization") .withType(MeasureValueType.DOUBLE) .withValue("40"); MeasureValue activeCores = new MeasureValue() .withName("active_cores") .withType(MeasureValueType.BIGINT) .withValue("4"); Record computationalResources = new Record() .withMeasureName("computational_utilization") .withMeasureValues(cpuUtilization, memoryUtilization, activeCores) .withMeasureValueType(MeasureValueType.MULTI); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = new WriteRecordsRequest() .withDatabaseName(DATABASE_NAME) .withTableName(TABLE_NAME) .withCommonAttributes(commonAttributes) .withRecords(records); // write records for first time try { WriteRecordsResult writeRecordResult = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordResult .getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } private void printRejectedRecordsException(RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); e.getRejectedRecords().forEach(System.out::println); } }
Java v2
package com.amazonaws.services.timestream; import java.util.ArrayList; import java.util.List; import software.amazon.awssdk.services.timestreamwrite.TimestreamWriteClient; import software.amazon.awssdk.services.timestreamwrite.model.Dimension; import software.amazon.awssdk.services.timestreamwrite.model.MeasureValue; import software.amazon.awssdk.services.timestreamwrite.model.MeasureValueType; import software.amazon.awssdk.services.timestreamwrite.model.Record; import software.amazon.awssdk.services.timestreamwrite.model.RejectedRecordsException; import software.amazon.awssdk.services.timestreamwrite.model.WriteRecordsRequest; import software.amazon.awssdk.services.timestreamwrite.model.WriteRecordsResponse; import static com.amazonaws.services.timestream.Main.DATABASE_NAME; import static com.amazonaws.services.timestream.Main.TABLE_NAME; public class multimeasureAttributeExample { TimestreamWriteClient timestreamWriteClient; public multimeasureAttributeExample(TimestreamWriteClient client) { this.timestreamWriteClient = client; } public void writeRecordsMultiMeasureValueSingleRecord() { System.out.println("Writing records with multi value attributes"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .time(String.valueOf(time)) .version(version) .build(); MeasureValue cpuUtilization = MeasureValue.builder() .name("cpu_utilization") .type(MeasureValueType.DOUBLE) .value("13.5").build(); MeasureValue memoryUtilization = MeasureValue.builder() .name("memory_utilization") .type(MeasureValueType.DOUBLE) .value("40").build(); Record computationalResources = Record .builder() .measureName("cpu_memory") .measureValues(cpuUtilization, memoryUtilization) .measureValueType(MeasureValueType.MULTI) .build(); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); // write records for first time try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordsResponse .sdkHttpResponse() .statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } public void writeRecordsMultiMeasureValueMultipleRecords() { System.out.println( "Writing records with multi value attributes mixture type"); List<Record> records = new ArrayList<>(); final long time = System.currentTimeMillis(); long version = System.currentTimeMillis(); List<Dimension> dimensions = new ArrayList<>(); final Dimension region = Dimension.builder().name("region").value("us-east-1").build(); final Dimension az = Dimension.builder().name("az").value("az1").build(); final Dimension hostname = Dimension.builder().name("hostname").value("host1").build(); dimensions.add(region); dimensions.add(az); dimensions.add(hostname); Record commonAttributes = Record.builder() .dimensions(dimensions) .time(String.valueOf(time)) .version(version) .build(); MeasureValue cpuUtilization = MeasureValue.builder() .name("cpu_utilization") .type(MeasureValueType.DOUBLE) .value("13.5").build(); MeasureValue memoryUtilization = MeasureValue.builder() .name("memory_utilization") .type(MeasureValueType.DOUBLE) .value("40").build(); MeasureValue activeCores = MeasureValue.builder() .name("active_cores") .type(MeasureValueType.BIGINT) .value("4").build(); Record computationalResources = Record .builder() .measureName("computational_utilization") .measureValues(cpuUtilization, memoryUtilization, activeCores) .measureValueType(MeasureValueType.MULTI) .build(); records.add(computationalResources); WriteRecordsRequest writeRecordsRequest = WriteRecordsRequest.builder() .databaseName(DATABASE_NAME) .tableName(TABLE_NAME) .commonAttributes(commonAttributes) .records(records).build(); // write records for first time try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println( "WriteRecords Status for multi value attributes: " + writeRecordsResponse .sdkHttpResponse() .statusCode()); } catch (RejectedRecordsException e) { printRejectedRecordsException(e); } catch (Exception e) { System.out.println("Error: " + e); } } private void printRejectedRecordsException(RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); e.rejectedRecords().forEach(System.out::println); } }
Go
now := time.Now() currentTimeInSeconds := now.Unix() writeRecordsInput := &timestreamwrite.WriteRecordsInput{ DatabaseName: aws.String(*databaseName), TableName: aws.String(*tableName), Records: []*timestreamwrite.Record{ &timestreamwrite.Record{ Dimensions: []*timestreamwrite.Dimension{ &timestreamwrite.Dimension{ Name: aws.String("region"), Value: aws.String("us-east-1"), }, &timestreamwrite.Dimension{ Name: aws.String("az"), Value: aws.String("az1"), }, &timestreamwrite.Dimension{ Name: aws.String("hostname"), Value: aws.String("host1"), }, }, MeasureName: aws.String("metrics"), MeasureValueType: aws.String("MULTI"), Time: aws.String(strconv.FormatInt(currentTimeInSeconds, 10)), TimeUnit: aws.String("SECONDS"), MeasureValues: []*timestreamwrite.MeasureValue{ &timestreamwrite.MeasureValue{ Name: aws.String("cpu_utilization"), Value: aws.String("13.5"), Type: aws.String("DOUBLE"), }, &timestreamwrite.MeasureValue{ Name: aws.String("memory_utilization"), Value: aws.String("40"), Type: aws.String("DOUBLE"), }, }, }, }, } _, err = writeSvc.WriteRecords(writeRecordsInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records is successful") }
Python
import time import boto3 import psutil import os from botocore.config import Config DATABASE_NAME = os.environ['DATABASE_NAME'] TABLE_NAME = os.environ['TABLE_NAME'] COUNTRY = "UK" CITY = "London" HOSTNAME = "MyHostname" # You can make it dynamic using socket.gethostname() INTERVAL = 1 # Seconds def prepare_common_attributes(): common_attributes = { 'Dimensions': [ {'Name': 'country', 'Value': COUNTRY}, {'Name': 'city', 'Value': CITY}, {'Name': 'hostname', 'Value': HOSTNAME} ], 'MeasureName': 'utilization', 'MeasureValueType': 'MULTI' } return common_attributes def prepare_record(current_time): record = { 'Time': str(current_time), 'MeasureValues': [] } return record def prepare_measure(measure_name, measure_value): measure = { 'Name': measure_name, 'Value': str(measure_value), 'Type': 'DOUBLE' } return measure def write_records(records, common_attributes): try: result = write_client.write_records(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME, CommonAttributes=common_attributes, Records=records) status = result['ResponseMetadata']['HTTPStatusCode'] print("Processed %d records. WriteRecords HTTPStatusCode: %s" % (len(records), status)) except Exception as err: print("Error:", err) if __name__ == '__main__': print("writing data to database {} table {}".format( DATABASE_NAME, TABLE_NAME)) session = boto3.Session() write_client = session.client('timestream-write', config=Config( read_timeout=20, max_pool_connections=5000, retries={'max_attempts': 10})) query_client = session.client('timestream-query') # Not used common_attributes = prepare_common_attributes() records = [] while True: current_time = int(time.time() * 1000) cpu_utilization = psutil.cpu_percent() memory_utilization = psutil.virtual_memory().percent swap_utilization = psutil.swap_memory().percent disk_utilization = psutil.disk_usage('/').percent record = prepare_record(current_time) record['MeasureValues'].append(prepare_measure('cpu', cpu_utilization)) record['MeasureValues'].append(prepare_measure('memory', memory_utilization)) record['MeasureValues'].append(prepare_measure('swap', swap_utilization)) record['MeasureValues'].append(prepare_measure('disk', disk_utilization)) records.append(record) print("records {} - cpu {} - memory {} - swap {} - disk {}".format( len(records), cpu_utilization, memory_utilization, swap_utilization, disk_utilization)) if len(records) == 100: write_records(records, common_attributes) records = [] time.sleep(INTERVAL)
Node.js

以下代码段使用适用于 JavaScript V2 的 Amazon SDK 风格。它基于 Node.js 示例 Amazon Timestream 中的示例 LiveAnalytics 应用程序,供其上使用。 GitHub

async function writeRecords() { console.log("Writing records"); const currentTime = Date.now().toString(); // Unix time in milliseconds const dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ]; const record = { 'Dimensions': dimensions, 'MeasureName': 'metrics', 'MeasureValues': [ { 'Name': 'cpu_utilization', 'Value': '40', 'Type': 'DOUBLE', }, { 'Name': 'memory_utilization', 'Value': '13.5', 'Type': 'DOUBLE', }, ], 'MeasureValueType': 'MULTI', 'Time': currentTime.toString() } const records = [record]; const params = { DatabaseName: 'DatabaseName', TableName: 'TableName', Records: records }; const response = await writeClient.writeRecords(params); console.log(response); }
.NET
using System; using System.IO; using System.Collections.Generic; using Amazon.TimestreamWrite; using Amazon.TimestreamWrite.Model; using System.Threading.Tasks; namespace TimestreamDotNetSample { static class MultiMeasureValueConstants { public const string MultiMeasureValueSampleDb = "multiMeasureValueSampleDb"; public const string MultiMeasureValueSampleTable = "multiMeasureValueSampleTable"; } public class MultiValueAttributesExample { private readonly AmazonTimestreamWriteClient writeClient; public MultiValueAttributesExample(AmazonTimestreamWriteClient writeClient) { this.writeClient = writeClient; } public async Task WriteRecordsMultiMeasureValueSingleRecord() { Console.WriteLine("Writing records with multi value attributes"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, Time = currentTimeString }; var cpuUtilization = new MeasureValue { Name = "cpu_utilization", Value = "13.6", Type = "DOUBLE" }; var memoryUtilization = new MeasureValue { Name = "memory_utilization", Value = "40", Type = "DOUBLE" }; var computationalRecord = new Record { MeasureName = "cpu_memory", MeasureValues = new List<MeasureValue> {cpuUtilization, memoryUtilization}, MeasureValueType = "MULTI" }; List<Record> records = new List<Record>(); records.Add(computationalRecord); try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = MultiMeasureValueConstants.MultiMeasureValueSampleDb, TableName = MultiMeasureValueConstants.MultiMeasureValueSampleTable, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } } public async Task WriteRecordsMultiMeasureValueMultipleRecords() { Console.WriteLine("Writing records with multi value attributes mixture type"); DateTimeOffset now = DateTimeOffset.UtcNow; string currentTimeString = (now.ToUnixTimeMilliseconds()).ToString(); List<Dimension> dimensions = new List<Dimension>{ new Dimension { Name = "region", Value = "us-east-1" }, new Dimension { Name = "az", Value = "az1" }, new Dimension { Name = "hostname", Value = "host1" } }; var commonAttributes = new Record { Dimensions = dimensions, Time = currentTimeString }; var cpuUtilization = new MeasureValue { Name = "cpu_utilization", Value = "13.6", Type = "DOUBLE" }; var memoryUtilization = new MeasureValue { Name = "memory_utilization", Value = "40", Type = "DOUBLE" }; var activeCores = new MeasureValue { Name = "active_cores", Value = "4", Type = "BIGINT" }; var computationalRecord = new Record { MeasureName = "computational_utilization", MeasureValues = new List<MeasureValue> {cpuUtilization, memoryUtilization, activeCores}, MeasureValueType = "MULTI" }; var aliveRecord = new Record { MeasureName = "is_healthy", MeasureValue = "true", MeasureValueType = "BOOLEAN" }; List<Record> records = new List<Record>(); records.Add(computationalRecord); records.Add(aliveRecord); try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = MultiMeasureValueConstants.MultiMeasureValueSampleDb, TableName = MultiMeasureValueConstants.MultiMeasureValueSampleTable, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); } } } }

处理写入失败

由于以下一个或多个原因,在 Amazon Timestream 中写入操作可能会失败:

  • 有些记录的时间戳超出了内存存储的保留期限。

  • 有些记录包含超过时间流定义限制的维度和/或度量。

  • 亚马逊 Timestream 已检测到重复的记录。如果存在多个具有相同维度、时间戳和度量名称的记录,但是:

    • 测量值不同。

    • 请求中不存在版本,或者新记录中版本的值等于或小于现有值。如果 Amazon Timestream 出于此原因拒绝数据,则中的ExistingVersion字段RejectedRecords将包含存储在亚马逊 Timestream 中的记录的当前版本。要强制更新,您可以重新发送请求,并将该记录的版本设置为大于。ExistingVersion

有关错误和被拒绝记录的更多信息,请参阅错误RejectedRecord

如果您的应用程序RejectedRecordsException在尝试将记录写入 Timestream 时收到,则可以解析被拒绝的记录以了解有关写入失败的更多信息,如下所示。

注意

这些代码片段基于上的完整示例应用程序。GitHub有关如何开始使用示例应用程序的更多信息,请参阅示例应用程序

Java
try { WriteRecordsResult writeRecordsResult = amazonTimestreamWrite.writeRecords(writeRecordsRequest); System.out.println("WriteRecords Status: " + writeRecordsResult.getSdkHttpMetadata().getHttpStatusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.getRejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.getRecordIndex() + ": " + rejectedRecord.getReason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); }
Java v2
try { WriteRecordsResponse writeRecordsResponse = timestreamWriteClient.writeRecords(writeRecordsRequest); System.out.println("writeRecordsWithCommonAttributes Status: " + writeRecordsResponse.sdkHttpResponse().statusCode()); } catch (RejectedRecordsException e) { System.out.println("RejectedRecords: " + e); for (RejectedRecord rejectedRecord : e.rejectedRecords()) { System.out.println("Rejected Index " + rejectedRecord.recordIndex() + ": " + rejectedRecord.reason()); } System.out.println("Other records were written successfully. "); } catch (Exception e) { System.out.println("Error: " + e); }
Go
_, err = writeSvc.WriteRecords(writeRecordsInput) if err != nil { fmt.Println("Error:") fmt.Println(err) } else { fmt.Println("Write records is successful") }
Python
try: result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME, Records=records, CommonAttributes=common_attributes) print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode']) except self.client.exceptions.RejectedRecordsException as err: print("RejectedRecords: ", err) for rr in err.response["RejectedRecords"]: print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"]) print("Other records were written successfully. ") except Exception as err: print("Error:", err)
Node.js

以下代码段使用适用于 JavaScript V2 的 Amazon SDK 风格。它基于 Node.js 示例 Amazon Timestream 中的示例 LiveAnalytics 应用程序,供其上使用。 GitHub

await request.promise().then( (data) => { console.log("Write records successful"); }, (err) => { console.log("Error writing records:", err); if (err.code === 'RejectedRecordsException') { const responsePayload = JSON.parse(request.response.httpResponse.body.toString()); console.log("RejectedRecords: ", responsePayload.RejectedRecords); console.log("Other records were written successfully. "); } } );
.NET
try { var writeRecordsRequest = new WriteRecordsRequest { DatabaseName = Constants.DATABASE_NAME, TableName = Constants.TABLE_NAME, Records = records, CommonAttributes = commonAttributes }; WriteRecordsResponse response = await writeClient.WriteRecordsAsync(writeRecordsRequest); Console.WriteLine($"Write records status code: {response.HttpStatusCode.ToString()}"); } catch (RejectedRecordsException e) { Console.WriteLine("RejectedRecordsException:" + e.ToString()); foreach (RejectedRecord rr in e.RejectedRecords) { Console.WriteLine("RecordIndex " + rr.RecordIndex + " : " + rr.Reason); } Console.WriteLine("Other records were written successfully. "); } catch (Exception e) { Console.WriteLine("Write records failure:" + e.ToString()); }