Skip to content

Commit

Permalink
Add transaction write (#20)
Browse files Browse the repository at this point in the history
* faet: Add transact write foundation

* feat: TxWrite minimum example works

* feat: Add condition

* feat: Add error convertor

* spec: Add test
  • Loading branch information
bokuweb authored Jul 3, 2020
1 parent 24194a3 commit 78c39f9
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 4 deletions.
1 change: 1 addition & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dynamo:
node setup

test:
make dynamo
cargo test -- --test-threads=1

lint:
Expand Down
5 changes: 5 additions & 0 deletions raiden-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ pub fn derive_raiden(input: TokenStream) -> TokenStream {
let key_condition_builder =
key_condition::expand_key_condition_builder(&attr_enum_name, &struct_name);

let transact_write =
ops::expand_transact_write(&struct_name, &fields, rename_all_type, &table_name);

let expanded = quote! {

pub struct #client_name {
Expand Down Expand Up @@ -148,6 +151,8 @@ pub fn derive_raiden(input: TokenStream) -> TokenStream {

#delete_item

#transact_write

impl #client_name {
pub fn new(region: ::raiden::Region) -> Self {
let client = DynamoDbClient::new(region);
Expand Down
1 change: 1 addition & 0 deletions raiden-derive/src/ops/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,4 @@ pub struct GetItemInput {
pub table_name: String,
}
*/

2 changes: 2 additions & 0 deletions raiden-derive/src/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ mod query;
mod update;
mod shared;
mod batch_get;
mod transact_write;

pub(crate) use delete::*;
pub(crate) use get::*;
pub(crate) use put::*;
pub(crate) use query::*;
pub(crate) use shared::*;
pub(crate) use batch_get::*;
pub(crate) use transact_write::*;
pub(crate) use update::*;
6 changes: 2 additions & 4 deletions raiden-derive/src/ops/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ pub(crate) fn expand_put_item(
impl #trait_name for #client_name {
fn put(&self, item: #item_input_name) -> #builder_name{
let mut input = ::raiden::PutItemInput::default();
let mut attribute_names: std::collections::HashMap<String, String> = std::collections::HashMap::new();
let mut attribute_values: std::collections::HashMap<String, raiden::AttributeValue> = std::collections::HashMap::new();
// let mut attribute_names: std::collections::HashMap<String, String> = std::collections::HashMap::new();
// let mut attribute_values: std::collections::HashMap<String, raiden::AttributeValue> = std::collections::HashMap::new();
let mut uuid_map: std::collections::HashMap<String, String> = std::collections::HashMap::new();

#input_items
Expand Down Expand Up @@ -166,8 +166,6 @@ pub(crate) fn expand_put_item(
if !attr_values.is_empty() {
self.input.expression_attribute_values = Some(attr_values);
}
dbg!(&cond_str);

self.input.condition_expression = Some(cond_str);
self
}
Expand Down
137 changes: 137 additions & 0 deletions raiden-derive/src/ops/transact_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use crate::rename::*;
use quote::*;

pub(crate) fn expand_transact_write(
struct_name: &proc_macro2::Ident,
fields: &syn::FieldsNamed,
rename_all_type: crate::rename::RenameAllType,
table_name: &str,
) -> proc_macro2::TokenStream {
let item_input_name = format_ident!("{}PutItemInput", struct_name);
// let item_output_name = format_ident!("{}PutItemOutput", struct_name);
let put_builder = format_ident!("{}TransactPutItemBuilder", struct_name);
let condition_token_name = format_ident!("{}ConditionToken", struct_name);

// let output_values = fields.named.iter().map(|f| {
// let ident = &f.ident.clone().unwrap();
// let renamed = crate::finder::find_rename_value(&f.attrs);
// let attr_key = create_renamed(ident.to_string(), renamed, rename_all_type);
// if crate::finder::include_unary_attr(&f.attrs, "uuid") {
// quote! {
// #ident: uuid_map.get(#attr_key).cloned().unwrap().into(),
// }
// } else {
// quote! {
// #ident: item.#ident,
// }
// }
// });

let input_items = {
let insertion = fields.named.iter().map(|f| {
let ident = &f.ident.clone().unwrap();
let renamed = crate::finder::find_rename_value(&f.attrs);
let attr_key = create_renamed(ident.to_string(), renamed, rename_all_type);
if crate::finder::include_unary_attr(&f.attrs, "uuid") {
quote! {
let id = #struct_name::gen();
input_item.insert(
#attr_key.to_string(),
id.clone().into_attr(),
);
uuid_map.insert(
#attr_key.to_string(),
id,
);
}
} else {
quote! {
input_item.insert(
#attr_key.to_string(),
item.#ident.clone().into_attr(),
);
}
}
});

quote! {
let mut input_item: std::collections::HashMap<String, raiden::AttributeValue> = std::collections::HashMap::new();
#(#insertion)*
}
};

quote! {
impl #struct_name {
pub fn put(item: #item_input_name) -> #put_builder {
let mut input = ::raiden::Put::default();
let mut attribute_names: std::collections::HashMap<String, String> = std::collections::HashMap::new();
let mut attribute_values: std::collections::HashMap<String, raiden::AttributeValue> = std::collections::HashMap::new();
let mut uuid_map: std::collections::HashMap<String, String> = std::collections::HashMap::new();

#input_items

// let output_item = #item_output_name {
// #(#output_values)*
// };
input.item = input_item;
input.table_name = #table_name.to_owned();
#put_builder {
input,
// item: output_item,
}
}
}

pub struct #put_builder {
pub input: ::raiden::Put,
}

impl ::raiden::TransactWritePutBuilder for #put_builder {
fn build(self) -> ::raiden::Put {
self.input
}
}

impl #put_builder {
fn condition(mut self, cond: impl ::raiden::condition::ConditionBuilder<#condition_token_name>) -> Self {
let (cond_str, attr_names, attr_values) = cond.build();
if !attr_names.is_empty() {
self.input.expression_attribute_names = Some(attr_names);
}
if !attr_values.is_empty() {
self.input.expression_attribute_values = Some(attr_values);
}
self.input.condition_expression = Some(cond_str);
self
}
}
}
}

/*
#[derive(Default, Debug, Clone, PartialEq, Serialize)]
#[cfg_attr(feature = "deserialize_structs", derive(Deserialize))]
pub struct Put {
/// <p>A condition that must be satisfied in order for a conditional update to succeed.</p>
#[serde(rename = "ConditionExpression")]
#[serde(skip_serializing_if = "Option::is_none")]
pub condition_expression: Option<String>,
/// <p>One or more substitution tokens for attribute names in an expression.</p>
#[serde(rename = "ExpressionAttributeNames")]
#[serde(skip_serializing_if = "Option::is_none")]
pub expression_attribute_names: Option<::std::collections::HashMap<String, String>>,
/// <p>One or more values that can be substituted in an expression.</p>
#[serde(rename = "ExpressionAttributeValues")]
#[serde(skip_serializing_if = "Option::is_none")]
pub expression_attribute_values: Option<::std::collections::HashMap<String, AttributeValue>>,
/// <p>A map of attribute name to attribute values, representing the primary key of the item to be written by <code>PutItem</code>. All of the table's primary key attributes must be specified, and their data types must match those of the table's key schema. If any attributes are present in the item that are part of an index key schema for the table, their types must match the index key schema. </p>
#[serde(rename = "Item")]
pub item: ::std::collections::HashMap<String, AttributeValue>,
/// <p>Use <code>ReturnValuesOnConditionCheckFailure</code> to get the item attributes if the <code>Put</code> condition fails. For <code>ReturnValuesOnConditionCheckFailure</code>, the valid values are: NONE and ALL_OLD.</p>
#[serde(rename = "ReturnValuesOnConditionCheckFailure")]
#[serde(skip_serializing_if = "Option::is_none")]
pub return_values_on_condition_check_failure: Option<String>,
/// <p>Name of the table in which to write the item.</p>
#[serde(rename = "TableName")]
pub table_name: String,
}*/
36 changes: 36 additions & 0 deletions raiden/examples/transact_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use raiden::*;

#[derive(Raiden)]
#[raiden(table_name = "user")]
pub struct User {
#[raiden(partition_key)]
id: String,
name: String,
}

fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
async fn example() {
let mut tx = ::raiden::WriteTx::new(Region::Custom {
endpoint: "http://localhost:8000".into(),
name: "ap-northeast-1".into(),
});
let cond = User::condition().attr_not_exists(UserAttrNames::Id);
let input = User::put_item_builder()
.id("testId".to_owned())
.name("bokuweb".to_owned())
.build()
.unwrap();
let input2 = User::put_item_builder()
.id("testId2".to_owned())
.name("bokuweb".to_owned())
.build()
.unwrap();
tx.put(User::put(input).condition(cond))
.put(User::put(input2))
.run()
.await
.unwrap();
}
rt.block_on(example());
}
42 changes: 42 additions & 0 deletions raiden/src/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ pub enum RaidenError {
ParseError(String),
#[error("unknown error")]
Unknown(crate::request::BufferedHttpResponse),
#[error("`{0}`")]
TransactionCanceled(String),
#[error("`{0}`")]
TransactionInProgress(String),
#[error("`{0}`")]
IdempotentParameterMismatch(String),
#[error("blocking error")]
Blocking,
#[error("next_token decode error")]
Expand Down Expand Up @@ -114,3 +120,39 @@ impl From<RusotoError<PutItemError>> for RaidenError {
}
}
}

