DynamoDB - 表活动
DynamoDB 流使您能够跟踪和响应表项更改。 利用此功能创建一个应用程序,该应用程序通过更新跨源的信息来响应更改。 同步大型多用户系统的数千个用户的数据。 使用它向用户发送更新通知。 事实证明,它的应用是多种多样且实质性的。 DynamoDB 流是实现此功能的主要工具。
流捕获包含表中项目修改的时间排序序列。 他们最多保留此数据 24 小时。 应用程序使用它们几乎实时地查看原始和修改的项目。
表上启用的流捕获所有修改。 在任何 CRUD 操作中,DynamoDB 都会使用已修改项目的主键属性创建流记录。 您可以配置流以获取其他信息,例如之前和之后的图像。
Streams 有两个保证 −
每条记录在流中出现一次
每个项目修改都会产生与修改顺序相同的流记录。
所有流都会实时处理,以便您可以将它们用于应用程序中的相关功能。
管理流
在创建表时,您可以启用流。 现有表允许禁用流或更改设置。 Streams提供异步操作的特性,这意味着不会影响表的性能。
利用 AWS 管理控制台进行简单的流管理。 首先,导航到控制台,然后选择表格。 在"概述"选项卡中,选择管理流。 在窗口内,选择添加到表数据修改流中的信息。 输入所有设置后,选择启用。
如果您想禁用任何现有流,请选择管理流,然后禁用。
您还可以利用 API CreateTable 和 UpdateTable 来启用或更改流。 使用参数 StreamSpecification 来配置流。 StreamEnabled 指定状态,true 表示启用,false 表示禁用。
StreamViewType 指定添加到流中的信息:KEYS_ONLY、NEW_IMAGE、OLD_IMAGE 和 NEW_AND_OLD_IMAGES。
流读取
通过连接到端点并发出 API 请求来读取和处理流。 每个流都由流记录组成,并且每个记录都作为拥有该流的单个修改而存在。 流记录包括揭示发布顺序的序列号。 记录属于也称为分片的组。 分片充当多个记录的容器,并且还保存访问和遍历记录所需的信息。 24小时后,记录自动删除。
这些碎片是根据需要生成和删除的,并且不会持续很长时间。 它们还会自动划分为多个新分片,通常是为了响应写入活动峰值。 在流禁用时,打开的分片关闭。 分片之间的层次关系意味着应用程序必须优先考虑父分片以获得正确的处理顺序。 您可以使用 Kinesis Adapter 自动执行此操作。
注意 − 没有变化的操作不会写入流记录。
访问和处理记录需要执行以下任务 −
- 确定目标流的 ARN。
- 确定保存目标记录的流的分片。
- 访问分片以检索所需的记录。
注意 − 一次最多应该有 2 个进程读取一个分片。 如果超过 2 个进程,则可以限制源。
可用的流 API 操作包括
- ListStreams
- DescribeStream
- GetShardIterator
- GetRecords
您可以查看以下流读取示例 −
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient; import com.amazonaws.services.dynamodbv2.model.AttributeAction; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest; import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest; import com.amazonaws.services.dynamodbv2.model.GetRecordsResult; import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest; import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.Record; import com.amazonaws.services.dynamodbv2.model.Shard; import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; import com.amazonaws.services.dynamodbv2.model.StreamSpecification; import com.amazonaws.services.dynamodbv2.model.StreamViewType; import com.amazonaws.services.dynamodbv2.util.Tables; public class StreamsExample { private static AmazonDynamoDBClient dynamoDBClient = new AmazonDynamoDBClient(new ProfileCredentialsProvider()); private static AmazonDynamoDBStreamsClient streamsClient = new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider()); public static void main(String args[]) { dynamoDBClient.setEndpoint("InsertDbEndpointHere"); streamsClient.setEndpoint("InsertStreamEndpointHere"); // table creation String tableName = "MyTestingTable"; ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition() .withAttributeName("ID") .withAttributeType("N")); ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement() .withAttributeName("ID") .withKeyType(KeyType.HASH)); //Partition key StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES); CreateTableRequest createTableRequest = new CreateTableRequest() .withTableName(tableName) .withKeySchema(keySchema) .withAttributeDefinitions(attributeDefinitions) .withProvisionedThroughput(new ProvisionedThroughput() .withReadCapacityUnits(1L) .withWriteCapacityUnits(1L)) .withStreamSpecification(streamSpecification); System.out.println("Executing CreateTable for " + tableName); dynamoDBClient.createTable(createTableRequest); System.out.println("Creating " + tableName); try { Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); } catch (InterruptedException e) { e.printStackTrace(); } // Get the table's stream settings DescribeTableResult describeTableResult = dynamoDBClient.describeTable(tableName); String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); StreamSpecification myStreamSpec = describeTableResult.getTable().getStreamSpecification(); System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn); System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); System.out.println("Update view type: "+ myStreamSpec.getStreamViewType()); // Add an item int numChanges = 0; System.out.println("Making some changes to table data"); Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); item.put("ID", new AttributeValue().withN("222")); item.put("Alert", new AttributeValue().withS("item!")); dynamoDBClient.putItem(tableName, item); numChanges++; // Update the item Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("ID", new AttributeValue().withN("222")); Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<String, AttributeValueUpdate>(); attributeUpdates.put("Alert", new AttributeValueUpdate() .withAction(AttributeAction.PUT) .withValue(new AttributeValue().withS("modified item"))); dynamoDBClient.updateItem(tableName, key, attributeUpdates); numChanges++; // Delete the item dynamoDBClient.deleteItem(tableName, key); numChanges++; // Get stream shards DescribeStreamResult describeStreamResult = streamsClient.describeStream(new DescribeStreamRequest() .withStreamArn(myStreamArn)); String streamArn = describeStreamResult.getStreamDescription().getStreamArn(); List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); // Process shards for (Shard shard : shards) { String shardId = shard.getShardId(); System.out.println("Processing " + shardId + " in "+ streamArn); // Get shard iterator GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest() .withStreamArn(myStreamArn) .withShardId(shardId) .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); GetShardIteratorResult getShardIteratorResult = streamsClient.getShardIterator(getShardIteratorRequest); String nextItr = getShardIteratorResult.getShardIterator(); while (nextItr != null && numChanges > 0) { // Read data records with iterator GetRecordsResult getRecordsResult = streamsClient.getRecords(new GetRecordsRequest(). withShardIterator(nextItr)); List<Record> records = getRecordsResult.getRecords(); System.out.println("Pulling records..."); for (Record record : records) { System.out.println(record); numChanges--; } nextItr = getRecordsResult.getNextShardIterator(); } } } }