From 78c39f95421184f4493ed587c58da0422b381a54 Mon Sep 17 00:00:00 2001 From: bokuweb Date: Fri, 3 Jul 2020 12:35:09 +0900 Subject: [PATCH] Add transaction write (#20) * faet: Add transact write foundation * feat: TxWrite minimum example works * feat: Add condition * feat: Add error convertor * spec: Add test --- makefile | 1 + raiden-derive/src/lib.rs | 5 + raiden-derive/src/ops/get.rs | 1 + raiden-derive/src/ops/mod.rs | 2 + raiden-derive/src/ops/put.rs | 6 +- raiden-derive/src/ops/transact_write.rs | 137 ++++++++++++++++++++++++ raiden/examples/transact_write.rs | 36 +++++++ raiden/src/errors/mod.rs | 42 ++++++++ raiden/src/ops/mod.rs | 3 + raiden/src/ops/transact_write.rs | 43 ++++++++ raiden/tests/all/transact_write.rs | 43 ++++++++ 11 files changed, 315 insertions(+), 4 deletions(-) create mode 100644 raiden-derive/src/ops/transact_write.rs create mode 100644 raiden/examples/transact_write.rs create mode 100644 raiden/src/ops/transact_write.rs create mode 100644 raiden/tests/all/transact_write.rs diff --git a/makefile b/makefile index 1aa27cf8..55fb97d9 100644 --- a/makefile +++ b/makefile @@ -5,6 +5,7 @@ dynamo: node setup test: + make dynamo cargo test -- --test-threads=1 lint: diff --git a/raiden-derive/src/lib.rs b/raiden-derive/src/lib.rs index 8ab8ee9d..3969ce67 100644 --- a/raiden-derive/src/lib.rs +++ b/raiden-derive/src/lib.rs @@ -120,6 +120,9 @@ pub fn derive_raiden(input: TokenStream) -> TokenStream { let key_condition_builder = key_condition::expand_key_condition_builder(&attr_enum_name, &struct_name); + let transact_write = + ops::expand_transact_write(&struct_name, &fields, rename_all_type, &table_name); + let expanded = quote! { pub struct #client_name { @@ -148,6 +151,8 @@ pub fn derive_raiden(input: TokenStream) -> TokenStream { #delete_item + #transact_write + impl #client_name { pub fn new(region: ::raiden::Region) -> Self { let client = DynamoDbClient::new(region); diff --git a/raiden-derive/src/ops/get.rs b/raiden-derive/src/ops/get.rs index 8852cd39..70c04b58 100644 --- a/raiden-derive/src/ops/get.rs +++ b/raiden-derive/src/ops/get.rs @@ -89,3 +89,4 @@ pub struct GetItemInput { pub table_name: String, } */ + diff --git a/raiden-derive/src/ops/mod.rs b/raiden-derive/src/ops/mod.rs index 6820dd9e..b72e61a3 100644 --- a/raiden-derive/src/ops/mod.rs +++ b/raiden-derive/src/ops/mod.rs @@ -5,6 +5,7 @@ mod query; mod update; mod shared; mod batch_get; +mod transact_write; pub(crate) use delete::*; pub(crate) use get::*; @@ -12,4 +13,5 @@ pub(crate) use put::*; pub(crate) use query::*; pub(crate) use shared::*; pub(crate) use batch_get::*; +pub(crate) use transact_write::*; pub(crate) use update::*; \ No newline at end of file diff --git a/raiden-derive/src/ops/put.rs b/raiden-derive/src/ops/put.rs index 76050852..37ab8fdc 100644 --- a/raiden-derive/src/ops/put.rs +++ b/raiden-derive/src/ops/put.rs @@ -108,8 +108,8 @@ pub(crate) fn expand_put_item( impl #trait_name for #client_name { fn put(&self, item: #item_input_name) -> #builder_name{ let mut input = ::raiden::PutItemInput::default(); - let mut attribute_names: std::collections::HashMap = std::collections::HashMap::new(); - let mut attribute_values: std::collections::HashMap = std::collections::HashMap::new(); + // let mut attribute_names: std::collections::HashMap = std::collections::HashMap::new(); + // let mut attribute_values: std::collections::HashMap = std::collections::HashMap::new(); let mut uuid_map: std::collections::HashMap = std::collections::HashMap::new(); #input_items @@ -166,8 +166,6 @@ pub(crate) fn expand_put_item( if !attr_values.is_empty() { self.input.expression_attribute_values = Some(attr_values); } - dbg!(&cond_str); - self.input.condition_expression = Some(cond_str); self } diff --git a/raiden-derive/src/ops/transact_write.rs b/raiden-derive/src/ops/transact_write.rs new file mode 100644 index 00000000..42a9076c --- /dev/null +++ b/raiden-derive/src/ops/transact_write.rs @@ -0,0 +1,137 @@ +use crate::rename::*; +use quote::*; + +pub(crate) fn expand_transact_write( + struct_name: &proc_macro2::Ident, + fields: &syn::FieldsNamed, + rename_all_type: crate::rename::RenameAllType, + table_name: &str, +) -> proc_macro2::TokenStream { + let item_input_name = format_ident!("{}PutItemInput", struct_name); + // let item_output_name = format_ident!("{}PutItemOutput", struct_name); + let put_builder = format_ident!("{}TransactPutItemBuilder", struct_name); + let condition_token_name = format_ident!("{}ConditionToken", struct_name); + + // let output_values = fields.named.iter().map(|f| { + // let ident = &f.ident.clone().unwrap(); + // let renamed = crate::finder::find_rename_value(&f.attrs); + // let attr_key = create_renamed(ident.to_string(), renamed, rename_all_type); + // if crate::finder::include_unary_attr(&f.attrs, "uuid") { + // quote! { + // #ident: uuid_map.get(#attr_key).cloned().unwrap().into(), + // } + // } else { + // quote! { + // #ident: item.#ident, + // } + // } + // }); + + let input_items = { + let insertion = fields.named.iter().map(|f| { + let ident = &f.ident.clone().unwrap(); + let renamed = crate::finder::find_rename_value(&f.attrs); + let attr_key = create_renamed(ident.to_string(), renamed, rename_all_type); + if crate::finder::include_unary_attr(&f.attrs, "uuid") { + quote! { + let id = #struct_name::gen(); + input_item.insert( + #attr_key.to_string(), + id.clone().into_attr(), + ); + uuid_map.insert( + #attr_key.to_string(), + id, + ); + } + } else { + quote! { + input_item.insert( + #attr_key.to_string(), + item.#ident.clone().into_attr(), + ); + } + } + }); + + quote! { + let mut input_item: std::collections::HashMap = std::collections::HashMap::new(); + #(#insertion)* + } + }; + + quote! { + impl #struct_name { + pub fn put(item: #item_input_name) -> #put_builder { + let mut input = ::raiden::Put::default(); + let mut attribute_names: std::collections::HashMap = std::collections::HashMap::new(); + let mut attribute_values: std::collections::HashMap = std::collections::HashMap::new(); + let mut uuid_map: std::collections::HashMap = std::collections::HashMap::new(); + + #input_items + + // let output_item = #item_output_name { + // #(#output_values)* + // }; + input.item = input_item; + input.table_name = #table_name.to_owned(); + #put_builder { + input, + // item: output_item, + } + } + } + + pub struct #put_builder { + pub input: ::raiden::Put, + } + + impl ::raiden::TransactWritePutBuilder for #put_builder { + fn build(self) -> ::raiden::Put { + self.input + } + } + + impl #put_builder { + fn condition(mut self, cond: impl ::raiden::condition::ConditionBuilder<#condition_token_name>) -> Self { + let (cond_str, attr_names, attr_values) = cond.build(); + if !attr_names.is_empty() { + self.input.expression_attribute_names = Some(attr_names); + } + if !attr_values.is_empty() { + self.input.expression_attribute_values = Some(attr_values); + } + self.input.condition_expression = Some(cond_str); + self + } + } + } +} + +/* +#[derive(Default, Debug, Clone, PartialEq, Serialize)] +#[cfg_attr(feature = "deserialize_structs", derive(Deserialize))] +pub struct Put { + ///

A condition that must be satisfied in order for a conditional update to succeed.

+ #[serde(rename = "ConditionExpression")] + #[serde(skip_serializing_if = "Option::is_none")] + pub condition_expression: Option, + ///

One or more substitution tokens for attribute names in an expression.

+ #[serde(rename = "ExpressionAttributeNames")] + #[serde(skip_serializing_if = "Option::is_none")] + pub expression_attribute_names: Option<::std::collections::HashMap>, + ///

One or more values that can be substituted in an expression.

+ #[serde(rename = "ExpressionAttributeValues")] + #[serde(skip_serializing_if = "Option::is_none")] + pub expression_attribute_values: Option<::std::collections::HashMap>, + ///

A map of attribute name to attribute values, representing the primary key of the item to be written by PutItem. All of the table's primary key attributes must be specified, and their data types must match those of the table's key schema. If any attributes are present in the item that are part of an index key schema for the table, their types must match the index key schema.

+ #[serde(rename = "Item")] + pub item: ::std::collections::HashMap, + ///

Use ReturnValuesOnConditionCheckFailure to get the item attributes if the Put condition fails. For ReturnValuesOnConditionCheckFailure, the valid values are: NONE and ALL_OLD.

+ #[serde(rename = "ReturnValuesOnConditionCheckFailure")] + #[serde(skip_serializing_if = "Option::is_none")] + pub return_values_on_condition_check_failure: Option, + ///

Name of the table in which to write the item.

+ #[serde(rename = "TableName")] + pub table_name: String, +}*/ diff --git a/raiden/examples/transact_write.rs b/raiden/examples/transact_write.rs new file mode 100644 index 00000000..85dfdb22 --- /dev/null +++ b/raiden/examples/transact_write.rs @@ -0,0 +1,36 @@ +use raiden::*; + +#[derive(Raiden)] +#[raiden(table_name = "user")] +pub struct User { + #[raiden(partition_key)] + id: String, + name: String, +} + +fn main() { + let mut rt = tokio::runtime::Runtime::new().unwrap(); + async fn example() { + let mut tx = ::raiden::WriteTx::new(Region::Custom { + endpoint: "http://localhost:8000".into(), + name: "ap-northeast-1".into(), + }); + let cond = User::condition().attr_not_exists(UserAttrNames::Id); + let input = User::put_item_builder() + .id("testId".to_owned()) + .name("bokuweb".to_owned()) + .build() + .unwrap(); + let input2 = User::put_item_builder() + .id("testId2".to_owned()) + .name("bokuweb".to_owned()) + .build() + .unwrap(); + tx.put(User::put(input).condition(cond)) + .put(User::put(input2)) + .run() + .await + .unwrap(); + } + rt.block_on(example()); +} diff --git a/raiden/src/errors/mod.rs b/raiden/src/errors/mod.rs index 43b7a1e8..be73f142 100644 --- a/raiden/src/errors/mod.rs +++ b/raiden/src/errors/mod.rs @@ -35,6 +35,12 @@ pub enum RaidenError { ParseError(String), #[error("unknown error")] Unknown(crate::request::BufferedHttpResponse), + #[error("`{0}`")] + TransactionCanceled(String), + #[error("`{0}`")] + TransactionInProgress(String), + #[error("`{0}`")] + IdempotentParameterMismatch(String), #[error("blocking error")] Blocking, #[error("next_token decode error")] @@ -114,3 +120,39 @@ impl From> for RaidenError { } } } + +impl From> for RaidenError { + fn from(error: RusotoError) -> Self { + match error { + RusotoError::Service(error) => match error { + TransactWriteItemsError::IdempotentParameterMismatch(msg) => { + RaidenError::IdempotentParameterMismatch(msg) + } + TransactWriteItemsError::InternalServerError(msg) => { + RaidenError::InternalServerError(msg) + } + TransactWriteItemsError::ProvisionedThroughputExceeded(msg) => { + RaidenError::ProvisionedThroughputExceeded(msg) + } + TransactWriteItemsError::RequestLimitExceeded(msg) => { + RaidenError::RequestLimitExceeded(msg) + } + TransactWriteItemsError::ResourceNotFound(msg) => { + RaidenError::ResourceNotFound(msg) + } + TransactWriteItemsError::TransactionCanceled(msg) => { + RaidenError::TransactionCanceled(msg) + } + TransactWriteItemsError::TransactionInProgress(msg) => { + RaidenError::TransactionInProgress(msg) + } + }, + RusotoError::HttpDispatch(e) => RaidenError::HttpDispatch(e), + RusotoError::Credentials(e) => RaidenError::Credentials(e), + RusotoError::Validation(msg) => RaidenError::Validation(msg), + RusotoError::ParseError(msg) => RaidenError::ParseError(msg), + RusotoError::Unknown(res) => RaidenError::Unknown(res), + RusotoError::Blocking => RaidenError::Blocking, + } + } +} diff --git a/raiden/src/ops/mod.rs b/raiden/src/ops/mod.rs index 243b3640..85eead13 100644 --- a/raiden/src/ops/mod.rs +++ b/raiden/src/ops/mod.rs @@ -1,3 +1,6 @@ pub mod get; pub mod put; pub mod query; +pub mod transact_write; + +pub use transact_write::*; \ No newline at end of file diff --git a/raiden/src/ops/transact_write.rs b/raiden/src/ops/transact_write.rs new file mode 100644 index 00000000..8ef86593 --- /dev/null +++ b/raiden/src/ops/transact_write.rs @@ -0,0 +1,43 @@ +use crate::{DynamoDb, TransactWriteItem}; + +pub struct WriteTx { + items: Vec, + client: crate::DynamoDbClient, +} +impl WriteTx { + pub fn new(region: crate::Region) -> Self { + let client = crate::DynamoDbClient::new(region); + Self { + items: vec![], + client, + } + } + + pub fn put(mut self, builder: impl TransactWritePutBuilder) -> Self { + self.items.push(TransactWriteItem { + condition_check: None, + delete: None, + update: None, + put: Some(builder.build()), + }); + self + } + + pub async fn run(self) -> Result<(), crate::RaidenError> { + let _res = self + .client + .transact_write_items(crate::TransactWriteItemsInput { + client_request_token: None, + return_consumed_capacity: None, + return_item_collection_metrics: None, + transact_items: self.items, + }) + .await?; + // TODO: ADD Response later + Ok(()) + } +} + +pub trait TransactWritePutBuilder { + fn build(self) -> crate::Put; +} diff --git a/raiden/tests/all/transact_write.rs b/raiden/tests/all/transact_write.rs new file mode 100644 index 00000000..e8081274 --- /dev/null +++ b/raiden/tests/all/transact_write.rs @@ -0,0 +1,43 @@ +#[cfg(test)] +mod tests { + + #[cfg(test)] + use pretty_assertions::assert_eq; + use raiden::*; + + #[derive(Raiden)] + #[raiden(table_name = "user")] + #[derive(Debug, Clone)] + pub struct User { + #[raiden(partition_key)] + id: String, + name: String, + } + + #[test] + fn test_minimum_transact_write() { + let tx = ::raiden::WriteTx::new(Region::Custom { + endpoint: "http://localhost:8000".into(), + name: "ap-northeast-1".into(), + }); + let cond = User::condition().attr_not_exists(UserAttrNames::Id); + let input = User::put_item_builder() + .id("testId".to_owned()) + .name("bokuweb".to_owned()) + .build() + .unwrap(); + let input2 = User::put_item_builder() + .id("testId2".to_owned()) + .name("bokuweb".to_owned()) + .build() + .unwrap(); + assert_eq( + tx.put(User::put(input).condition(cond)) + .put(User::put(input2)) + .run() + .await + .is_ok(), + true, + ) + } +}