impl From<RusotoError<TransactWriteItemsError>> for RaidenError {
fn from(error: RusotoError<TransactWriteItemsError>) -> Self {
match error {
RusotoError::Service(error) => match error {
TransactWriteItemsError::IdempotentParameterMismatch(msg) => {
RaidenError::IdempotentParameterMismatch(msg)
}
TransactWriteItemsError::InternalServerError(msg) => {
RaidenError::InternalServerError(msg)
}
TransactWriteItemsError::ProvisionedThroughputExceeded(msg) => {
RaidenError::ProvisionedThroughputExceeded(msg)
}
TransactWriteItemsError::RequestLimitExceeded(msg) => {
RaidenError::RequestLimitExceeded(msg)
}
TransactWriteItemsError::ResourceNotFound(msg) => {
RaidenError::ResourceNotFound(msg)
}
TransactWriteItemsError::TransactionCanceled(msg) => {
RaidenError::TransactionCanceled(msg)
}
TransactWriteItemsError::TransactionInProgress(msg) => {
RaidenError::TransactionInProgress(msg)
}
},
RusotoError::HttpDispatch(e) => RaidenError::HttpDispatch(e),
RusotoError::Credentials(e) => RaidenError::Credentials(e),
RusotoError::Validation(msg) => RaidenError::Validation(msg),
RusotoError::ParseError(msg) => RaidenError::ParseError(msg),
RusotoError::Unknown(res) => RaidenError::Unknown(res),
RusotoError::Blocking => RaidenError::Blocking,
}
}
}
3 changes: 3 additions & 0 deletions raiden/src/ops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod get;
pub mod put;
pub mod query;
pub mod transact_write;

pub use transact_write::*;
43 changes: 43 additions & 0 deletions raiden/src/ops/transact_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::{DynamoDb, TransactWriteItem};

pub struct WriteTx {
items: Vec<crate::TransactWriteItem>,
client: crate::DynamoDbClient,
}
impl WriteTx {
pub fn new(region: crate::Region) -> Self {
let client = crate::DynamoDbClient::new(region);
Self {
items: vec![],
client,
}
}

pub fn put(mut self, builder: impl TransactWritePutBuilder) -> Self {
self.items.push(TransactWriteItem {
condition_check: None,
delete: None,
update: None,
put: Some(builder.build()),
});
self
}

pub async fn run(self) -> Result<(), crate::RaidenError> {
let _res = self
.client
.transact_write_items(crate::TransactWriteItemsInput {
client_request_token: None,
return_consumed_capacity: None,
return_item_collection_metrics: None,
transact_items: self.items,
})
.await?;
// TODO: ADD Response later
Ok(())
}
}

pub trait TransactWritePutBuilder {
fn build(self) -> crate::Put;
}
43 changes: 43 additions & 0 deletions raiden/tests/all/transact_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#[cfg(test)]
mod tests {

#[cfg(test)]
use pretty_assertions::assert_eq;
use raiden::*;

#[derive(Raiden)]
#[raiden(table_name = "user")]
#[derive(Debug, Clone)]
pub struct User {
#[raiden(partition_key)]
id: String,
name: String,
}

#[test]
fn test_minimum_transact_write() {
let tx = ::raiden::WriteTx::new(Region::Custom {
endpoint: "http://localhost:8000".into(),
name: "ap-northeast-1".into(),
});
let cond = User::condition().attr_not_exists(UserAttrNames::Id);
let input = User::put_item_builder()
.id("testId".to_owned())
.name("bokuweb".to_owned())
.build()
.unwrap();
let input2 = User::put_item_builder()
.id("testId2".to_owned())
.name("bokuweb".to_owned())
.build()
.unwrap();
assert_eq(
tx.put(User::put(input).condition(cond))
.put(User::put(input2))
.run()
.await
.is_ok(),
true,
)
}
}

0 comments on commit 78c39f9

Please sign in to comment.